"""
Functions that actually call emg3d within the CLI interface.
"""
# Copyright 2018-2021 The emg3d Developers.
#
# This file is part of emg3d.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import os
import sys
import json
import time
import logging
import numpy as np
from emg3d import io, utils, simulations
from emg3d.cli import parser
def simulation(args_dict):
    """Run `emg3d` invoked by CLI.

    Run and log `emg3d` given the settings stored in the config file, overruled
    by settings passed in `args_dict` (which correspond to command-line
    arguments).

    The steps are: parse the configuration, check that all required files and
    directories exist, set up logging, load survey and model, optionally
    select a subset of the survey, create the simulation, run the requested
    computation, and save the results.

    What is computed depends on the `function` entry returned by the parser:

    - ``'forward'``: the data are computed as observed data (taking
      `min_offset` from the simulation options into account);
    - any other value: the data are computed as synthetic data;
    - ``'misfit'`` and ``'gradient'``: the misfit and the number of
      observations are stored in addition;
    - ``'gradient'``: the gradient is stored in addition.

    If `dry_run` is set, nothing is computed; zero-filled placeholders are
    stored instead.

    Results are saved to files according to provided settings.

    Parameters
    ----------
    args_dict : dict
        Arguments from terminal, see :func:`emg3d.cli.main`. Parameters passed
        in `args_dict` overrule parameters in the `config`.

    """

    # Start timer.
    runtime = utils.Time()

    # Parse configuration file.
    cfg, term = parser.parse_config_file(args_dict)
    check_files(cfg, term)  # Check all files and directories exist.
    function, verb = term['function'], term['verbosity']
    dry_run = term.get('dry_run', False)

    # Start this task: start timing.
    logger = initiate_logger(cfg, runtime, verb)

    # Log start info, python and emg3d version, and python path.
    logger.info(f":: emg3d CLI {function} START :: {time.asctime()} :: "
                f"v{utils.__version__}")
    logger.debug(f"{utils.Report()}")

    # Dump the configuration.
    paramdump = json.dumps(cfg, sort_keys=True, indent=4)
    logger.debug("\n :: CONFIGURATION ::\n")
    logger.debug(f"{term['config_file']}\n{paramdump}")

    # Load input; the first line of the returned info goes to the info-level,
    # the second line to the debug-level.
    logger.info("\n :: LOAD SURVEY AND MODEL ::\n")
    sdata, sinfo = io.load(cfg['files']['survey'], verb=-1)
    survey = sdata['survey']
    logger.info(sinfo.split('\n')[0])
    logger.debug(sinfo.split('\n')[1])
    model, minfo = io.load(cfg['files']['model'], verb=-1)
    logger.info(minfo.split('\n')[0])
    logger.debug(minfo.split('\n')[1])

    # `min_offset` is an argument of `compute`, not of the simulation itself;
    # it is therefore removed from the options passed to `Simulation`.
    min_offset = cfg['simulation_options'].pop('min_offset', 0.0)

    # Select data. The data section is optional, hence it is looked up once
    # with a default and re-used below for the stored configuration.
    data = cfg.get('data', {})
    if data:
        survey = survey.select(sources=data.get('sources', None),
                               receivers=data.get('receivers', None),
                               frequencies=data.get('frequencies', None))

    # Create simulation.
    sim = simulations.Simulation(
        survey=survey,
        grid=model['mesh'],
        model=model['model'],
        verb=-1,  # Only errors.
        **cfg['simulation_options']
    )

    # Switch-off tqdm if verbosity is zero.
    if verb < 1:
        sim._tqdm_opts['disable'] = True

    # Print simulation info.
    logger.info("\n :: SIMULATION ::")
    logger.info(f"\n{sim}\n")

    # Print meshes.
    logger.debug(" :: MESHES ::\n")
    logger.debug(sim.print_grid_info(return_info=True))

    # Initiate output dict, add configuration.
    # Ideally, we would add the entire configuration; however, this causes
    # currently problems for saving h5. We therefore save only the data
    # selection info, such that we know what data was selected.
    output = {'configuration': {'data': data}}

    # Compute forward model (all calls).
    logger.info(" :: FORWARD COMPUTATION ::\n")
    if dry_run:
        # Placeholder of the correct shape; nothing is computed.
        output['data'] = np.zeros(sim.survey.shape, dtype=complex)
    else:
        if function == 'forward':
            sim.compute(observed=True, min_offset=min_offset)
            output['data'] = sim.data.observed
        else:
            sim.compute()
            output['data'] = sim.data.synthetic

        # Print Solver Logs.
        if verb in [0, 1]:
            sim.print_solver_info('efield', 0)
        logger.debug(sim.print_solver_info('efield', 1, True))

    # Compute the misfit.
    if function in ['misfit', 'gradient']:
        if dry_run:
            output['misfit'] = 0.0
        else:
            output['misfit'] = sim.misfit
        output['n_observations'] = sim.survey.size

    # Compute the gradient.
    if function == 'gradient':
        logger.info("\n :: BACKWARD COMPUTATION ::\n")
        if dry_run:
            # Placeholder of the correct shape; nothing is computed.
            output['gradient'] = np.zeros(model['mesh'].vnC)
        else:
            output['gradient'] = sim.gradient

            # Print Solver Logs.
            if verb in [0, 1]:
                sim.print_solver_info('bfield', 0)
            logger.debug(sim.print_solver_info('bfield', 1, True))

    # Store output to disk.
    logger.info(" :: SAVE RESULTS ::\n")
    if cfg['files']['store_simulation']:
        output['simulation'] = sim
    oinfo = io.save(cfg['files']['output'], **output, verb=-1)
    logger.info(oinfo.split('\n')[0])
    logger.debug(oinfo.split('\n')[1])

    # Goodbye
    logger.info(f"\n:: emg3d CLI {function} END :: {time.asctime()} :: "
                f"runtime = {runtime.runtime}")
def check_files(cfg, term):
    """Ensure all paths and files exist.

    Checks the config file, the survey and model files, and the directory in
    which the log file is written. All problems are collected first, so that
    they can be reported together.

    Parameters
    ----------
    cfg : dict
        Configuration; ``cfg['files']`` must contain the keys ``'survey'``,
        ``'model'``, and ``'log'``.

    term : dict
        Terminal settings; ``term['config_file']`` is the path of the config
        file, where ``'.'`` stands for "no config file".

    Raises
    ------
    SystemExit
        If any file or directory was not found; the exit message contains
        one line for each problem.

    """
    errors = []

    # First check if config file exists.
    fname = term['config_file']
    if fname != '.' and not os.path.isfile(fname):  # '.' => no config file.
        errors.append(f"* ERROR :: Config file not found: {fname}")

    # Check Survey and Model.
    for name in ['Survey', 'Model']:
        fname = cfg['files'][name.lower()]
        if not os.path.isfile(fname):
            errors.append(f"* ERROR :: {name} file not found: {fname}")

    # Finally check output directory. An empty directory name means that the
    # log file is given without a path and hence lives in the current working
    # directory; there is nothing to check in that case.
    dname = os.path.dirname(cfg['files']['log'])
    if dname and not os.path.isdir(dname):
        errors.append(
            f"* ERROR :: Output directory does not exist: {dname}")

    # If any was not found, exit with error.
    if errors:
        sys.exit("\n".join(errors))
def initiate_logger(cfg, runtime, verb):
    """Initiate logger for CLI of emg3d.

    Attaches a file handler (logs everything) and a console handler (level
    according to `verb`) to the logger of this module, and attaches the same
    two handlers to the ``py.warnings`` logger so that Python warnings end up
    in the same places.

    Handlers from a previous call are removed from both loggers first, so
    that consecutive runs in the same session do not duplicate output.

    Parameters
    ----------
    cfg : dict
        Configuration; ``cfg['files']['log']`` is the path of the log file,
        which is overwritten.

    runtime : object
        Not used in this function.

    verb : int
        Console verbosity: -1 (errors), 0 (warnings), 1 (info), 2 (debug).
        Values outside this range are clipped to the nearest valid level.

    Returns
    -------
    logger : logging.Logger
        The logger of this module with both handlers attached.

    """
    handler_names = ('emg3d_fh', 'emg3d_ch')

    # Get logger of emg3d.cli.run and the logger used for Python warnings.
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    logger_warnings = logging.getLogger("py.warnings")

    # Remove the corresponding handlers if they exist already
    # (e.g., consecutive runs in IPython). The same handler instances are
    # attached to both loggers, hence both have to be cleaned.
    for log in (logger, logger_warnings):
        for h in log.handlers[:]:
            if h.name in handler_names:
                log.removeHandler(h)
                h.close()

    # Create file handler; logs everything.
    fh = logging.FileHandler(f"{cfg['files']['log']}", mode='w')
    fh.setLevel(logging.DEBUG)
    fh_format = logging.Formatter('{message}', style='{')
    fh.setFormatter(fh_format)
    fh.set_name('emg3d_fh')  # Add name to easy remove them.
    logger.addHandler(fh)

    # Create console handler. The index is clipped, as a plain `verb+1`
    # would raise for verb > 2 and wrap around for verb < -1.
    levels = (logging.ERROR, logging.WARNING, logging.INFO, logging.DEBUG)
    ch = logging.StreamHandler()
    ch.setLevel(levels[min(max(verb + 1, 0), len(levels) - 1)])
    ch_format = logging.Formatter('{message}', style='{')
    ch.setFormatter(ch_format)
    ch.set_name('emg3d_ch')  # Add name to easy remove them.
    logger.addHandler(ch)

    # Add handlers to Python Warnings.
    logging.captureWarnings(True)
    logger_warnings.setLevel(logging.DEBUG)
    logger_warnings.addHandler(ch)
    logger_warnings.addHandler(fh)

    return logger