Source code for shenfun.utilities

"""
Module for implementing helper functions.
"""
from numbers import Number
try:
    from collections.abc import MutableMapping
except ImportError:
    from collections import MutableMapping
from collections import defaultdict
import numpy as np
import sympy as sp
from scipy.fftpack import dct
from scipy.integrate import quad
from shenfun.optimization import runtimeoptimizer, cython
from shenfun.config import config
from .findbasis import get_bc_basis, get_stencil_matrix, n

__all__ = ['dx', 'clenshaw_curtis1D', 'CachedArrayDict', 'surf3D',
           'wrap_periodic', 'outer', 'dot', 'apply_mask', 'integrate_sympy',
           'mayavi_show', 'quiver3D', 'get_bc_basis', 'get_stencil_matrix',
           'scalar_product', 'n', 'cross', 'reset_profile', 'Lambda']

Lambda = getattr(cython, 'Lambda', None)

[docs] def dx(u, weighted=False): r"""Compute integral of u over domain .. math:: \int_{\Omega} u dx Parameters ---------- u : Array The Array to integrate Note ---- This function assumes a standard reference domain. If the domain is not standard, then modify the result accordingly. """ T = u.function_space() dim = len(u.shape) if dim == 1: w = T.points_and_weights(weighted=weighted)[1] return np.sum(u*w).item() uc = u.copy() for ax in range(dim): uc = uc.redistribute(axis=ax) w = T.bases[ax].points_and_weights(weighted=weighted)[1] sl = [np.newaxis]*len(uc.shape) sl[ax] = slice(None) uu = np.sum(uc*w[tuple(sl)], axis=ax) sl = [slice(None)]*len(uc.shape) sl[ax] = np.newaxis uc[:] = uu[tuple(sl)] return uc.flat[0]
[docs] def clenshaw_curtis1D(u, quad="GC"): # pragma: no cover """Clenshaw-Curtis integration in 1D""" assert u.ndim == 1 N = u.shape[0] if quad == 'GL': w = np.arange(0, N, 1, dtype=float) w[2:] = 2./(1-w[2:]**2) w[0] = 1 w[1::2] = 0 ak = dct(u, 1) ak /= (N-1) return np.sqrt(np.sum(ak*w)) assert quad == 'GC' d = np.zeros(N) k = 2*(1 + np.arange((N-1)//2)) d[::2] = (2./N)/np.hstack((1., 1.-k*k)) w = dct(d, type=3) return np.sqrt(np.sum(u*w))
[docs] class CachedArrayDict(MutableMapping): """Dictionary for caching Numpy arrays (work arrays) Example ------- >>> import numpy as np >>> from shenfun.utilities import CachedArrayDict >>> work = CachedArrayDict() >>> a = np.ones((3, 4), dtype=int) >>> w = work[(a, 0, True)] # create work array with shape as a >>> print(w.shape) (3, 4) >>> print(w) [[0 0 0 0] [0 0 0 0] [0 0 0 0]] >>> w2 = work[(a, 1, True)] # Get different(note 1!) array of same shape/dtype """ def __init__(self): self._data = {} def __getitem__(self, key): newkey, fill = self.__keytransform__(key) try: value = self._data[newkey] except KeyError: shape, dtype, _ = newkey value = np.empty(shape, dtype=np.dtype(dtype, align=True)) self._data[newkey] = value if fill: value.fill(0) return value @staticmethod def __keytransform__(key): assert len(key) == 3 return (key[0].shape, key[0].dtype, key[1]), key[2] def __len__(self): return len(self._data) def __setitem__(self, key, value): self._data[self.__keytransform__(key)[0]] = value def __delitem__(self, key): del self._data[self.__keytransform__(key)[0]] def __iter__(self): return iter(self._data)
[docs] def values(self): raise TypeError('Cached work arrays not iterable')
[docs] def reset_profile(prof): """Reset profiler for kernprof Parameters ---------- prof : The profiler """ prof.code_map = {} prof.last_time = {} prof.enable_count = 0 for func in prof.functions: prof.add_function(func)
[docs] def cross(c, a, b): """Cross product c = a x b Parameters ---------- c : Array a : Array b : Array Returns ------- c : Array """ if a.ndim == 3: cross2D(c, a, b) elif a.ndim == 4: cross3D(c, a, b) else: crossND(c, a, b) return c
@runtimeoptimizer def cross2D(c, a, b): c[:] = a[0]*b[1]-a[1]*b[0] @runtimeoptimizer def cross3D(c, a, b): c[0] = a[1]*b[2] - a[2]*b[1] c[1] = a[2]*b[0] - a[0]*b[2] c[2] = a[0]*b[1] - a[1]*b[0] @runtimeoptimizer def crossND(c, a, b): cross3D(c, a, b)
[docs] def outer(a, b, c): r"""Return outer product $c_{i,j} = a_i b_j$ Parameters ---------- a : Array of shape (N, ...) b : Array of shape (N, ...) c : Array of shape (N*N, ...) The outer product is taken over the first index of a and b, for all remaining indices. """ av = a.v bv = b.v cv = c.v symmetric = a is b if av.shape[0] == 2: outer2D(av, bv, cv, symmetric) elif av.shape[0] == 3: outer3D(av, bv, cv, symmetric) return c
@runtimeoptimizer def outer2D(a, b, c, symmetric): c[0] = a[0]*b[0] c[1] = a[0]*b[1] if symmetric: c[2] = c[1] else: c[2] = a[1]*b[0] c[3] = a[1]*b[1] @runtimeoptimizer def outer3D(a, b, c, symmetric): c[0] = a[0]*b[0] c[1] = a[0]*b[1] c[2] = a[0]*b[2] if symmetric: c[3] = c[1] c[6] = c[2] c[7] = c[5] else: c[3] = a[1]*b[0] c[6] = a[2]*b[0] c[7] = a[2]*b[1] c[4] = a[1]*b[1] c[5] = a[1]*b[2] c[8] = a[2]*b[2]
[docs] def dot(u, v, output_array=None, forward_output=True): """Return dot product of u and v Parameters ---------- u : Function or Array v : Function or Array output_array : Function or Array Must be of correct shape forward_output : bool, optional Return the result as a Function if True, and an Array if False. Note ---- This function uses the 3/2-rule for dealiasing all spaces if the input arrays are spectral Functions. The returned Function is projected to an orthogonal space, without boundary conditions. """ from shenfun import Function, Array assert isinstance(u, (Function, Array)) assert isinstance(v, (Function, Array)) Vu = u.function_space() Vv = v.function_space() assert Vu.dimensions == Vv.dimensions D = Vu.dimensions Vup = Vu Vvp = Vv # Get the correct output_array # Logic is to first follow the output_array if given, # use input_arrays if not uv = output_array if isinstance(output_array, Function): forward_output = True Top = output_array.function_space() uv = Array(Top) elif isinstance(output_array, Array): forward_output = False Top = output_array.function_space() uv = output_array else: output_rank = Vu.tensor_rank+Vv.tensor_rank-2 if output_rank == 0: To = Vu[0].get_orthogonal() elif output_rank == 1: if Vu.tensor_rank == 1: To = Vu.get_orthogonal() else: To = Vu[0].get_orthogonal() elif output_rank == 2: To = Vu.get_orthogonal() Top = To.get_dealiased() uv = Array(Top) uv.fill(0) if Top.is_padded: Vup = Vu.get_dealiased() Vvp = Vv.get_dealiased() if isinstance(u, Array): ua = u else: ua = Array(Vup) ua = Vup.backward(u, ua) if isinstance(v, Array): va = v else: va = Array(Vvp) va = Vvp.backward(v, va) if Vu.coors.is_cartesian: if Vu.tensor_rank == 1 and Vv.tensor_rank == 1: uv = np.sum(ua.v*va.v, axis=0, out=uv) elif Vu.tensor_rank == 2 and Vv.tensor_rank == 1: for i in range(D): ui = ua[i] for j in range(D): uv[i] += ui[j]*va[j] elif Vu.tensor_rank == 1 and Vv.tensor_rank == 2: for i in range(D): ui = ua[i] vi = va[i] for j in range(D): uv[j] += ui*vi[j] elif Vu.tensor_rank == 2 and Vv.tensor_rank == 2: for i in range(D): ui = ua[i] wi = uv[i] for j in range(D): vj = va[j] for k in range(D): wi[k] += ui[j]*vj[k] else: raise NotImplementedError else: gij = Vu.coors.get_metric_tensor(config['basisvectors']) mesh = Vup.local_mesh(True) def measure(g): sym0 = g.free_symbols m = [] for sym in sym0: j = 'xyzrs'.index(str(sym)) m.append(mesh[j]) return sp.lambdify(tuple(sym0), g)(*m) if Vu.tensor_rank == 1 and Vv.tensor_rank == 1: if Vu.coors.is_orthogonal: for i in range(D): uv += ua[i]*va[i]*measure(gij[i, i]) else: for i in range(D): ui = ua[i] for j in range(D): g = gij[i, j] if not g == 0: uv += ui*va[j]*measure(g) elif Vu.tensor_rank == 2 and Vv.tensor_rank == 1: for i in range(D): ui = ua[i] for j in range(D): for k in range(D): g = gij[j, k] if not g == 0: uv[i] += ui[j]*va[k]*measure(g) elif Vu.tensor_rank == 1 and Vv.tensor_rank == 2: for i in range(D): ui = ua[i] for j in range(D): vj = va[j] g = gij[i, j] if not g == 0: ms = measure(g) for k in range(D): uv[k] += ui*vj[k]*ms elif Vu.tensor_rank == 2 and Vv.tensor_rank == 2: for i in range(D): ui = ua[i] wi = uv[i] for j in range(D): for k in range(D): g = gij[j, k] if not g == 0: vk = va[k] ms = measure(g) for l in range(D): wi[l] += ui[j]*vk[l]*ms else: raise NotImplementedError if forward_output is True: if isinstance(output_array, Function): output_array = uv.forward(output_array) return output_array uv_hat = Function(To) uv_hat = uv.forward(uv_hat) return uv_hat return uv
@runtimeoptimizer def apply_mask(u_hat, mask): if mask is not None: u_hat *= mask return u_hat
[docs] def integrate_sympy(f, d): """Exact definite integral using sympy Try to convert expression `f` to a polynomial before integrating. See sympy issue https://github.com/sympy/sympy/pull/18613 to why this is needed. Poly().integrate() is much faster than sympy.integrate() when applicable. Parameters ---------- f : sympy expression d : 3-tuple First item the symbol, next two the lower and upper integration limits """ try: p = sp.Poly(f, d[0]).integrate() return p(d[2]) - p(d[1]) except sp.PolynomialError: #return sp.Integral(f, d).evalf() return sp.integrate(f, d).evalf()
def rsplit(measures): d = [] def _split(ms): ms = sp.sympify(ms) if isinstance(ms, Number): d = {'coeff': ms} elif len(ms.free_symbols) == 0: d = {'coeff': ms.evalf()} else: d = sp.separatevars(ms.factor(), dict=True) if d is None: return d d = defaultdict(lambda: 1, {str(k): v for k, v in d.items()}) dc = d['coeff'] d['coeff'] = int(dc) if isinstance(dc, (sp.Integer, int)) else float(dc) return d ms = sp.sympify(measures) di = _split(ms) if di is not None: d += [di] else: ms = ms.expand() for arg in ms.args: d += split(arg) return d def split(measures, expand=False): def _split(ms): ms = sp.sympify(ms) if isinstance(ms, Number): d = {'coeff': ms} elif len(ms.free_symbols) == 0: d = {'coeff': ms.evalf()} else: d = sp.separatevars(ms.factor(), dict=True) if d is None: raise RuntimeError('Could not split', ms) d = defaultdict(lambda: 1, {str(k): v for k, v in d.items()}) dc = sp.sympify(d['coeff']) if dc.is_integer: d['coeff'] = int(dc) elif dc.is_real: d['coeff'] = float(dc) elif dc.is_complex: d['coeff'] = complex(dc) else: raise RuntimeError return d ms = sp.sympify(measures) if not expand: try: di = _split(ms) if di is not None: return [di] except RuntimeError: pass ms = ms.expand() result = [] if isinstance(ms, sp.Add) or (isinstance(ms, sp.Mul) and np.all([isinstance(s, sp.Add) for s in ms.args])): for arg in ms.args: result.append(_split(arg)) else: result.append(_split(ms)) return result def oldsplit(measures): def _split(mss, result): for ms in mss: ms = sp.sympify(ms) if isinstance(ms, sp.Mul): # Multiplication of two or more terms result = _split(ms.args, result) continue # Something else with only one symbol sym = ms.free_symbols assert len(sym) <= 1 if len(sym) == 1: sym = sym.pop() result[str(sym)] *= ms else: ms = int(ms) if isinstance(ms, sp.Integer) else float(ms) result['coeff'] *= ms return result ms = sp.sympify(measures).expand() result = [] if isinstance(ms, sp.Add): for arg in ms.args: result.append(_split([arg], defaultdict(lambda: 1))) else: result.append(_split([ms], defaultdict(lambda: 1))) return result
[docs] def mayavi_show(): """ Return show function that updates the mayavi figure in the background. """ from pyface.api import GUI from mayavi import mlab return mlab.show(GUI().stop_event_loop)
[docs] def wrap_periodic(xs, axes=()): """Return arrays wrapped around periodically Parameters ---------- xs : array or sequence of arrays axes : sequence of integers, optional Extend arrays in xs by one in direction given by wrap """ xs = xs if isinstance(xs, (list, tuple)) else [xs] ys = [] for x in xs: if 0 in axes: x = np.vstack([x, x[0]]) if 1 in axes: x = np.hstack([x, x[:, 0][:, None]]) if 2 in axes: x = np.concatenate([x, x[:, :, 0][:, :, None]], axis=2) ys.append(x) if len(ys) == 1: return ys[0] return ys
[docs] def surf3D(u, mesh=None, backend='plotly', wrapaxes=(), slices=None, fig=None, **kw): """Plot surface embedded in 3D Parameters ---------- u : Function or Array mesh : list, str or None, optional - list of Cartesian meshes [x, y, z] - 'quadrature' - use quadrature mesh - 'uniform' - use uniform mesh backend : str, optional plotly or mayavi wrapaxes : sequence of integers, optional For domains that wrap around, extend mesh/u by one in given direction slices : None or sequence of slices, optional If only part of u should be plotted fig : Figure instance, optional kw : Keyword arguments, optional Used by plotly Surface. Possibly colorscale. """ from shenfun.forms.arguments import Function, Array if mesh is None: mesh = 'quadrature' if isinstance(mesh, str): assert isinstance(u, (Function, Array)), "u must be Function/Array if mesh is not given" T = u.function_space() x, y, z = T.local_cartesian_mesh(kind=mesh) else: x, y, z = mesh if isinstance(u, Function): u = u.backward(mesh=mesh) x, y, z, u = wrap_periodic([x, y, z, u], wrapaxes) if slices: x = x[slices] y = y[slices] z = z[slices] u = u[slices] if u.dtype.char in 'FDG': u = abs(u) if backend == 'plotly': import plotly.graph_objects as go s = go.Surface(x=x, y=y, z=z, surfacecolor=u, **kw) fig = go.Figure(s) d = {'visible': False, 'showgrid': False, 'zeroline': False} fig.update_layout(scene={'xaxis': d, 'yaxis': d, 'zaxis': d}) elif backend == 'mayavi': from mayavi import mlab fig = mlab.figure(bgcolor=(1, 1, 1)) mlab.mesh(x, y, z, scalars=u, colormap='jet') return fig
[docs] def quiver3D(u, mesh=None, wrapaxes=(), slices=None, fig=None, kind='quadrature', **kw): """ Parameters ---------- u : Array mesh : list of Cartesian meshes [x, y, z], optional backend : str, optional plotly or mayavi wrapaxes : sequence of integers, optional For domains that wrap around, extend mesh/u by one in given direction fig : None, optional If None create a new figure slices : None or sequence of slices, optional If only part of u should be plotted kind : str, optional - 'quadrature' - Use quadrature mesh - 'uniform' - Use uniform mesh kw : Keyword arguments, optional Arguments to mlab.quiver3d, for example 'scale_factor', 'color' or ''mode """ from shenfun.forms.arguments import Function, Array from mayavi import mlab par = { 'scale_factor': 0.1, 'color': (0, 0, 0), 'mode': '2darrow', } par.update(kw) if mesh is None: assert isinstance(u, (Function, Array)), "u must be Function/Array if mesh is not given" T = u.function_space() x, y, z = T.local_cartesian_mesh(kind=kind) else: x, y, z = mesh if isinstance(u, Function): u = u.backward(mesh=kind) if u.dtype.char in 'FDG': u = abs(u) x, y, z = wrap_periodic([x, y, z], wrapaxes) u = wrap_periodic([u], np.array(wrapaxes)+1) if slices: x = x[slices] y = y[slices] z = z[slices] u = u[slices] if fig is None: mlab.figure(bgcolor=(1, 1, 1), size=(400, 400)) mlab.quiver3d(x, y, z, u[0], u[1], u[2], **par)
[docs] def scalar_product(v, f, output_array=None, assemble='exact'): r"""Return scalar product .. math:: (v, f)_w Parameters ---------- v : :class:`.TestFunction` f : Sympy function output_array : :class:`.Function` assemble : str, optional - 'exact' - 'adaptive' Note ---- The computed scalar product is not compensated for a non-standard domain size. >>> from shenfun import inner, FunctionSpace, TestFunction >>> import sympy as sp >>> C = FunctionSpace(4, 'C') >>> v = TestFunction(C) >>> x = sp.Symbol('x', real=True) >>> f = x**2 >>> inner(v, f, assemble='exact') Function([1.57079633, 0. , 0.78539816, 0. ]) """ assert assemble in ('exact', 'adaptive') T = v.function_space() if output_array is None: from shenfun import Function output_array = Function(T) if T.is_composite_space: for vi, xi in zip(v, output_array): xi = scalar_product(vi, f, xi) return output_array x = sp.Symbol('x', real=True) cheb = T.family() == 'chebyshev' if not isinstance(f, Number): s = f.free_symbols assert len(s) == 1 x = s.pop() f = T.map_expression_true_domain(f, x=x) if cheb: f = f.subs(x, sp.cos(x)) if cheb: S = T.stencil_matrix().diags('csr') for i in range(T.slice().start, T.slice().stop): M = S.getrow(i) integrand = sp.S(0) for ind, d in zip(M.indices, M.data): integrand += d*sp.cos(ind*x) integrand = f*integrand if assemble == 'exact': output_array[i] = sp.integrate(integrand, (x, (0, sp.pi))) elif assemble == 'adaptive': if isinstance(integrand, Number): output_array[i] = integrand*np.pi else: output_array[i] = quad(sp.lambdify(x, integrand), 0, np.pi)[0] else: domain = T.reference_domain() f *= T.weight() for i in range(T.slice().start, T.slice().stop): integrand = f*sp.conjugate(T.basis_function(i, x=x)) if assemble == 'exact': output_array[i] = sp.integrate(integrand, (x, (domain[0], domain[1]))) elif assemble == 'adaptive': if len(integrand.free_symbols) == 0: if cheb: output_array[i] = integrand*np.pi else: output_array[i] = integrand*float(domain[1]-domain[0]) else: output_array[i] = quad(sp.lambdify(x, integrand), float(domain[0]), float(domain[1]))[0] if T.domain_factor() != 1: output_array /= float(T.domain_factor()) return output_array