Source code for emg3d.utils

"""
Utility functions for the multigrid solver.
"""
# Copyright 2018-2020 The emg3d Developers.
#
# This file is part of emg3d.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License.  You may obtain a copy
# of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
# License for the specific language governing permissions and limitations under
# the License.

import copy
import importlib
from timeit import default_timer
from datetime import datetime, timedelta
from concurrent.futures import ProcessPoolExecutor

import numpy as np
from scipy.interpolate import PchipInterpolator as Pchip
from scipy.interpolate import InterpolatedUnivariateSpline as Spline

try:
    import scooby
    from scooby import Report as ScoobyReport
except ImportError:
    scooby = None

    class ScoobyReport:
        pass

try:
    import empymod
except ImportError:
    empymod = None

# Version: We take care of it here instead of in __init__, so we can use it
# within the package itself (logs).
try:
    # - Released versions just tags:       0.8.0
    # - GitHub commits add .dev#+hash:     0.8.1.dev4+g2785721
    # - Uncommitted changes add timestamp: 0.8.1.dev4+g2785721.d20191022
    from emg3d.version import version as __version__
except ImportError:
    # If it was not installed, then we don't know the version. We could throw a
    # warning here, but this case *should* be rare. emg3d should be installed
    # properly!
    __version__ = 'unknown-'+datetime.today().strftime('%Y%m%d')

__all__ = ['Fourier', 'Time', 'Report', 'EMArray']


# SOFT DEPENDENCIES CHECK
def _requires(*args, **kwargs):
    """Decorator to wrap functions with extra dependencies.

    This function is taken from `pysal` (in `lib/common.py`); see
    https://github.com/pysal/pysal (released under the 'BSD 3-Clause "New" or
    "Revised" License').


    Parameters
    ---------
    args : list
        Strings containing the modules to import.

    verbose : bool
        If True (default) print a warning message on import failure.


    Returns
    -------
    out : func
        Original function if all modules are importable, otherwise returns a
        function that passes.
    """
    def simport(modname):
        """Safely import a module without raising an error."""
        try:
            return True, importlib.import_module(modname)
        except ImportError:
            return False, None

    v = kwargs.pop('verbose', True)
    wanted = copy.deepcopy(args)

    def inner(function):
        available = [simport(arg)[0] for arg in args]
        if all(available):
            return function
        else:
            def passer(*args, **kwargs):
                if v:
                    missing = [arg for i, arg in enumerate(wanted)
                               if not available[i]]
                    print("=> This feature of `emg3d` requires the following, "
                          f"missing soft dependencies: {missing}.")
                else:
                    pass
            return passer

    return inner


# EMArray
[docs]class EMArray(np.ndarray): r"""Create an EM-ndarray: add *amplitude* <amp> and *phase* <pha> methods. Parameters ---------- data : array Data to which to add `.amp` and `.pha` attributes. Examples -------- >>> import numpy as np >>> from empymod.utils import EMArray >>> emvalues = EMArray(np.array([1+1j, 1-4j, -1+2j])) >>> print(f"Amplitude : {emvalues.amp()}") Amplitude : [1.41421356 4.12310563 2.23606798] >>> print(f"Phase (rad) : {emvalues.pha()}") Phase (rad) : [ 0.78539816 -1.32581766 -4.24874137] >>> print(f"Phase (deg) : {emvalues.pha(deg=True)}") Phase (deg) : [ 45. -75.96375653 -243.43494882] >>> print(f"Phase (deg; lead) : {emvalues.pha(deg=True, lag=False)}") Phase (deg; lead) : [-45. 75.96375653 243.43494882] """ def __new__(cls, data): r"""Create a new EMArray.""" return np.asarray(data).view(cls)
[docs] def amp(self): """Amplitude of the electromagnetic field.""" return np.abs(self.view())
[docs] def pha(self, deg=False, unwrap=True, lag=True): """Phase of the electromagnetic field. Parameters ---------- deg : bool If True the returned phase is in degrees, else in radians. Default is False (radians). unwrap : bool If True the returned phase is unwrapped. Default is True (unwrapped). lag : bool If True the returned phase is lag, else lead defined. Default is True (lag defined). """ # Get phase, lead or lag defined. if lag: pha = np.angle(self.view()) else: pha = np.angle(np.conj(self.view())) # Unwrap if `unwrap`. # np.unwrap removes the EMArray class; # for consistency, we wrap it in EMArray again. if unwrap and self.size > 1: pha = EMArray(np.unwrap(pha)) # Convert to degrees if `deg`. if deg: pha *= 180/np.pi return pha
# TIME DOMAIN
[docs]@_requires('empymod') class Fourier: r"""Time-domain CSEM computation. Class to carry out time-domain modelling with the frequency-domain code `emg3d`. Instances of the class take care of computing the required frequencies, the interpolation from coarse, limited-band frequencies to the required frequencies, and carrying out the actual transform. Everything related to the Fourier transform is done by utilising the capabilities of the 1D modeller :mod:`empymod`. The input parameters `time`, `signal`, `ft`, and `ftarg` are passed to the function :func:`empymod.utils.check_time` to obtain the required frequencies. The actual transform is subsequently carried out by calling :func:`empymod.model.tem`. See these functions for more details about the exact implementations of the Fourier transforms and its parameters. Note that also the `verb`-argument follows the definition in `empymod`. The mapping from computed frequencies to the frequencies required for the Fourier transform is done in three steps: - Data for :math:`f>f_\mathrm{max}` is set to 0+0j. - Data for :math:`f<f_\mathrm{min}` is interpolated by adding an additional data point at a frequency of 1e-100 Hz. The data for this point is ``data.real[0]+0j``, hence the real part of the lowest computed frequency and zero imaginary part. Interpolation is carried out using PCHIP :func:`scipy.interpolate.pchip_interpolate`. - Data for :math:`f_\mathrm{min}\le f \le f_\mathrm{max}` is computed with cubic spline interpolation (on a log-scale) :class:`scipy.interpolate.InterpolatedUnivariateSpline`. Note that `fmin` and `fmax` should be chosen wide enough such that the mapping for :math:`f>f_\mathrm{max}` :math:`f<f_\mathrm{min}` does not matter that much. Parameters ---------- time : ndarray Desired times (s). fmin, fmax : float Minimum and maximum frequencies (Hz) to compute: - Data for freq > fmax is set to 0+0j. - Data for freq < fmin is interpolated, using an extra data-point at f = 1e-100 Hz, with value data.real[0]+0j. (Hence zero imaginary part, and the lowest computed real value.) signal : {0, 1, -1}, optional Source signal, default is 0: - None: Frequency-domain response - -1 : Switch-off time-domain response - 0 : Impulse time-domain response - +1 : Switch-on time-domain response ft : {'sin', 'cos', 'fftlog'}, optional Flag to choose either the Digital Linear Filter method (Sine- or Cosine-Filter) or the FFTLog for the Fourier transform. Defaults to 'sin'. ftarg : dict, optional Depends on the value for `ft`: - If `ft='dlf'`: - `dlf`: string of filter name in :mod:`empymod.filters` or the filter method itself. (Default: :func:`empymod.filters.key_201_CosSin_2012`) - `pts_per_dec`: points per decade; (default: -1) - If 0: Standard DLF. - If < 0: Lagged Convolution DLF. - If > 0: Splined DLF - If `ft='fftlog'`: - `pts_per_dec`: sampels per decade (default: 10) - `add_dec`: additional decades [left, right] (default: [-2, 1]) - `q`: exponent of power law bias (default: 0); -1 <= q <= 1 freq_inp : array Frequencies to use for computation. Mutually exclusive with `every_x_freq`. every_x_freq : int Every `every_x_freq`-th frequency of the required frequency-range is used for computation. Mutually exclusive with `freq_calc`. """ def __init__(self, time, fmin, fmax, signal=0, ft='dlf', ftarg=None, **kwargs): """Initialize a Fourier instance.""" # Store the input parameters. self._time = time self._fmin = fmin self._fmax = fmax self._signal = signal self._ft = ft if ftarg is None: self._ftarg = {} else: self._ftarg = ftarg # Get kwargs. self._freq_inp = kwargs.pop('freq_inp', None) self._every_x_freq = kwargs.pop('every_x_freq', None) self.verb = kwargs.pop('verb', 3) # Ensure no kwargs left. if kwargs: raise TypeError(f"Unexpected **kwargs: {list(kwargs.keys())}") # Ensure freq_inp and every_x_freq are not both set. self._check_coarse_inputs(keep_freq_inp=True) # Get required frequencies. self._check_time() def __repr__(self): """Simple representation.""" return (f"Fourier: {self._ft}; {self.time.min()}-{self.time.max()} s; " f"{self.fmin}-{self.fmax} Hz") # PURE PROPERTIES @property def freq_req(self): """Frequencies required to carry out the Fourier transform.""" return self._freq_req @property def freq_coarse(self): """Coarse frequency range, can be different from `freq_req`.""" if self.every_x_freq is None and self.freq_inp is None: # If none of {every_x_freq, freq_inp} given, then # freq_coarse = freq_req. return self.freq_req elif self.every_x_freq is None: # If freq_inp given, then freq_coarse = freq_inp. return self.freq_inp else: # If every_x_freq given, get subset of freq_req. return self.freq_req[::self.every_x_freq] @property def freq_calc_i(self): """Indices of `freq_coarse` which have to be computed.""" ind = (self.freq_coarse >= self.fmin) & (self.freq_coarse <= self.fmax) return ind @property def freq_calc(self): """Frequencies at which the model has to be computed.""" return self.freq_coarse[self.freq_calc_i] @property def freq_extrapolate_i(self): """Indices of the frequencies to extrapolate.""" return self.freq_req < self.fmin @property def freq_extrapolate(self): """These are the frequencies to extrapolate. In fact, it is dow via interpolation, using an extra data-point at f = 1e-100 Hz, with value data.real[0]+0j. (Hence zero imaginary part, and the lowest computed real value.) """ return self.freq_req[self.freq_extrapolate_i] @property def freq_interpolate_i(self): """Indices of the frequencies to interpolate. If freq_req is equal freq_coarse, then this is eual to freq_calc_i. """ return (self.freq_req >= self.fmin) & (self.freq_req <= self.fmax) @property def freq_interpolate(self): """These are the frequencies to interpolate. If freq_req is equal freq_coarse, then this is eual to freq_calc. """ return self.freq_req[self.freq_interpolate_i] @property def ft(self): """Type of Fourier transform. Set via ``fourier_arguments(ft, ftarg)``. """ return self._ft @property def ftarg(self): """Fourier transform arguments. Set via ``fourier_arguments(ft, ftarg)``. """ return self._ftarg # PROPERTIES WITH SETTERS @property def time(self): """Desired times (s).""" return self._time @time.setter def time(self, time): """Update desired times (s).""" self._time = time self._check_time() @property def fmax(self): """Maximum frequency (Hz) to compute.""" return self._fmax @fmax.setter def fmax(self, fmax): """Update maximum frequency (Hz) to compute.""" self._fmax = fmax self._print_freq_calc() @property def fmin(self): """Minimum frequency (Hz) to compute.""" return self._fmin @fmin.setter def fmin(self, fmin): """Update minimum frequency (Hz) to compute.""" self._fmin = fmin self._print_freq_calc() @property def signal(self): """Signal in time domain {0, 1, -1}.""" return self._signal @signal.setter def signal(self, signal): """Update signal in time domain {0, 1, -1}.""" self._signal = signal @property def freq_inp(self): """If set, freq_coarse is set to freq_inp.""" return self._freq_inp @freq_inp.setter def freq_inp(self, freq_inp): """Update freq_inp. Erases every_x_freq if set.""" self._freq_inp = freq_inp self._check_coarse_inputs(keep_freq_inp=True) @property def every_x_freq(self): """If set, freq_coarse is every_x_freq-frequency of freq_req.""" return self._every_x_freq @every_x_freq.setter def every_x_freq(self, every_x_freq): """Update every_x_freq. Erases freq_inp if set.""" self._every_x_freq = every_x_freq self._check_coarse_inputs(keep_freq_inp=False) # OTHER STUFF
[docs] def fourier_arguments(self, ft, ftarg): """Set Fourier type and its arguments.""" self._ft = ft self._ftarg = ftarg self._check_time()
[docs] def interpolate(self, fdata): """Interpolate from computed data to required data. Parameters ---------- fdata : ndarray Frequency-domain data corresponding to `freq_calc`. Returns ------- full_data : ndarray Frequency-domain data corresponding to `freq_req`. """ # Pre-allocate result. out = np.zeros(self.freq_req.size, dtype=np.complex128) # 1. Interpolate between fmin and fmax. # If freq_coarse is not exactly freq_req, we use cubic spline to # interpolate from fmin to fmax. if self.freq_coarse.size != self.freq_req.size: int_real = Spline(np.log(self.freq_calc), fdata.real)(np.log(self.freq_interpolate)) int_imag = Spline(np.log(self.freq_calc), fdata.imag)(np.log(self.freq_interpolate)) out[self.freq_interpolate_i] = int_real + 1j*int_imag else: # If they are the same, just fill in the data. out[self.freq_interpolate_i] = fdata # 2. Extrapolate from freq_req.min to fmin using PCHIP. # 2.a Extend freq_req/data by adding a point at 1e-100 Hz with # - same real part as lowest computed frequency and # - zero imaginary part. freq_ext = np.r_[1e-100, self.freq_calc] data_ext = np.r_[fdata[0].real-1e-100j, fdata] # 2.b Actual 'extrapolation' (now an interpolation). ext_real = Pchip(freq_ext, data_ext.real)(self.freq_extrapolate) ext_imag = Pchip(freq_ext, data_ext.imag)(self.freq_extrapolate) out[self.freq_extrapolate_i] = ext_real + 1j*ext_imag return out
[docs] def freq2time(self, fdata, off): """Compute corresponding time-domain signal. Carry out the actual Fourier transform. Parameters ---------- fdata : ndarray Frequency-domain data corresponding to `freq_calc`. off : float Corresponding offset (m). Returns ------- tdata : ndarray Time-domain data corresponding to Fourier.time. """ # Interpolate the computed data at the required frequencies. inp_data = self.interpolate(fdata) # Carry out the Fourier transform. tdata, _ = empymod.model.tem( inp_data[:, None], np.array(off), freq=self.freq_req, time=self.time, signal=self.signal, ft=self.ft, ftarg=self.ftarg) return np.squeeze(tdata)
# PRIVATE ROUTINES def _check_time(self): """Get required frequencies for given times and ft/ftarg.""" # Get freq via empymod. _, freq, ft, ftarg = empymod.utils.check_time( self.time, self.signal, self.ft, self.ftarg, self.verb) # Store required frequencies and check ft, ftarg. self._freq_req = freq self._ft = ft self._ftarg = ftarg # Print frequency information (if verbose). if self.verb > 2: self._print_freq_ftarg() self._print_freq_calc() def _check_coarse_inputs(self, keep_freq_inp=True): """Parameters `freq_inp` and `every_x_freq` are mutually exclusive.""" # If they are both set, reset one depending on `keep_freq_inp`. if self._freq_inp is not None and self._every_x_freq is not None: print("\n* WARNING :: `freq_inp` and `every_x_freq` are mutually " "exclusive.\n Re-setting ", end="") if keep_freq_inp: # Keep freq_inp. print("`every_x_freq=None`.\n") self._every_x_freq = None else: # Keep every_x_freq. print("`freq_inp=None`.\n") self._freq_inp = None # PRINTING ROUTINES def _print_freq_ftarg(self): """Print required frequency range.""" if self.verb > 2: empymod.utils._prnt_min_max_val( self.freq_req, " Req. freq [Hz] : ", self.verb) def _print_freq_calc(self): """Print actually computed frequency range.""" if self.verb > 2: empymod.utils._prnt_min_max_val( self.freq_calc, " Calc. freq [Hz] : ", self.verb)
# TIMING AND REPORTING
[docs]class Time: """Class for timing (now; runtime).""" def __init__(self): """Initialize time zero (t0) with current time stamp.""" self._t0 = default_timer() def __repr__(self): """Simple representation.""" return f"Runtime : {self.runtime}" @property def t0(self): """Return time zero of this class instance.""" return self._t0 @property def now(self): """Return string of current time.""" return datetime.now().strftime("%H:%M:%S") @property def runtime(self): """Return string of runtime since time zero.""" return str(timedelta(seconds=np.round(self.elapsed))) @property def elapsed(self): """Return runtime in seconds since time zero.""" return default_timer() - self._t0
[docs]@_requires('scooby') class Report(ScoobyReport): r"""Print date, time, and version information. Use `scooby` to print date, time, and package version information in any environment (Jupyter notebook, IPython console, Python console, QT console), either as html-table (notebook) or as plain text (anywhere). Always shown are the OS, number of CPU(s), `numpy`, `scipy`, `emg3d`, `numba`, `sys.version`, and time/date. Additionally shown are, if they can be imported, `IPython` and `matplotlib`. It also shows MKL information, if available. All modules provided in `add_pckg` are also shown. .. note:: The package `scooby` has to be installed in order to use `Report`: ``pip install scooby``. Parameters ---------- add_pckg : packages, optional Package or list of packages to add to output information (must be imported beforehand). ncol : int, optional Number of package-columns in html table (no effect in text-version); Defaults to 3. text_width : int, optional The text width for non-HTML display modes sort : bool, optional Sort the packages when the report is shown Examples -------- >>> import pytest >>> import dateutil >>> from emg3d import Report >>> Report() # Default values >>> Report(pytest) # Provide additional package >>> Report([pytest, dateutil], ncol=5) # Set nr of columns """ def __init__(self, add_pckg=None, ncol=3, text_width=80, sort=False): """Initiate a scooby.Report instance.""" # Mandatory packages. core = ['numpy', 'scipy', 'numba', 'emg3d'] # Optional packages. optional = ['empymod', 'xarray', 'discretize', 'h5py', 'matplotlib', 'tqdm', 'IPython'] super().__init__(additional=add_pckg, core=core, optional=optional, ncol=ncol, text_width=text_width, sort=sort)
# MISC def _process_map(fn, *iterables, max_workers, **kwargs): """Imitate tqdm.contrib.concurrent.process_map without tqdm. emg3d.simulation uses `process_map` from `tqdm` to run jobs asynchronously. However, `tqdm` is a soft dependency. In case it is not installed we simply use `concurrent.futures.ProcessPoolExecutor`, from the standard library, and imitate the behaviour of process_map (basically a `ProcessPoolExecutor.map`, returned as a list, and wrapped in a context manager). """ with ProcessPoolExecutor(max_workers=max_workers) as ex: return list(ex.map(fn, *iterables))