# Source code for emg3d.surveys

"""
A survey stores a set of sources and their frequencies, receivers, and the
measured data.
"""
# Copyright 2018 The emsig community.
#
# This file is part of emg3d.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License.  You may obtain a copy
# of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
# License for the specific language governing permissions and limitations under
# the License.

from copy import deepcopy

import numpy as np

try:
    import xarray
except ImportError:
    xarray = None

from emg3d import electrodes, utils, io

# Public names of this module.
__all__ = ['Survey', 'random_noise', 'txrx_coordinates_to_dict',
           'txrx_lists_to_dict', 'frequencies_to_dict']


def __dir__():
    """Return the public names of the module, as listed in ``__all__``."""
    return __all__


@utils._known_class
class Survey:
    """Create a survey containing sources, receivers, and data.

    A survey contains the acquisition information such as source types,
    positions, and frequencies and receiver types and positions. A survey
    contains also any acquired or synthetic data and their expected relative
    error and noise floor.

    The data is stored in a 3D ndarray of dimension ``nsrc x nrec x nfreq``.
    Underlying the survey-class is an :class:`xarray.Dataset`, where each
    individual data set (e.g., acquired data or synthetic data) is stored as a
    :class:`xarray.DataArray`. The module xarray is a soft dependency of emg3d,
    and has to be installed manually to use the survey functionality.

    Receivers have a switch ``relative``, which is False by default and means
    that the coordinates are absolute values (grid-based acquisition). If the
    switch is set to True, the coordinates are relative to the source. This can
    be used to model streamer-based acquisitions such as marine streamers or
    airborne surveys. The two acquisition types can also be mixed in a survey.

    .. note::

        The package ``xarray`` has to be installed in order to use ``Survey``:
        ``pip install xarray`` or ``conda install -c conda-forge xarray``.


    Parameters
    ----------
    sources, receivers : {Tx*, Rx*, list, dict}
        Any of the available sources or receivers, e.g.,
        :class:`emg3d.electrodes.TxElectricDipole`, or a list or dict of
        Tx*/Rx* instances. If it is a dict, it is used as is, including the
        provided keys. In all other cases keys are assigned to the values.

        It can also be a list containing a combination of the above (lists,
        dicts, and instances).

        Receivers can be set to ``None``, if one is only interested in forward
        modelling the entire fields. In this case, the related data object and
        the noise floor and relative error have no meaning. Also, in
        conjunction with a :class:`emg3d.simulations.Simulation`, the misfit
        and the gradient will be zero.

    frequencies : {array_like, dict}
        Source frequencies (Hz).

        - array_like: Frequencies will be stored in a dict with keys assigned
          starting with ``'f-1'``, ``'f-2'``, and so on.

        - dict: Keys can be arbitrary names, values must be floats.

    data : ndarray, default: None
        The observed data (dtype=np.complex128); must have shape (nsrc, nrec,
        nfreq). Alternatively, it can be a dict containing many datasets, in
        which one could also store, for instance, standard-deviations for each
        source-receiver-frequency pair. If None, it will be initiated with
        NaN's.

    noise_floor, relative_error : {float, ndarray}, default: None
        Noise floor and relative error of the data. They can be arrays of a
        shape which can be broadcasted to the data shape, e.g., (nsrc, 1, 1) or
        (1, nrec, nfreq), or have the dimension of data. See
        :attr:`Survey.standard_deviation` for more info.

    name : str, default: None
        Name of the survey.

    date : str, default: None
        Acquisition date.

    info : str, default: None
        Survey info or any other info (e.g., what was the intent of the survey,
        what were the acquisition conditions, problems encountered).

    """

    def __init__(self, sources, receivers, frequencies, data=None, **kwargs):
        """Initiate a new Survey instance."""

        # Store sources, receivers, and frequencies.
        self._sources = txrx_lists_to_dict(sources)
        if receivers is None:
            self._receivers = {}
        else:
            self._receivers = txrx_lists_to_dict(receivers)
        self._frequencies = frequencies_to_dict(frequencies)

        # Initialize xarray dataset.
        self._initiate_dataset(data)

        # Get the optional keywords related to standard deviation.
        self.noise_floor = kwargs.pop('noise_floor', None)
        self.relative_error = kwargs.pop('relative_error', None)

        # Get the optional info.
        self.name = kwargs.pop('name', None)
        self.date = kwargs.pop('date', None)
        self.info = kwargs.pop('info', None)

        # Ensure no kwargs left.
        if kwargs:
            raise TypeError(f"Unexpected **kwargs: {list(kwargs.keys())}.")

    def __repr__(self):
        """Simple representation."""
        name = f" «{self.name}»" if self.name else ""
        date = f" {self.date}" if self.date else ""
        info = f"{self.info}\n" if self.info else ""
        return (f":: {self.__class__.__name__}{name} ::{date}\n{info}\n"
                f"{self.data.__repr__()}")

    def _repr_html_(self):
        """HTML representation with fancy xarray display."""
        name = f" «{self.name}»" if self.name else ""
        date = f" <tt>{self.date}</tt>" if self.date else ""
        info = f"{self.info}<br>" if self.info else ""
        return (f"<h3>{self.__class__.__name__}{name}{date}</h3>{info}"
                f"{self.data._repr_html_()}")

    def copy(self):
        """Return a copy of the Survey."""
        return self.from_dict(self.to_dict(True))

    def to_dict(self, copy=False):
        """Store the necessary information of the Survey in a dict.

        Parameters
        ----------
        copy : bool, default: False
            If True, returns a deep copy of the dict.

        Returns
        -------
        out : dict
            Dictionary containing all information to re-create the Survey.

        """
        out = {
            '__class__': self.__class__.__name__,
            'sources': {k: v.to_dict() for k, v in self.sources.items()},
            'receivers': {k: v.to_dict() for k, v in self.receivers.items()},
            'frequencies': self.frequencies,
            'data': {k: v.data for k, v in self.data.items()},
            'noise_floor': self.data.noise_floor,
            'relative_error': self.data.relative_error,
            'name': self.name,
            'date': self.date,
            'info': self.info,
        }
        if copy:
            return deepcopy(out)
        else:
            return out

    @classmethod
    @utils._requires('xarray')
    def from_dict(cls, inp):
        """Convert dictionary into :class:`emg3d.surveys.Survey` instance.

        Parameters
        ----------
        inp : dict
            Dictionary as obtained from :func:`emg3d.surveys.Survey.to_dict`.
            The dictionary needs the keys `sources`, `receivers`, and
            `frequencies`.

        Returns
        -------
        survey : Survey
            A :class:`emg3d.surveys.Survey` instance.

        """
        # Shallow copy without the class flag, so `inp` itself is untouched.
        inp = {k: v for k, v in inp.items() if k != '__class__'}

        # Re-create the Tx*/Rx* instances from their dict representation.
        inp['sources'] = {k: getattr(electrodes, v['__class__']).from_dict(v)
                          for k, v in inp['sources'].items()}
        inp['receivers'] = {k: getattr(electrodes, v['__class__']).from_dict(v)
                            for k, v in inp['receivers'].items()}
        return cls(**inp)

    def to_file(self, fname, name='survey', **kwargs):
        """Store Survey to a file.

        Parameters
        ----------
        fname : str
            Absolute or relative file name including ending, which defines the
            used data format. See :func:`emg3d.io.save` for the options.

        name : str, default: 'survey'
            Name with which the survey is stored in the file.

        kwargs : Keyword arguments, optional
            Passed through to :func:`emg3d.io.save`.

        """
        kwargs[name] = self  # Add survey to dict.
        return io.save(fname, **kwargs)

    @classmethod
    @utils._requires('xarray')
    def from_file(cls, fname, name='survey', **kwargs):
        """Load Survey from a file.

        Parameters
        ----------
        fname : str
            Absolute or relative file name including extension.

        name : str, default: 'survey'
            Name under which the survey is stored within the file.

        kwargs : Keyword arguments, optional
            Passed through to :func:`io.load`.

        Returns
        -------
        survey : Survey
            A :class:`emg3d.surveys.Survey` instance.

        info : str, returned if verb<0
            Info-string.

        """
        out = io.load(fname, **kwargs)
        if kwargs.get('verb', 0) < 0:
            return out[0][name], out[1]
        else:
            return out[name]

    # DATA
    @utils._requires('xarray')
    def _initiate_dataset(self, data):
        """Initiate Dataset.

        ``data`` may be None (an all-NaN ``'observed'`` set is created), an
        array (used as ``'observed'``), or a dict of datasets (an all-NaN
        ``'observed'`` entry is added if missing). A provided dict is not
        modified.
        """

        # Get shape of DataArrays.
        shape = (len(self._sources),
                 len(self._receivers),
                 len(self._frequencies))

        # Initialize data and ensure there is 'observed'.
        if data is None:
            data = {'observed': np.full(shape, np.nan+1j*np.nan)}
        elif not isinstance(data, dict):
            data = {'observed': np.atleast_3d(data)}
        elif 'observed' not in data:
            # Work on a shallow copy so the caller's dict is left untouched.
            data = {**data, 'observed': np.full(shape, np.nan+1j*np.nan)}

        # Create Dataset, add all data as DataArrays.
        dims = ('src', 'rec', 'freq')
        self._data = xarray.Dataset(
            {k: xarray.DataArray(v, dims=dims) for k, v in data.items()},
            coords={'src': list(self.sources.keys()),
                    'rec': list(self.receivers.keys()),
                    'freq': list(self.frequencies)},
        )

        # Add attributes; the trailing ';\n' of the last entry is replaced
        # by a '.'.
        self._data.src.attrs['Sources'] = "".join(
            f"{k}: {s.__repr__()};\n" for k, s in self.sources.items()
        )[:-2]+'.'
        self._data.rec.attrs['Receivers'] = "".join(
            f"{k}: {d.__repr__()};\n" for k, d in self.receivers.items()
        )[:-2]+'.'
        self._data.freq.attrs['Frequencies'] = "".join(
            f"{k}: {f} Hz;\n" for k, f in self.frequencies.items()
        )[:-2]+'.'

    @property
    def data(self):
        """Data, a :class:`xarray.Dataset` instance."""
        return self._data

    def select(self, sources=None, receivers=None, frequencies=None,
               remove_empty=True):
        """Return a Survey with selected sources, receivers, and frequencies.


        Parameters
        ----------
        sources, receivers, frequencies : list, default: None
            Lists containing the wanted sources, receivers, and frequencies.
            If None, all are selected.

        remove_empty : bool, default: True
            If True, and self.data.observed has finite entries, it removes
            empty source-receiver-frequency entries and according sources,
            receivers, and frequencies.


        Returns
        -------
        survey : Survey
            A :class:`emg3d.surveys.Survey` instance.

        """

        # Get a dict of the survey
        survey = self.to_dict()
        selection = {}

        # Select sources.
        if sources is not None:
            if isinstance(sources, str):
                sources = [sources, ]
            survey['sources'] = {s: survey['sources'][s] for s in sources}
            selection['src'] = sources

        # Select receivers.
        if receivers is not None:
            if isinstance(receivers, str):
                receivers = [receivers, ]
            survey['receivers'] = {
                    r: survey['receivers'][r] for r in receivers}
            selection['rec'] = receivers

        # Select frequencies.
        if frequencies is not None:
            if isinstance(frequencies, str):
                frequencies = [frequencies, ]
            survey['frequencies'] = {
                    f: survey['frequencies'][f] for f in frequencies}
            selection['freq'] = frequencies

        # Replace data with selected data.
        for key in survey['data'].keys():
            survey['data'][key] = self.data[key].sel(**selection)

            # Check if there are any finite observed data.
            if remove_empty and key == 'observed':
                data = survey['data'][key].data
                remove_empty = np.isfinite(data).any()

        # Create new, reduced survey.
        red_survey = Survey.from_dict(survey)

        # Remove empty source-receiver-frequency pairs.
        if remove_empty:

            def get_names(name, i0, i1, i2):
                """Return non-NaN names."""
                ibool = np.isnan(data).all(axis=(i1, i2))
                ind = np.arange(data.shape[i0])[~ibool]
                keys = survey[name].keys()
                return [n for i, n in enumerate(keys) if i in ind]

            # Get names.
            srcnames = get_names('sources', 0, 1, 2)
            recnames = get_names('receivers', 1, 0, 2)
            freqnames = get_names('frequencies', 2, 0, 1)

            # Use recursion to remove empty pairs.
            red_survey = red_survey.select(
                sources=srcnames,
                receivers=recnames,
                frequencies=freqnames,
                remove_empty=False,
            )

        return red_survey

    @property
    def shape(self):
        """Shape of data (nsrc, nrec, nfreq)."""
        return self.data.observed.shape

    @property
    def size(self):
        """Size of data (nsrc x nrec x nfreq)."""
        return int(self.data.observed.size)

    @property
    def count(self):
        """Count of observed data."""
        return int(self.data.observed.count())

    # SOURCES, RECEIVERS, FREQUENCIES
    @property
    def sources(self):
        """Source dict containing all sources."""
        return self._sources

    @property
    def receivers(self):
        """Receiver dict containing all receivers."""
        return self._receivers

    def source_coordinates(self):
        """Return source center coordinates as ndarray [x, y, z]."""
        return np.array([s.center for s in self.sources.values()]).T

    def receiver_coordinates(self, source=None):
        """Return receiver center coordinates as ndarray [x, y, z].

        For relative receivers, all positions are listed one after the other
        for each source position. Alternatively, a source-name (string) can be
        provided, in which case only the position for this source is added.
        """

        coords = []

        # Loop over receivers.
        for v in self.receivers.values():

            # If relative, loop over sources and add all positions.
            if v.relative and source is None:
                for s in self.sources.values():
                    coords.append(v.center_abs(s))

            # If relative with a provided source, add this position.
            elif v.relative:
                coords.append(v.center_abs(self.sources[source]))

            # If absolute, add.
            else:
                coords.append(v.center)

        return np.array(coords).T

    @property
    def frequencies(self):
        """Frequency dict containing all frequencies."""
        return self._frequencies

    # STANDARD DEVIATION and NOISE
    @property
    def standard_deviation(self):
        r"""Return standard deviation of the data.

        The standard deviation can be set by providing an array of the same
        dimension as the data itself::

            survey.standard_deviation = ndarray  # (nsrc, nrec, nfreq)

        Alternatively, one can set the ``noise_floor``
        :math:`\epsilon_\text{n}` and the ``relative_error``
        :math:`\epsilon_\text{r}`::

            survey.noise_floor = {float, ndarray}     # (> 0 or None)
            survey.relative_error = {float, ndarray}  # (> 0 or None)

        They must be either floats, or three-dimensional arrays of shape
        ``({1;nsrc}, {1;nrec}, {1;nfreq})``; dimensions of one will be
        broadcasted to the corresponding size. E.g., for a dataset of
        arbitrary amount of sources and receivers with three frequencies you
        can define a purely frequency-dependent relative error via::

            relative_error = np.array([err_f1, err_f2, err_f3])[None, None, :]

        The standard deviation :math:`\varsigma_i` of observation
        :math:`d_i` is then given in terms of the noise floor
        :math:`\epsilon_{\text{n};i}` and the relative error
        :math:`\epsilon_{\text{r};i}` by

        .. math::
            :label: std

            \varsigma_i = \sqrt{\epsilon_{\text{n}; i}^2 +
                          \left(\epsilon_{\text{r}; i}|d_i|\right)^2 } \, .

        Note that a set standard deviation is prioritized over potentially
        also defined noise floor and relative error. To use the noise floor
        and the relative error after defining standard deviation directly you
        would have to reset it like

        .. code-block:: python

            survey.standard_deviation = None

        after which Equation :eq:`std` would be used again.

        """

        # If `standard_deviation` was set, return it.
        if 'standard_deviation' in self._data.keys():
            return self.data['standard_deviation']

        # Compute it if not already set.
        elif self.noise_floor is not None or self.relative_error is not None:

            # Initiate std (xarray of same type as the observed data)
            std = self.data.observed.copy(data=np.zeros(self.shape))

            # Add noise floor if given.
            if self.noise_floor is not None:
                std += self.noise_floor**2

            # Add relative error if given.
            if self.relative_error is not None:
                std += np.abs(self.relative_error*self.data.observed)**2

            return np.sqrt(std)

        # If nothing is defined, return None
        else:
            return None

    @standard_deviation.setter
    def standard_deviation(self, standard_deviation):
        """Update standard deviation."""
        # Update standard_deviation.
        if standard_deviation is not None:

            # Ensure all values are bigger than zero.
            if np.any(standard_deviation <= 0.0):
                raise ValueError(
                    "All values of `standard_deviation` must be bigger "
                    f"than zero. Provided: {standard_deviation}."
                )
            self._data['standard_deviation'] = self.data.observed.copy(
                    data=standard_deviation)

        # If None: assure no standard_deviation in data.
        elif 'standard_deviation' in self.data:
            del self._data['standard_deviation']

    @property
    def noise_floor(self):
        r"""Return noise floor of the data.

        See :attr:`emg3d.surveys.Survey.standard_deviation` for more info.

        """
        # A str-flag in the attrs means the values live in a data array.
        if isinstance(self.data.noise_floor, str):
            return self.data._noise_floor.data
        else:
            return self.data.noise_floor

    @noise_floor.setter
    def noise_floor(self, noise_floor):
        """Update noise floor."""
        self._set_nf_re('noise_floor', noise_floor)

    @property
    def relative_error(self):
        r"""Return relative error of the data.

        See :attr:`emg3d.surveys.Survey.standard_deviation` for more info.

        """
        # A str-flag in the attrs means the values live in a data array.
        if isinstance(self.data.relative_error, str):
            return self.data._relative_error.data
        else:
            return self.data.relative_error

    @relative_error.setter
    def relative_error(self, relative_error):
        """Update relative error."""
        self._set_nf_re('relative_error', relative_error)

    def add_noise(self, min_offset=0.0, min_amplitude='half_nf',
                  add_to='observed', **kwargs):
        """Add random noise to the data defined by ``add_to``.

        The noise is generated with :func:`emg3d.surveys.random_noise`,
        consult that function to see how it is actually generated (``kwargs``
        are passed through).

        This function takes further care of removing data which is too close
        to the source (``min_offset``) or has a too low signal
        (``min_amplitude``).


        Parameters
        ----------
        min_offset : float, default: 0.0
            Data points where the offset < min_offset are set to NaN in the
            ``add_to`` dataset.

        min_amplitude : {float, str, None}, default: 'half_nf'
            Data points where abs(data.observed) < min_amplitude are set to
            NaN in the ``add_to`` dataset. If ``'half_nf'``, the
            ``noise_floor`` divided by two is used as ``min_amplitude``. Set
            to ``None`` to include all data.

        add_to : str, default: 'observed'
            Data to which to add the noise. By default it is added to the
            observed data. If a name is provided that is not an existing
            dataset it will create a dataset of zeroes. You can use that to
            obtain the pure noise.

        max_offset : float, default: np.inf
            Data points where the offset > max_offset are set to NaN in the
            ``add_to`` dataset.

        """

        # If a new data set is defined as output, initiate it.
        if add_to not in self.data.keys():
            self.data[add_to] = self.data.observed.copy(
                    data=np.zeros(self.shape, dtype=complex))

        # Set data below minimum amplitude to NaN.
        # The str-check avoids an element-wise comparison if an array is given.
        if isinstance(min_amplitude, str) and min_amplitude == 'half_nf':
            min_amplitude = self.noise_floor
            if min_amplitude is not None:
                # Not in-place: `noise_floor` can be the stored ndarray, and
                # `/=` would halve the noise floor of the survey itself.
                min_amplitude = min_amplitude / 2.0
        if min_amplitude is not None:
            cut_amp = abs(self.data.observed.data) < min_amplitude
            self.data[add_to].data[cut_amp] = np.nan + 1j*np.nan

        # Set offsets below min_offset and above max_offset to NaN.
        # (np.inf instead of the alias np.infty, removed in NumPy 2.0.)
        max_offset = kwargs.pop('max_offset', np.inf)
        if min_offset > 0.0 or max_offset < np.inf:
            for ks, s in self.sources.items():
                for kr, r in self.receivers.items():
                    off = np.linalg.norm(r.center_abs(s) - s.center)
                    if off < min_offset or off > max_offset:
                        self.data[add_to].loc[ks, kr, :] = np.nan + 1j*np.nan

        # Add noise if noise_floor and/or relative_error given.
        if self.standard_deviation is not None:
            noise = random_noise(self.standard_deviation.data, **kwargs)
            self.data[add_to].data += noise

    def _set_nf_re(self, name, value):
        """Update noise_floor or relative_error.

        Single values are stored as float in the attributes of the dataset.
        Arrays are broadcasted to the data shape and stored as data array
        ``'_'+name``; the attribute then holds a str-flag pointing to it.
        """
        if value is not None and not isinstance(value, str):

            # Cast
            value = np.asarray(value)

            # Ensure all values are bigger than zero.
            if np.any(value <= 0.0):
                raise ValueError(
                    f"All values of `{name}` must be bigger than zero. "
                    f"Provided: {value}."
                )

            # If one value it is stored as attribute.
            if value.size == 1:
                # `.item()` works for any ndim; float() on an array with
                # ndim > 0 is deprecated in recent NumPy versions.
                value = float(value.item())

            # If more than one value it is stored as data array;
            # broadcasting it if necessary.
            else:
                self.data['_'+name] = self.data.observed.copy(
                        data=np.ones(self.shape)*value)
                value = 'data._'+name  # str-flag on attrs.

        self._data.attrs[name] = value

    @property
    def _irec_types(self):
        """Return receiver types if electric (True) or magnetic (False)."""

        # Create tuples if they do not exist yet.
        if getattr(self, '_ierec', None) is None:

            rec_types = tuple(
                    [r.xtype == 'electric' for r in self.receivers.values()])

            # Store independently for electric and magnetic.
            self._ierec = np.nonzero(rec_types)[0]
            self._imrec = np.nonzero(np.logical_not(rec_types))[0]

        return self._ierec, self._imrec

    def _rec_types_coord(self, source):
        """Return absolute receiver coordinates as function of source."""

        # Create dict to store them.
        if getattr(self, '_rec_coord', None) is None:
            self._rec_coord = {}

        # Absolute coordinates are stored in a source-dict.
        if source not in self._rec_coord.keys():
            self._rec_coord[source] = np.array(
                [r.coordinates_abs(self.sources[source])
                 for r in self.receivers.values()]
            )

        # Return per receiver type (erec, mrec).
        indices = self._irec_types
        return [tuple(self._rec_coord[source][ind].T) for ind in indices]
def random_noise(standard_deviation, mean_noise=0.0, ntype='white_noise'):
    r"""Return random noise for given inputs.

    Creates random noise for frequency-domain CSEM data. Independent of the
    chosen method, the noise is built as

    .. math::

        d^\text{noise} = \varsigma \left[(1 + \text{i})\,u +
                         \mathcal{R} \right] \, ,

    where :math:`\varsigma` is the standard deviation (see
    :attr:`emg3d.surveys.Survey.standard_deviation`), :math:`u` is the mean
    value of the randomly distributed noise, and :math:`\mathcal{R}` are the
    random realizations of the noise.

    Three methods (``ntype``) are implemented.

    1. ``white_noise``

       Random uniform phases with constant amplitudes. This is the default,
       and corresponds to white noise in the time-domain: a flat amplitude
       spectrum for all frequencies, with random phases:

       .. math::

           \mathcal{R}_\text{wn} = \exp[\text{i}\,\mathcal{U}(0, 2\pi)] \, ,

       where :math:`\mathcal{U}(0, 2\pi)` is the uniform distribution and its
       range.

    2. Random Gaussian noise. In the following, :math:`\mathcal{N}(0, 1)` is
       the standard normal distribution of zero mean and unit standard
       deviation.

       a. ``gaussian_correlated``

          Same realization added to real and imaginary part.

          .. math::

              \mathcal{R}_\text{gc} = (1+\text{i})\,\mathcal{N}(0, 1) \, .

       b. ``gaussian_uncorrelated``

          Independent realizations added to real and imaginary part.

          .. math::

              \mathcal{R}_\text{gu} =
              \mathcal{N}(0, 1) + \text{i}\,\mathcal{N}(0, 1) \, .

    There are, of course, other possibilities. One could, e.g., make the
    non-zero mean itself random. See the example `random_noise_f_domain.html
    <https://empymod.emsig.xyz/en/latest/gallery/educational/random_noise_f_domain.html>`_
    for more details about random noise in the frequency domain.


    Parameters
    ----------
    standard_deviation : ndarray
        Standard deviations of the data.

    mean_noise : float, default: 0.0
        Mean value of the random noise (as fraction of standard_deviation).

    ntype : str, default: white_noise
        What type of noise. Options:

        - ``'white_noise'``: random uniform phases with constant amplitude.
        - ``'gaussian_correlated'``: Same Gaussian random realizations added
          to Real and Imaginary part.
        - ``'gaussian_uncorrelated'``: Independent Gaussian random
          realizations added to Real and Imaginary part.

        Any other value falls back to ``'white_noise'``.


    Returns
    -------
    noise : ndarray
        Noise, a complex-valued ndarray of the same shape as
        standard_deviation.

    """
    # A new, unseeded generator is created on every call.
    generator = np.random.default_rng()
    dims = standard_deviation.shape

    if ntype == 'gaussian_uncorrelated':
        # Two independent draws: first the real, then the imaginary part.
        real_part = generator.standard_normal(dims)
        imag_part = generator.standard_normal(dims)
        realization = real_part + 1j*imag_part

    elif ntype == 'gaussian_correlated':
        # One draw, shared by the real and the imaginary part.
        realization = (1+1j)*generator.standard_normal(dims)

    else:
        # White noise: unit amplitude with uniformly distributed phase.
        phases = generator.uniform(0, 2*np.pi, dims)
        realization = np.exp(1j*phases)

    # Shift by the mean and scale by the standard deviation.
    shift = (1+1j)*mean_noise
    return standard_deviation * (shift + realization)
def txrx_coordinates_to_dict(TxRx, coordinates, **kwargs):
    """Create dict of TxRx instances with provided coordinates.

    Source and receiver dictionaries to input into a
    :class:`emg3d.surveys.Survey` can be created in many ways. This is a
    helper function to create a dict from a tuple of coordinates.


    Parameters
    ----------
    TxRx : {Tx*, Rx*}
        Any of the available sources or receivers, e.g.,
        :class:`emg3d.electrodes.TxElectricDipole`.

    coordinates : tuple
        Tuple containing the input coordinates for the defined TxRx class.
        Each element of the tuple must either have length ``1`` or ``n``;
        elements of length ``1`` (scalars or one-element sequences) are
        repeated ``n`` times.

    **kwargs :
        Other parameters passed through to TxRx; again, each must be of size
        ``1`` or ``n``.


    Returns
    -------
    out : dict
        Dict where the keys consist of a TxRx-prefix followed by a number, and
        the values contain the corresponding TxRx instances.


    Raises
    ------
    ValueError
        If an element of ``coordinates`` has a size that is neither ``1`` nor
        ``n``.


    Examples
    --------
    .. ipython::

       In [1]: import emg3d
          ...: import numpy as np

       In [2]: # Create 5 electric dipole sources from x=2000:2000:10,000, of
          ...: # strength 100 A.
          ...: offsets = np.arange(1, 6)*2000
          ...: sources = emg3d.surveys.txrx_coordinates_to_dict(
          ...:                 emg3d.TxElectricDipole,
          ...:                 (offsets, 0, 0, 0, 0), strength=100)
          ...: sources  # QC the source dict

    """

    # Get max dimension.
    nd = max(np.array(n, ndmin=1).size for n in coordinates)

    # Expand coordinates. Flattening first lets one-element sequences such
    # as ``[0]`` be repeated like scalars, instead of yielding a ragged list.
    coo = np.array(
        [np.broadcast_to(np.ravel(val), (nd,)) for val in coordinates],
        dtype=np.float64,
    )

    # Expand kwargs: one dict of keyword arguments per instance.
    inp = [
        {k: v if np.array(v).size == 1 else v[i] for k, v in kwargs.items()}
        for i in range(nd)
    ]

    # Return TxRx-dict.
    return txrx_lists_to_dict([TxRx(coo[:, i], **inp[i]) for i in range(nd)])
def txrx_lists_to_dict(txrx):
    """Create dict from provided list of Tx/Rx instances.

    Helper to build the source or receiver dictionary for a
    :class:`emg3d.surveys.Survey` from a single source or receiver instance,
    from a list of instances, or from a list which itself contains lists,
    dicts, and instances.


    Parameters
    ----------
    txrx : {Tx*, Rx*, list, dict}
        Any of the available sources or receivers, e.g.,
        :class:`emg3d.electrodes.TxElectricDipole`, or a list or dict of
        Tx*/Rx* instances. If it is a dict, it is returned unaltered.

        It can also be a list containing a combination of the above (lists,
        dicts, and instances).


    Returns
    -------
    out : dict
        Dict where the keys consist of a TxRx-specific prefix followed by a
        number, and the values contain the corresponding TxRx instances.


    Examples
    --------
    .. ipython::

       In [1]: import emg3d
          ...: import numpy as np

       In [2]: # Create two electric, fixed receivers.
          ...: electric = [emg3d.RxElectricPoint((x, 0, 0, 0, 0))
          ...:             for x in [1000, 1100]]

       In [3]: # Create three magnetic, fixed receivers.
          ...: magnetic = emg3d.surveys.txrx_coordinates_to_dict(
          ...:                 emg3d.RxMagneticPoint,
          ...:                 ([950, 1050, 1150], 0, 0, 0, 90))

       In [4]: # Create a streamer receiver, flying 5 m behind the source.
          ...: streamer = emg3d.RxElectricPoint((5, 0, 0, 0, 0), relative=True)

       In [5]: # Collect all receivers.
          ...: receivers = emg3d.surveys.txrx_lists_to_dict(
          ...:                 [[streamer, ], electric, magnetic])
          ...: receivers  # QC our collected receivers

    """

    # Dicts are handed back as they are, including their keys.
    if isinstance(txrx, dict):
        return txrx

    if hasattr(txrx, '_prefix'):
        # A single Tx*/Rx* instance.
        instances = [txrx, ]

    elif any(isinstance(item, (list, tuple, dict)) for item in txrx):
        # Mixed content: flatten one level into a plain list of instances.
        instances = []
        for item in txrx:
            if hasattr(item, '_prefix'):
                instances.append(item)
            elif isinstance(item, dict):
                instances.extend(item.values())
            else:
                instances.extend(item)

    else:
        # It has to be a flat list/tuple of Tx/Rx instances.
        instances = txrx

    # Zero-pad the running number to the width of the total count.
    width = len(str(len(instances)))
    return {
        f"{inst._prefix}-{number:0{width}d}": inst
        for number, inst in enumerate(instances, start=1)
    }
def frequencies_to_dict(frequencies):
    """Create dict from provided frequencies.


    Parameters
    ----------
    frequencies : {array_like, dict}
        Source frequencies (Hz).

        - array_like: Frequencies will be stored in a dict with keys assigned
          starting with ``'f-1'``, ``'f-2'``, and so on; the number is
          zero-padded to the width of the total count.

        - dict: returned unaltered.


    Returns
    -------
    out : dict
        Frequency dict.


    Raises
    ------
    ValueError
        If the provided array_like contains non-unique frequencies.

    """
    # Nothing to do for dicts.
    if isinstance(frequencies, dict):
        return frequencies

    # Cast to a float array of at least one dimension.
    values = np.array(frequencies, dtype=np.float64, ndmin=1)

    # Ensure frequencies are unique.
    if np.unique(values).size != values.size:
        raise ValueError(f"Contains non-unique frequencies: {values}.")

    # Store in dict, numbering from one.
    width = len(str(values.size))
    return {
        f"f-{number:0{width}d}": value
        for number, value in enumerate(values, start=1)
    }