"""
Interpolation routines mapping grids to grids, grids to fields, and fields to
grids.
"""
# Copyright 2018-2020 The emg3d Developers.
#
# This file is part of emg3d.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import numba as nb
import numpy as np
from scipy import interpolate, ndimage
__all__ = ['grid2grid', 'interp3d', 'MapConductivity', 'MapLgConductivity',
'MapLnConductivity', 'MapResistivity', 'MapLgResistivity',
'MapLnResistivity', 'edges2cellaverages']
# Numba-settings
_numba_setting = {'nogil': True, 'fastmath': True, 'cache': True}
# INTERPOLATIONS
[docs]def grid2grid(grid, values, new_grid, method='linear', extrapolate=True,
log=False):
"""Interpolate `values` located on `grid` to `new_grid`.
**Note 1:**
The default method is 'linear', because it works with fields and model
parameters. However, recommended are 'volume' for model parameters and
'cubic' for fields.
**Note 2:**
For model parameters with `method='volume'` the result is quite different
if you provide resistivity, conductivity, or the logarithm of any of the
two. The recommended way is to provide the logarithm of resistivity or
conductivity, in which case the output of one is indeed the inverse of the
output of the other.
Parameters
----------
grid, new_grid : TensorMesh
Input and output model grids;
:class:`TensorMesh` instances.
values : ndarray
Model parameters; :class:`emg3d.fields.Field` instance, or a particular
field (e.g. field.fx). For fields the method cannot be 'volume'.
method : {<'linear'>, 'volume', 'cubic'}, optional
The method of interpolation to perform. The volume averaging method
ensures that the total sum of the property stays constant.
Volume averaging is only implemented for model parameters, not for
fields. The method 'cubic' requires at least three points in any
direction, otherwise it will fall back to 'linear'.
Default is 'linear', because it works with fields and model parameters.
However, recommended are 'volume' for model parameters and 'cubic' for
fields.
extrapolate : bool
If True, points on `new_grid` which are outside of `grid` are filled by
the nearest value (if ``method='cubic'``) or by extrapolation (if
``method='linear'``). If False, points outside are set to zero.
For ``method='volume'`` it always uses the nearest value for points
outside of `grid`.
Default is True.
log : bool
If True, the interpolation is carried out on a log10-scale; hence the
same as ``10**grid2grid(grid, np.log10(values), ...)``.
Default is False.
Returns
-------
new_values : ndarray
Values corresponding to `new_grid`.
See Also
--------
get_receiver : Interpolation of model parameters or fields to (x, y, z).
"""
# If values is a Field instance, call it recursively for each field.
if hasattr(values, 'field') and values.field.ndim == 1:
fx = grid2grid(grid, np.asarray(values.fx), new_grid, method,
extrapolate, log)
fy = grid2grid(grid, np.asarray(values.fy), new_grid, method,
extrapolate, log)
fz = grid2grid(grid, np.asarray(values.fz), new_grid, method,
extrapolate, log)
# Return a field instance.
return values.__class__(fx, fy, fz)
# If values is a particular field, ensure method is not 'volume'.
if not np.all(grid.vnC == values.shape) and method == 'volume':
raise ValueError("``method='volume'`` not implemented for fields.")
if method == 'volume':
points = (grid.vectorNx, grid.vectorNy, grid.vectorNz)
new_points = (new_grid.vectorNx, new_grid.vectorNy, new_grid.vectorNz)
new_values = np.zeros(new_grid.vnC, dtype=values.dtype)
vol = new_grid.vol.reshape(new_grid.vnC, order='F')
# Get values from `volume_average`.
if log:
volume_average(*points, np.log10(values), *new_points,
new_values, vol)
else:
volume_average(*points, values, *new_points, new_values, vol)
else:
# Get the vectors corresponding to input data.
points = tuple()
new_points = tuple()
shape = tuple()
for i, coord in enumerate(['x', 'y', 'z']):
if values.shape[i] == getattr(grid, 'nN'+coord):
pts = getattr(grid, 'vectorN'+coord)
new_pts = getattr(new_grid, 'vectorN'+coord)
else:
pts = getattr(grid, 'vectorCC'+coord)
new_pts = getattr(new_grid, 'vectorCC'+coord)
# Add to points.
points += (pts, )
new_points += (new_pts, )
shape += (len(new_pts), )
# Format the output points.
xx, yy, zz = np.broadcast_arrays(
new_points[0][:, None, None],
new_points[1][:, None],
new_points[2])
new_points = np.r_[xx.ravel('F'), yy.ravel('F'), zz.ravel('F')]
new_points = new_points.reshape(-1, 3, order='F')
# Get values from `interp3d`.
if extrapolate:
fill_value = None
mode = 'nearest'
else:
fill_value = 0.0
mode = 'constant'
if log:
new_values = interp3d(points, np.log10(values), new_points,
method, fill_value, mode)
else:
new_values = interp3d(points, values, new_points, method,
fill_value, mode)
new_values = new_values.reshape(shape, order='F')
if log:
return 10**new_values
else:
return new_values
[docs]def interp3d(points, values, new_points, method, fill_value, mode):
"""Interpolate values in 3D either linearly or with a cubic spline.
Return `values` corresponding to a regular 3D grid defined by `points` on
`new_points`.
This is a modified version of :func:`scipy.interpolate.interpn`, using
:class:`scipy.interpolate.RegularGridInterpolator` if ``method='linear'``
and a custom-wrapped version of :func:`scipy.ndimage.map_coordinates` if
``method='cubic'``. If speed is important then choose 'linear', as it can
be significantly faster.
Parameters
----------
points : tuple of ndarray of float, with shapes ((nx, ), (ny, ) (nz, ))
The points defining the regular grid in three dimensions.
values : array_like, shape (nx, ny, nz)
The data on the regular grid in three dimensions.
new_points : tuple (rec_x, rec_y, rec_z)
Coordinates (x, y, z) of new points.
method : {'cubic', 'linear'}, optional
The method of interpolation to perform, 'linear' or 'cubic'. Default is
'cubic' (forced to 'linear' if there are less than 3 points in any
direction).
fill_value : float or None
Passed to :class:`scipy.interpolate.RegularGridInterpolator` if
``method='linear'``: The value to use for points outside of the
interpolation domain. If None, values outside the domain are
extrapolated.
mode : {'constant', 'nearest', 'mirror', 'reflect', 'wrap'}
Passed to :func:`scipy.ndimage.map_coordinates` if ``method='cubic'``:
Determines how the input array is extended beyond its boundaries.
Returns
-------
new_values : ndarray
Values corresponding to `new_points`.
"""
# We need at least 3 points in each direction for cubic spline. This should
# never be an issue for a realistic 3D model.
for pts in points:
if len(pts) < 4:
method = 'linear'
# Interpolation.
if method == "linear":
ifn = interpolate.RegularGridInterpolator(
points=points, values=values, method="linear",
bounds_error=False, fill_value=fill_value)
new_values = ifn(xi=new_points)
else:
# Replicate the same expansion of xi as used in
# RegularGridInterpolator, so the input xi can be quite flexible.
xi = interpolate.interpnd._ndim_coords_from_arrays(new_points, ndim=3)
xi_shape = xi.shape
xi = xi.reshape(-1, 3)
# map_coordinates uses the indices of the input data (values in this
# case) as coordinates. We have therefore to transform our desired
# output coordinates to this artificial coordinate system too.
coords = np.empty(xi.T.shape)
for i in range(3):
coords[i] = interpolate.interp1d(
points[i], np.arange(len(points[i])), kind='cubic',
bounds_error=False, fill_value='extrapolate',)(xi[:, i])
# map_coordinates only works for real data; split it up if complex.
params3d = {'order': 3, 'mode': mode, 'cval': 0.0}
if 'complex' in values.dtype.name:
real = ndimage.map_coordinates(values.real, coords, **params3d)
imag = ndimage.map_coordinates(values.imag, coords, **params3d)
result = real + 1j*imag
else:
result = ndimage.map_coordinates(values, coords, **params3d)
new_values = result.reshape(xi_shape[:-1])
return new_values
# MAPS
MAPLIST = {}
def register_map(func):
MAPLIST[func.__name__] = func
return func
class _Map:
"""Maps variable `x` to computational variable `σ` (conductivity)."""
def __init__(self, description):
"""Initiate the map."""
self.name = self.__class__.__name__[3:]
self.description = description
def __repr__(self):
return (f"{self.__class__.__name__}: {self.description}\n"
" Maps investigation variable `x` to\n"
" computational variable `σ` (conductivity).")
def forward(self, conductivity):
"""Conductivity to mapping."""
raise NotImplementedError("Forward map not implemented.")
def backward(self, mapped):
"""Mapping to conductivity."""
raise NotImplementedError("Backward map not implemented.")
def derivative_chain(self, gradient, mapped):
"""Chain rule to map gradient from conductivity to mapping space."""
raise NotImplementedError("Derivative chain not implemented.")
def to_dict(self):
"""Store the map name in a dict for serialization."""
return {'name': self.name, '__class__': '_Map'}
@classmethod
def from_dict(cls, inp):
"""Get :class:`_Map` instance from name in dict."""
return MAPLIST['Map'+inp['name']]()
[docs]@register_map
class MapConductivity(_Map):
"""Maps `σ` to computational variable `σ` (conductivity).
- forward: x = σ
- backward: σ = x
"""
def __init__(self):
super().__init__('conductivity')
[docs] def forward(self, conductivity):
return conductivity
[docs] def backward(self, mapped):
return mapped
[docs] def derivative_chain(self, gradient, mapped):
pass
[docs]@register_map
class MapLgConductivity(_Map):
"""Maps `log_10(σ)` to computational variable `σ` (conductivity).
- forward: x = log_10(σ)
- backward: σ = 10^x
"""
def __init__(self):
super().__init__('log_10(conductivity)')
[docs] def forward(self, conductivity):
return np.log10(conductivity)
[docs] def backward(self, mapped):
return 10**mapped
[docs] def derivative_chain(self, gradient, mapped):
gradient *= self.backward(mapped)*np.log(10)
[docs]@register_map
class MapLnConductivity(_Map):
"""Maps `log_e(σ)` to computational variable `σ` (conductivity).
- forward: x = log_e(σ)
- backward: σ = exp(x)
"""
def __init__(self):
super().__init__('log_e(conductivity)')
[docs] def forward(self, conductivity):
return np.log(conductivity)
[docs] def backward(self, mapped):
return np.exp(mapped)
[docs] def derivative_chain(self, gradient, mapped):
gradient *= self.backward(mapped)
[docs]@register_map
class MapResistivity(_Map):
"""Maps `ρ` to computational variable `σ` (conductivity).
- forward: x = ρ = σ^-1
- backward: σ = ρ^-1 = x^-1
"""
def __init__(self):
super().__init__('resistivity')
[docs] def forward(self, conductivity):
return 1.0/conductivity
[docs] def backward(self, mapped):
return 1.0/mapped
[docs] def derivative_chain(self, gradient, mapped):
gradient *= -self.backward(mapped)**2
[docs]@register_map
class MapLgResistivity(_Map):
"""Maps `log_10(ρ)` to computational variable `σ` (conductivity).
- forward: x = log_10(ρ) = log_10(σ^-1)
- backward: σ = ρ^-1 = 10^-x
"""
def __init__(self):
super().__init__('log_10(resistivity)')
[docs] def forward(self, conductivity):
return np.log10(1.0/conductivity)
[docs] def backward(self, mapped):
return 10**-mapped
[docs] def derivative_chain(self, gradient, mapped):
gradient *= -self.backward(mapped)*np.log(10)
[docs]@register_map
class MapLnResistivity(_Map):
"""Maps `log_e(ρ)` to computational variable `σ` (conductivity).
- forward: x = log_e(ρ) = log_e(σ^-1)
- backward: σ = ρ^-1 = exp(-x)
"""
def __init__(self):
super().__init__('log_e(resistivity)')
[docs] def forward(self, conductivity):
return np.log(1.0/conductivity)
[docs] def backward(self, mapped):
return np.exp(-mapped)
[docs] def derivative_chain(self, gradient, mapped):
gradient *= -self.backward(mapped)
# VOLUME AVERAGING
@nb.njit(**_numba_setting)
def volume_average(edges_x, edges_y, edges_z, values,
new_edges_x, new_edges_y, new_edges_z, new_values, new_vol):
"""Interpolation using the volume averaging technique.
The result is added to new_values.
The original implementation (see ``emg3d v0.7.1``) followed [PlDM07]_. Joe
Capriot took that algorithm and made it much faster for implementation in
`discretize`. The current implementation is a simplified Numba-version of
his Cython version (the `discretize` version works for 1D, 2D, and 3D
meshes and can also return a sparse matrix representing the operation).
Parameters
----------
edges_[x, y, z] : ndarray
The edges in x-, y-, and z-directions for the original grid.
values : ndarray
Values corresponding to `grid`.
new_edges_[x, y, z] : ndarray
The edges in x-, y-, and z-directions for the new grid.
new_values : ndarray
Array where values corresponding to `new_grid` will be added.
new_vol : ndarray
The volumes of the `new_grid`-cells.
"""
# Get the weights and indices for each direction.
wx, ix_in, ix_out = _volume_average_weights(edges_x, new_edges_x)
wy, iy_in, iy_out = _volume_average_weights(edges_y, new_edges_y)
wz, iz_in, iz_out = _volume_average_weights(edges_z, new_edges_z)
# Loop over the elements and sum up the contributions.
for iz, w_z in enumerate(wz):
izi = iz_in[iz]
izo = iz_out[iz]
for iy, w_y in enumerate(wy):
iyi = iy_in[iy]
iyo = iy_out[iy]
w_zy = w_z*w_y
for ix, w_x in enumerate(wx):
ixi = ix_in[ix]
ixo = ix_out[ix]
new_values[ixo, iyo, izo] += w_zy*w_x*values[ixi, iyi, izi]
# Normalize by new volume.
new_values /= new_vol
@nb.njit(**_numba_setting)
def _volume_average_weights(x1, x2):
"""Returaveragethe weights for the volume averaging technique.
Parameters
----------
x1, x2 : ndarray
The edges in x-, y-, or z-directions for the original (x1) and the new
(x2) grids.
Returns
-------
hs : ndarray
Weights for the mapping of x1 to x2.
ix1, ix2 : ndarray
Indices to map x1 to x2.
"""
# Fill xs with uniques and truncate.
# Corresponds to np.unique(np.concatenate([x1, x2])).
n1, n2 = len(x1), len(x2)
xs = np.empty(n1 + n2) # Pre-allocate array containing all edges.
i1, i2, i = 0, 0, 0
while i1 < n1 or i2 < n2:
if i1 < n1 and i2 < n2:
if x1[i1] < x2[i2]:
xs[i] = x1[i1]
i1 += 1
elif x1[i1] > x2[i2]:
xs[i] = x2[i2]
i2 += 1
else:
xs[i] = x1[i1]
i1 += 1
i2 += 1
elif i1 < n1 and i2 == n2:
xs[i] = x1[i1]
i1 += 1
elif i2 < n2 and i1 == n1:
xs[i] = x2[i2]
i2 += 1
i += 1
# Get weights and indices for the two arrays.
# - hs corresponds to np.diff(xs) where x1 and x2 overlap; zero outside.
# - x1[ix1] can be mapped to x2[ix2] with the corresponding weight.
nh = i-1
hs = np.empty(nh) # Pre-allocate weights.
ix1 = np.zeros(nh, dtype=np.int32) # Pre-allocate indices for x1.
ix2 = np.zeros(nh, dtype=np.int32) # Pre-allocate indices for x2.
center = 0.0
i1, i2, i, ii = 0, 0, 0, 0
for i in range(nh):
center = 0.5*(xs[i]+xs[i+1])
if x2[0] <= center and center <= x2[n2-1]:
hs[ii] = xs[i+1]-xs[i]
while i1 < n1-1 and center >= x1[i1]:
i1 += 1
while i2 < n2-1 and center >= x2[i2]:
i2 += 1
ix1[ii] = min(max(i1-1, 0), n1-1)
ix2[ii] = min(max(i2-1, 0), n2-1)
ii += 1
return hs[:ii], ix1[:ii], ix2[:ii]
# EDGES => CENTERS
[docs]@nb.njit(**_numba_setting)
def edges2cellaverages(ex, ey, ez, vol, out_x, out_y, out_z):
r"""Interpolate fields defined on edges to volume-averaged cell values.
Parameters
----------
ex, ey, ez : ndarray
Electric fields in x-, y-, and z-directions, as obtained from
:class:`emg3d.fields.Field`.
vol : ndarray
Volumes of the grid, as obtained from :class:`emg3d.meshes.TensorMesh`.
out_x, out_y, out_z : ndarray
Arrays where the results are placed (per direction).
"""
# Get dimensions
nx, ny, nz = vol.shape
# Loop over dimensions.
for iz in range(nz+1):
izm = max(0, iz-1)
izp = min(nz-1, iz)
for iy in range(ny+1):
iym = max(0, iy-1)
iyp = min(ny-1, iy)
for ix in range(nx+1):
ixm = max(0, ix-1)
ixp = min(nx-1, ix)
# Multiply field by volume/4.
if ix < nx:
out_x[ix, iym, izm] += vol[ix, iym, izm]*ex[ix, iy, iz]/4
out_x[ix, iyp, izm] += vol[ix, iyp, izm]*ex[ix, iy, iz]/4
out_x[ix, iym, izp] += vol[ix, iym, izp]*ex[ix, iy, iz]/4
out_x[ix, iyp, izp] += vol[ix, iyp, izp]*ex[ix, iy, iz]/4
if iy < ny:
out_y[ixm, iy, izm] += vol[ixm, iy, izm]*ey[ix, iy, iz]/4
out_y[ixp, iy, izm] += vol[ixp, iy, izm]*ey[ix, iy, iz]/4
out_y[ixm, iy, izp] += vol[ixm, iy, izp]*ey[ix, iy, iz]/4
out_y[ixp, iy, izp] += vol[ixp, iy, izp]*ey[ix, iy, iz]/4
if iz < nz:
out_z[ixm, iym, iz] += vol[ixm, iym, iz]*ez[ix, iy, iz]/4
out_z[ixp, iym, iz] += vol[ixp, iym, iz]*ez[ix, iy, iz]/4
out_z[ixm, iyp, iz] += vol[ixm, iyp, iz]*ez[ix, iy, iz]/4
out_z[ixp, iyp, iz] += vol[ixp, iyp, iz]*ez[ix, iy, iz]/4