"""
Functions that actually call emg3d within the CLI interface.
"""
# Copyright 2018-2020 The emg3d Developers.
#
# This file is part of emg3d.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import json
import time
import logging
import numpy as np
from emg3d import io, utils, surveys, simulations
from emg3d.cli import parser
[docs]def simulation(args_dict):
"""Run `emg3d` invoked by CLI.
Run and log `emg3d` given the settings stored in the config file, overruled
by settings passed in `args_dict` (which correspond to command-line
arguments).
Results are saved to files according to provided settings.
Parameters
----------
args_dict : dict
Arguments from terminal, see :func:`emg3d.cli.main`. Parameters passed
in `args_dict` overrule parameters in the `config`.
"""
# Start timer.
runtime = utils.Time()
# Parse configuration file.
cfg, term = parser.parse_config_file(args_dict)
function, verb = term['function'], term['verbosity']
dry_run = term.get('dry_run', False)
verb_io = max(0, verb-1) # io-verbosity only when debug (verbosity=2).
# Start this task: start timing.
logger = initiate_logger(cfg, runtime, verb)
# Log start info, python and emg3d version, and python path.
logger.info(f"\n:: emg3d {function} START :: {time.asctime()} :: "
f"v{utils.__version__}\n")
# Dump the configuration.
if not term['config_file']:
logger.warning("* WARNING :: CONFIGURATION FILE NOT FOUND.\n")
paramdump = json.dumps(cfg, sort_keys=True, indent=4)
logger.debug(f"** CONFIGURATION: {term['config_file']}\n{paramdump}\n")
# Load input.
sdata = io.load(cfg['files']['survey'], verb=verb_io)
mdata = io.load(cfg['files']['model'], verb=verb_io)
min_offset = cfg['simulation_options'].pop('min_offset', 0.0)
# Select data.
data = cfg['data']
if data:
# Get a dict.
tdata = sdata['survey']
tdict = tdata.to_dict()
# Select sources.
if 'sources' in data.keys():
tdata._data = tdata.data.sel(src=data['sources'])
tdict['sources'] = {
k: tdict['sources'][k] for k in data['sources']}
# Select receivers.
if 'receivers' in data.keys():
tdata._data = tdata.data.sel(rec=data['receivers'])
tdict['receivers'] = {
k: tdict['receivers'][k] for k in data['receivers']}
# Select frequencies.
if 'frequencies' in data.keys():
tdata._data = tdata.data.sel(freq=data['frequencies'])
tdict['frequencies'] = data['frequencies']
# Replace with selected data.
for key in tdict['data'].keys():
tdict['data'][key] = tdata.data[key].data
# Get new survey from reduced dict.
sdata['survey'] = surveys.Survey.from_dict(tdict)
# Create simulation.
sim = simulations.Simulation(
survey=sdata['survey'],
grid=mdata['mesh'],
model=mdata['model'],
verb=verb,
**cfg['simulation_options']
)
# Switch-off tqdm if verbosity is zero.
if verb < 1:
sim._tqdm_opts['disable'] = True
# Print simulation info.
logger.info(f"\n{sim}\n")
# Initiate output dict.
output = {}
# Compute forward model (all calls).
if dry_run:
output['data'] = np.zeros_like(sim.data.synthetic)
elif function == 'forward':
sim.compute(observed=True, min_offset=min_offset)
output['data'] = sim.data.observed
else:
sim.compute()
output['data'] = sim.data.synthetic
# Compute the misfit.
if function in ['misfit', 'gradient']:
if dry_run:
output['misfit'] = 0.0
else:
output['misfit'] = sim.misfit
output['n_observations'] = sim.survey.size
# Compute the gradient.
if function == 'gradient':
if dry_run:
output['gradient'] = np.zeros(mdata['mesh'].vnC)
else:
output['gradient'] = sim.gradient
# Add solver exit messages to log.
if not dry_run:
infostr = "\nSolver exit messages:\n"
for src, values in sim._dict_efield_info.items():
for freq, info in values.items():
infostr += f"- Src {src}; {freq} Hz : {info['exit_message']}"
if function == 'gradient':
binfo = sim._dict_efield_info[src][freq]['exit_message']
infostr += f"; back: : {binfo}\n"
else:
infostr += "\n"
logger.debug(infostr)
# Store output to disk.
if cfg['files']['store_simulation']:
output['simulation'] = sim
io.save(cfg['files']['output'], **output, verb=verb_io)
# Goodbye
logger.info(f"\n:: emg3d {function} END :: {time.asctime()} :: "
f"runtime = {runtime.runtime}\n")
[docs]def initiate_logger(cfg, runtime, verb):
"""Initiate logger for CLI of emg3d."""
# Get logger of emg3d.cli.run and add handles.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# Remove the corresponding handlers if they exist already
# (e.g., consecutive runs in IPython).
for h in logger.handlers[:]:
if h.name in ['emg3d_fh', 'emg3d_ch']:
logger.removeHandler(h)
h.close()
# Create file handler; logs everything.
fh = logging.FileHandler(f"{cfg['files']['log']}", mode='w')
fh.setLevel(logging.DEBUG)
fh_format = logging.Formatter('{message}', style='{')
fh.setFormatter(fh_format)
fh.set_name('emg3d_fh') # Add name to easy remove them.
logger.addHandler(fh)
# Create console handler.
ch = logging.StreamHandler()
ch.setLevel([40, 30, 20, 10][verb+1])
ch_format = logging.Formatter('{message}', style='{')
ch.setFormatter(ch_format)
ch.set_name('emg3d_ch') # Add name to easy remove them.
logger.addHandler(ch)
return logger