r"""
Module for function spaces using Fourier exponentials.
A basis function :math:`\phi_k` is given as
.. math::
\phi_k(x) = \exp(ikx)
and an expansion is given as
.. math::
:label: u
u(x) = \sum_{k=-N/2}^{N/2-1} \hat{u}_k \exp(ikx)
However, since :math:`\exp(ikx) = \exp(i(k \pm N)x)` this expansion can
also be written as an interpolator
.. math::
:label: u2
u(x) = \sum_{k=-N/2}^{N/2} \frac{\hat{u}_k}{c_k} \exp(ikx)
where :math:`c_{N/2} = c_{-N/2} = 2`, whereas :math:`c_k = 1` for
:math:`k=-N/2+1, ..., N/2-1`. Furthermore,
:math:`\hat{u}_{N/2} = \hat{u}_{-N/2}`.
The interpolator form is used for computing odd derivatives. Otherwise,
it makes no difference and therefore :eq:`u` is used in transforms, since
this is the form expected by fftw.
The inner product is defined as
.. math::
(u, v) = \frac{1}{L} \int_{0}^{L} u \overline{v} dx
where :math:`\overline{v}` is the complex conjugate of :math:`v`, and
:math:`L` is the length of the (periodic) domain.
"""
import sympy as sp
import numpy as np
from mpi4py_fft import fftw
from shenfun.spectralbase import SpectralBase, Transform, islicedict, slicedict
from shenfun.optimization import cython
from shenfun.config import config
bases = ['R2C', 'C2C']
bcbases = []
testbases = []
__all__ = bases
#pylint: disable=method-hidden, no-member, line-too-long, arguments-differ
class FourierBase(SpectralBase):
r"""Abstract base class for Fourier exponentials
"""
def __init__(self, N, padding_factor=1, domain=(0, 2*sp.pi), dtype=float,
dealias_direct=False, coordinates=None):
self._k = None
self._planned_axes = None # Collapsing of axes means that this base can be used to plan transforms over several collapsed axes. Store the axes planned for here.
SpectralBase.__init__(self, N, dtype=dtype, padding_factor=padding_factor,
dealias_direct=dealias_direct, domain=domain,
coordinates=coordinates)
@staticmethod
def family():
return 'fourier'
@staticmethod
def boundary_condition():
return 'Periodic'
def points_and_weights(self, N=None, map_true_domain=False, weighted=True, **kw):
if N is None:
N = self.shape(False)
points = np.arange(N, dtype=float)*2*np.pi/N
if map_true_domain is True:
points = self.map_true_domain(points)
if weighted:
# weight is 1/self.domain_length() since this leads to the Kronecker delta function for mass matrix
return points, np.array([float(self.domain_factor())/N])
return points, np.array([2*np.pi/N])
def orthogonal_basis_function(self, i=0, x=sp.symbols('x', real=True)):
k = self.wavenumbers(bcast=False, scaled=False, eliminate_highest_freq=False)
return sp.exp(sp.I*k[i]*x)
def L2_norm_sq(self, i):
return 1
def l2_norm_sq(self, i=None):
if i is None:
return np.ones(self.N)
return 1
def weight(self, x=sp.symbols('x', real=True)):
return 1/self.domain_length()
def evaluate_basis(self, x=None, i=0, output_array=None):
if x is None:
x = self.mesh(False, False)
x = np.atleast_1d(x)
if output_array is None:
output_array = np.zeros(x.shape, dtype=complex)
if self._k is None:
self._k = self.wavenumbers(bcast=False)
k = self._k[i]
output_array[:] = np.exp(1j*x*k)
return output_array
def evaluate_basis_derivative(self, x=None, i=0, k=0, output_array=None):
output_array = self.evaluate_basis(x, i, output_array)
l = self._k[i]
output_array *= ((1j*l)**k)
return output_array
def vandermonde(self, x):
k = self.wavenumbers(bcast=False)
x = np.atleast_1d(x)
return np.exp(1j*x[:, np.newaxis]*k[np.newaxis, :])
def evaluate_basis_derivative_all(self, x=None, k=0):
V = self.evaluate_basis_all(x=x)
if k > 0:
l = self.wavenumbers(bcast=False, scaled=False, eliminate_highest_freq=False)
V = V*((1j*l)**k)[np.newaxis, :]
return V
# Reimplemented for efficiency (smaller array in *= when truncated)
def forward(self, input_array=None, output_array=None, kind='fast'):
if kind != 'fast':
return SpectralBase.forward(self, input_array, output_array, kind=kind)
if input_array is not None:
self.forward.input_array[...] = input_array
self.forward.xfftn()
self._truncation_forward(self.forward.tmp_array,
self.forward.output_array)
M = self.get_normalization()
self.forward._output_array *= M
self.apply_inverse_mass(self.forward.output_array)
if output_array is not None:
output_array[...] = self.forward.output_array
return output_array
return self.forward.output_array
def apply_inverse_mass(self, array):
coors = self.tensorproductspace.coors if self.tensorproductspace else self.coors
if not coors.is_cartesian: # mass matrix may not be diagonal, or there is scaling
return SpectralBase.apply_inverse_mass(self, array)
return array
def _evaluate_scalar_product(self, kind='fast'):
if kind != 'fast':
SpectralBase._evaluate_scalar_product(self, kind=kind)
return
output = self.scalar_product.xfftn()
output *= self.get_normalization()
def reference_domain(self):
return (0, 2*sp.pi)
@property
def is_orthogonal(self):
return True
def get_orthogonal(self, **kwargs):
return self.get_unplanned(**kwargs)
def get_homogeneous(self, **kwargs):
return self.get_unplanned(**kwargs)
def get_unplanned(self, **kwargs):
d = dict(domain=self.domain,
padding_factor=self.padding_factor,
dealias_direct=self.dealias_direct,
coordinates=self.coors.coordinates)
d.update(kwargs)
return self.__class__(self.N, **d)
def get_dealiased(self, padding_factor=1.5, dealias_direct=False, **kwargs):
d = dict(domain=self.domain,
padding_factor=padding_factor,
dealias_direct=dealias_direct,
coordinates=self.coors.coordinates)
d.update(kwargs)
return self.__class__(self.N, **d)
def get_refined(self, N, **kwargs):
d = dict(domain=self.domain,
padding_factor=self.padding_factor,
dealias_direct=self.dealias_direct,
coordinates=self.coors.coordinates)
d.update(kwargs)
return self.__class__(N, **d)
def mask_nyquist(self, u_hat, mask=None):
"""Return array `u_hat` with zero Nyquist coefficients
Parameters
----------
u_hat : array
Array to be masked
mask : array or None, optional
mask array, if not provided then get the mask by calling
:func:`get_mask_nyquist`
"""
if mask is None:
mask = self.get_mask_nyquist(bcast=False)
if mask is not None:
u_hat *= mask
return u_hat
def plan(self, shape, axis, dtype, options):
if shape in (0, (0,)):
return
if isinstance(axis, int):
axis = [axis]
s = list(np.take(shape, axis))
if isinstance(self.forward, Transform):
if self.forward.input_array.shape == shape and axis == self._planned_axes:
# Already planned
return
plan_fwd = self._xfftn_fwd
plan_bck = self._xfftn_bck
self.axis = axis[-1]
self._planned_axes = axis
#opts = dict(
# overwrite_input='FFTW_DESTROY_INPUT',
# planner_effort=self.opts['planner_effort'],
# threads=self.opts['threads'],
#)
opts = plan_fwd.opts
opts['overwrite_input'] = 'FFTW_DESTROY_INPUT'
opts.update(options)
threads = opts['threads']
flags = (fftw.flag_dict[opts['planner_effort']],
fftw.flag_dict[opts['overwrite_input']])
U = fftw.aligned(shape, dtype=dtype)
xfftn_fwd = plan_fwd(U, s=s, axes=axis, threads=threads, flags=flags)
V = xfftn_fwd.output_array
opts = plan_bck.opts
opts['overwrite_input'] = 'FFTW_DESTROY_INPUT'
opts.update(options)
threads = opts['threads']
flags = (fftw.flag_dict[opts['planner_effort']],
fftw.flag_dict[opts['overwrite_input']])
if np.issubdtype(dtype, np.floating):
flags = (fftw.flag_dict[opts['planner_effort']],)
xfftn_bck = plan_bck(V, s=s, axes=axis, threads=threads, flags=flags, output_array=U)
V.fill(0)
U.fill(0)
self._M = xfftn_fwd.get_normalization()
if self.padding_factor > 1.+1e-8:
trunc_array = self._get_truncarray(shape, V.dtype)
self.scalar_product = Transform(self.scalar_product, xfftn_fwd, U, V, trunc_array)
self.forward = Transform(self.forward, xfftn_fwd, U, V, trunc_array)
self.backward = Transform(self.backward, xfftn_bck, trunc_array, V, U)
else:
self.scalar_product = Transform(self.scalar_product, xfftn_fwd, U, V, V)
self.forward = Transform(self.forward, xfftn_fwd, U, V, V)
self.backward = Transform(self.backward, xfftn_bck, V, V, U)
self.si = islicedict(axis=self.axis, dimensions=self.dimensions)
self.sl = slicedict(axis=self.axis, dimensions=self.dimensions)
[docs]
class R2C(FourierBase):
r"""Fourier function space for real to complex transforms
A basis function :math:`\phi_k` is given as
.. math::
\phi_k(x) = \exp(ikx), \quad k =-N/2, -N/2+1, \ldots, N/2-1
An expansion is given as
.. math::
u(x) = \sum_{k=-N/2}^{N/2-1} \hat{u}_k \exp(ikx).
Howewer, due to Hermitian symmetry :math:`\hat{u}_k = \overline{\hat{u}_{-k}}`,
which is taken advantage of in this function space. Specifically, Fourier
transforms make use of real-to-complex and complex-to-real algorithms, see
`FFTW <https://www.fftw.org/fftw3_doc/One_002dDimensional-DFTs-of-Real-Data.html>`_
and `rfftn <https://mpi4py-fft.readthedocs.io/en/latest/mpi4py_fft.fftw.html#mpi4py_fft.fftw.xfftn.rfftn>`_
of `mpi4py-fft <https://github.com/mpi4py/mpi4py-fft>`_.
Parameters
----------
N : int
Number of quadrature points. Should be even for efficiency, but
this is not required.
padding_factor : float, optional
Factor for padding backward transforms. padding_factor=1.5
corresponds to a 3/2-rule for dealiasing.
domain : 2-tuple of numbers, optional
The computational domain.
dealias_direct : bool, optional
True for dealiasing using 2/3-rule. Must be used with
padding_factor = 1.
coordinates: 2- or 3-tuple (coordinate, position vector (, sympy assumptions)), optional
Map for curvilinear coordinatesystem, and parameters to :class:`~shenfun.coordinates.Coordinates`
"""
def __init__(self, N, padding_factor=1., domain=(0, 2*sp.pi),
dealias_direct=False, coordinates=None, **kw):
FourierBase.__init__(self, N, padding_factor=padding_factor, dtype=float,
domain=domain, dealias_direct=dealias_direct,
coordinates=coordinates)
self.N = N
self._xfftn_fwd = fftw.rfftn
self._xfftn_bck = fftw.irfftn
self._xfftn_fwd.opts = config['fftw']['rfft']
self._xfftn_bck.opts = config['fftw']['irfft']
self._sn = []
self._sm = []
self.plan((int(padding_factor*N),), (0,), float, {})
[docs]
def wavenumbers(self, bcast=True, scaled=False, eliminate_highest_freq=False):
k = np.fft.rfftfreq(self.N, 1./self.N).astype(int)
if self.N % 2 == 0 and eliminate_highest_freq:
k[-1] = 0
if scaled:
k = k*float(self.domain_factor())
if bcast is True:
k = self.broadcast_to_ndims(k)
return k
[docs]
def get_mask_nyquist(self, bcast=True):
"""Return None or an array with zeros for Nyquist coefficients and one otherwise
Parameters
----------
bcast : boolean, optional
If True then broadcast returned mask array to dimensions of the
:class:`TensorProductSpace` this base belongs to.
"""
if self.N % 2 == 0:
f = np.ones(self.N//2+1, dtype=int)
f[-1] = 0
else:
return None
if bcast is True:
f = self.broadcast_to_ndims(f)
return f
def _get_truncarray(self, shape, dtype):
shape = list(shape)
shape[self.axis] = int(shape[self.axis] / self.padding_factor)
shape[self.axis] = shape[self.axis]//2 + 1
return fftw.aligned(shape, dtype=dtype)
[docs]
@staticmethod
def short_name():
return 'R2C'
[docs]
def slice(self):
return slice(0, self.N//2+1)
[docs]
def shape(self, forward_output=True):
if forward_output:
return self.N//2+1
return int(np.floor(self.padding_factor*self.N))
def _evaluate_expansion_all(self, input_array, output_array, x=None, kind='fast'):
if kind == 'vandermonde':
assert abs(self.padding_factor-1) < 1e-8
P = self.evaluate_basis_all(x=x)
if output_array.ndim == 1:
output_array[:] = np.dot(P, input_array).real
if self.N % 2 == 0:
output_array += np.conj(np.dot(P[:, 1:-1], input_array[1:-1])).real
else:
output_array += np.conj(np.dot(P[:, 1:], input_array[1:])).real
else:
fc = np.moveaxis(input_array, self.axis, -2)
array = np.dot(P, fc).real
s = [slice(None)]*fc.ndim
if self.N % 2 == 0:
s[-2] = slice(1, -1)
array += np.conj(np.dot(P[:, 1:-1], fc[tuple(s)])).real
else:
s[-2] = slice(1, None)
array += np.conj(np.dot(P[:, 1:], fc[tuple(s)])).real
output_array[:] = np.moveaxis(array, 0, self.axis)
return
assert kind == 'fast'
assert input_array is self.backward.tmp_array
assert output_array is self.backward.output_array
self.backward.xfftn(normalise_idft=False)
def _truncation_forward(self, padded_array, trunc_array):
if not id(trunc_array) == id(padded_array):
trunc_array.fill(0)
N = trunc_array.shape[self.axis]
s = self.sl[slice(0, N)]
trunc_array[:] = padded_array[s]
if self.N % 2 == 0:
s1 = self.si[N-1]
trunc_array[s1] = trunc_array[s1].real
trunc_array[s1] *= 2
def _padding_backward(self, trunc_array, padded_array):
if not id(trunc_array) == id(padded_array):
padded_array.fill(0)
N = trunc_array.shape[self.axis]
if len(self._sn) != self.dimensions:
self._sn = self.sl[slice(0, N)]
self._sm = self.si[N-1]
padded_array[self._sn] = trunc_array[self._sn]
if self.N % 2 == 0: # Symmetric Fourier interpolator
padded_array[self._sm] = padded_array[self._sm].real
padded_array[self._sm] *= 0.5
elif self.dealias_direct:
N = self.N
su = self.sl[slice(N//3, None)]
padded_array[su] = 0
[docs]
def convolve(self, u, v, uv=None, fast=True):
"""Convolution of u and v.
Parameters
----------
u : array
v : array
uv : array, optional
fast : bool, optional
Whether to use fast transforms in computing convolution
Note
----
Note that this method is only valid for 1D data, and that
for multidimensional arrays one should use corresponding method
in the :class:`.TensorProductSpace` class.
"""
N = self.N
if fast is True:
if uv is None:
uv = self.forward.output_array.copy()
assert self.padding_factor > 1.0, "padding factor must be > 3/2+1/N to perform convolution without aliasing"
u2 = self.backward.output_array.copy()
u3 = self.backward.output_array.copy()
u2 = self.backward(u, u2)
u3 = self.backward(v, u3)
uv = self.forward(u2*u3, uv)
else:
if uv is None:
uv = np.zeros(N+1, dtype=u.dtype)
Np = N if not N % 2 == 0 else N+1
k1 = np.fft.fftfreq(Np, 1./Np).astype(int)
cython.convolve.convolve_real_1D(u, v, uv, k1)
return uv
[docs]
class C2C(FourierBase):
r"""Fourier function space for complex to complex transforms
A basis function :math:`\phi_k` is given as
.. math::
\phi_k(x) = \exp(ikx), \quad k =-N/2, -N/2+1, \ldots, N/2-1
An expansion is given as
.. math::
u(x) = \sum_{k=-N/2}^{N/2-1} \hat{u}_k \exp(ikx).
Parameters
----------
N : int
Number of quadrature points. Should be even for efficiency, but
this is not required.
padding_factor : float, optional
Factor for padding backward transforms. padding_factor=1.5
corresponds to a 3/2-rule for dealiasing.
domain : 2-tuple of numbers, optional
The computational domain.
dealias_direct : bool, optional
True for dealiasing using 2/3-rule. Must be used with
padding_factor = 1.
coordinates: 2- or 3-tuple (coordinate, position vector (, sympy assumptions)), optional
Map for curvilinear coordinatesystem, and parameters to :class:`~shenfun.coordinates.Coordinates`
"""
def __init__(self, N, padding_factor=1, domain=(0, 2*sp.pi),
dealias_direct=False, coordinates=None, **kw):
FourierBase.__init__(self, N, padding_factor=padding_factor, dtype=complex,
domain=domain, dealias_direct=dealias_direct,
coordinates=coordinates)
self.N = N
self._xfftn_fwd = fftw.fftn
self._xfftn_bck = fftw.ifftn
self._xfftn_fwd.opts = config['fftw']['fft']
self._xfftn_bck.opts = config['fftw']['ifft']
self.plan((int(padding_factor*N),), (0,), complex, {})
self._slp = []
[docs]
@staticmethod
def short_name():
return 'C2C'
def _evaluate_expansion_all(self, input_array, output_array, x=None, kind='fast'):
if kind == 'vandermonde':
SpectralBase._evaluate_expansion_all(self, input_array, output_array, x, kind=kind)
return
assert kind == 'fast'
assert input_array is self.backward.tmp_array
assert output_array is self.backward.output_array
self.backward.xfftn(normalise_idft=False)
[docs]
def wavenumbers(self, bcast=True, scaled=False, eliminate_highest_freq=False):
k = np.fft.fftfreq(self.N, 1./self.N).astype(int)
if self.N % 2 == 0 and eliminate_highest_freq:
k[self.N//2] = 0
if scaled:
k = k*float(self.domain_factor())
if bcast is True:
k = self.broadcast_to_ndims(k)
return k
[docs]
def get_mask_nyquist(self, bcast=True):
if self.N % 2 == 0:
f = np.ones(self.N, dtype=int)
f[self.N//2] = 0
else:
return None
if bcast is True:
f = self.broadcast_to_ndims(f)
return f
[docs]
def slice(self):
return slice(0, self.N)
[docs]
def shape(self, forward_output=True):
if forward_output:
return self.N
return int(np.floor(self.padding_factor*self.N))
[docs]
def count_trailing_zeros(self, u, reltol=1e-12, abstol=1e-15):
assert u.function_space() == self
assert u.ndim == 1
a = abs(u[self.slice()])
ua = (a < reltol*a.max()) | (a < abstol)
return np.argmin(ua[self.N//2:]) + np.argmin(ua[:self.N//2][::-1])
def _truncation_forward(self, padded_array, trunc_array):
if not id(trunc_array) == id(padded_array):
trunc_array.fill(0)
N = trunc_array.shape[self.axis]
su = self.sl[slice(0, N//2+1)]
trunc_array[su] = padded_array[su]
su = self.sl[slice(-(N//2), None)]
trunc_array[su] += padded_array[su]
def _padding_backward(self, trunc_array, padded_array):
# pylint: disable=attribute-defined-outside-init
if not id(trunc_array) == id(padded_array):
padded_array.fill(0)
N = trunc_array.shape[self.axis]
if len(self._slp) != self.dimensions: # Store for microoptimization
self._slp = self.sl[slice(0, N//2+1)]
self._slm = self.sl[slice(-(N//2), None)]
self._slp0 = self.si[N//2]
self._slm0 = self.si[-(N//2)]
padded_array[self._slp] = trunc_array[self._slp]
padded_array[self._slm] = trunc_array[self._slm]
if self.N % 2 == 0: # Use symmetric Fourier interpolator
padded_array[self._slp0] *= 0.5
padded_array[self._slm0] *= 0.5
elif self.dealias_direct:
N = trunc_array.shape[self.axis]
su = self.sl[slice(N//3, -(N//3)+1)]
padded_array[su] = 0
[docs]
def convolve(self, u, v, uv=None, fast=True):
"""Convolution of u and v.
Parameters
----------
u : array
v : array
uv : array, optional
fast : bool, optional
Whether to use fast transforms in computing convolution
Note
----
Note that this method is only valid for 1D data, and that
for multidimensional arrays one should use corresponding method
in the TensorProductSpace class.
"""
assert len(u.shape) == 1
N = self.N
if fast:
if uv is None:
uv = self.forward.output_array.copy()
assert self.padding_factor > 1.0, "padding factor must be > 3/2+1/N to perform convolution without aliasing"
u2 = self.backward.output_array.copy()
u3 = self.backward.output_array.copy()
u2 = self.backward(u, u2)
u3 = self.backward(v, u3)
uv = self.forward(u2*u3, uv)
else:
if uv is None:
uv = np.zeros(2*N, dtype=u.dtype)
Np = N if not N % 2 == 0 else N+1
k = np.fft.fftfreq(Np, 1./Np).astype(int)
cython.convolve.convolve_1D(u, v, uv, k)
return uv