from abc import ABC, abstractmethod
from copy import deepcopy, copy
from typing import Tuple, Optional, Callable, Sequence
import numpy as np
from scipy.interpolate import interpn
from scipy.stats import multivariate_normal, beta
from pararealml.constrained_problem import ConstrainedProblem
from pararealml.constraint import apply_constraints_along_last_axis
from pararealml.mesh import to_cartesian_coordinates
VectorizedInitialConditionFunction = \
Callable[[Optional[np.ndarray]], np.ndarray]
[docs]class InitialCondition(ABC):
"""
A base class for initial conditions.
"""
[docs] @abstractmethod
def y_0(self, x: Optional[np.ndarray]) -> np.ndarray:
"""
Returns the initial value of y at the points in the spatial domain
defined by x.
:param x: a 2D array (n, x_dimension) of the spatial coordinates for
PDEs and None for ODEs
:return: a 2D array (n, y_dimension) of the initial value of y at the
coordinates for PDEs and a 1D array (y_dimension) for ODEs
"""
[docs] @abstractmethod
def discrete_y_0(
self,
vertex_oriented: Optional[bool] = None) -> np.ndarray:
"""
Returns the discretized initial values of y evaluated at the vertices
or cell centers of the spatial mesh.
:param vertex_oriented: whether the initial conditions are to be
evaluated at the vertices or cell centers of the spatial mesh
:return: the discretized initial values
"""
[docs]class DiscreteInitialCondition(InitialCondition):
"""
An initial condition defined by a fixed array of values.
"""
def __init__(
self,
cp: ConstrainedProblem,
y_0: np.ndarray,
vertex_oriented: Optional[bool] = None,
interpolation_method: str = 'linear'):
"""
:param cp: the constrained problem to turn into an initial value
problem by providing the initial conditions for it
:param y_0: the array containing the initial values of y over a spatial
mesh (which may be 0 dimensional in case of an ODE)
:param vertex_oriented: whether the initial conditions are evaluated at
the vertices or cell centers of the spatial mesh; it the
constrained problem is an ODE, it can be None
:param interpolation_method: the interpolation method to use to
calculate values that do not exactly fall on points of the y_0
grid; if the constrained problem is based on an ODE, it can be None
"""
if cp.differential_equation.x_dimension and vertex_oriented is None:
raise ValueError('vertex orientation must be defined for PDEs')
if y_0.shape != cp.y_shape(vertex_oriented):
raise ValueError(
f'discrete initial value shape {y_0.shape} must match '
'constrained problem solution shape '
f'{cp.y_shape(vertex_oriented)}')
self._cp = cp
self._y_0 = np.copy(y_0)
self._vertex_oriented = vertex_oriented
self._interpolation_method = interpolation_method
if vertex_oriented:
apply_constraints_along_last_axis(
cp.static_y_vertex_constraints, self._y_0)
[docs] def y_0(self, x: Optional[np.ndarray]) -> np.ndarray:
if not self._cp.differential_equation.x_dimension:
return np.copy(self._y_0)
return interpn(
self._cp.mesh.axis_coordinates(self._vertex_oriented),
self._y_0,
x,
method=self._interpolation_method,
bounds_error=False,
fill_value=None)
[docs] def discrete_y_0(
self,
vertex_oriented: Optional[bool] = None) -> np.ndarray:
if vertex_oriented is None:
vertex_oriented = self._vertex_oriented
if not self._cp.differential_equation.x_dimension \
or vertex_oriented == self._vertex_oriented:
return np.copy(self._y_0)
y_0 = self.y_0(self._cp.mesh.all_index_coordinates(vertex_oriented))
if vertex_oriented:
apply_constraints_along_last_axis(
self._cp.static_y_vertex_constraints, y_0)
return y_0
[docs]class ContinuousInitialCondition(InitialCondition):
"""
An initial condition defined by a function.
"""
def __init__(
self,
cp: ConstrainedProblem,
y_0_func: VectorizedInitialConditionFunction):
"""
:param cp: the constrained problem to turn into an initial value
problem by providing the initial conditions for it
:param y_0_func: the initial value function that returns an array
containing the values of y at the spatial coordinates defined by
its input
"""
self._cp = cp
self._y_0_func = y_0_func
self._discrete_y_0_vertices = self._create_discrete_y_0(True)
self._discrete_y_0_cells = self._create_discrete_y_0(False)
[docs] def y_0(self, x: Optional[np.ndarray]) -> np.ndarray:
return self._y_0_func(x)
[docs] def discrete_y_0(
self,
vertex_oriented: Optional[bool] = None) -> np.ndarray:
return np.copy(
self._discrete_y_0_vertices if vertex_oriented
else self._discrete_y_0_cells)
def _create_discrete_y_0(self, vertex_oriented: bool) -> np.ndarray:
"""
Creates the discretized initial values of y evaluated at the vertices
or cell centers of the spatial mesh.
:param vertex_oriented: whether the initial conditions are to be
evaluated at the vertices or cell centers of the spatial mesh
:return: the discretized initial values
"""
diff_eq = self._cp.differential_equation
if not diff_eq.x_dimension:
y_0 = np.array(self._y_0_func(None))
if y_0.shape != self._cp.y_shape():
raise ValueError(
'expected initial condition function output shape to be '
f'{self._cp.y_shape()} but got {y_0.shape}')
return y_0
x = self._cp.mesh.all_index_coordinates(vertex_oriented, flatten=True)
y_0 = self._y_0_func(x)
if y_0.shape != (len(x), diff_eq.y_dimension):
raise ValueError(
'expected initial condition function output shape to be '
f'{(len(x), diff_eq.y_dimension)} but got {y_0.shape}')
y_0 = y_0.reshape(self._cp.y_shape(vertex_oriented))
if vertex_oriented:
apply_constraints_along_last_axis(
self._cp.static_y_vertex_constraints, y_0)
return y_0
def _convert_coordinates_to_cartesian(self, x: np.ndarray) -> np.ndarray:
"""
Converts the provided coordinates to Cartesian coordinates.
:param x: the coordinates to convert
:return: the converted Cartesian coordinates
"""
cartesian_x = to_cartesian_coordinates(
[x[:, i] for i in range(x.shape[1])],
self._cp.mesh.coordinate_system_type)
return np.stack(cartesian_x, axis=-1)
[docs]class GaussianInitialCondition(ContinuousInitialCondition):
"""
An initial condition defined explicitly by Gaussian probability density
functions.
"""
def __init__(
self,
cp: ConstrainedProblem,
means_and_covs: Sequence[Tuple[np.ndarray, np.ndarray]],
multipliers: Optional[Sequence[float]] = None):
"""
:param cp: the constrained problem to turn into an initial value
problem by providing the initial conditions for it
:param means_and_covs: a sequence of tuples of mean vectors and
covariance matrices defining the multivariate Gaussian PDFs
corresponding to each element of y_0
:param multipliers: an array of multipliers for each element of the
initial y values
"""
diff_eq = cp.differential_equation
if not diff_eq.x_dimension:
raise ValueError('constrained problem must be a PDE')
if len(means_and_covs) != diff_eq.y_dimension:
raise ValueError(
f'number of means and covariances ({len(means_and_covs)}) '
f'must match number of y dimensions ({diff_eq.y_dimension})')
for mean, cov in means_and_covs:
if mean.shape != (diff_eq.x_dimension,):
raise ValueError(
f'expected mean shape to be {(diff_eq.x_dimension,)} but '
f'got {mean.shape}')
if cov.shape != (diff_eq.x_dimension, diff_eq.x_dimension):
raise ValueError(
'expected covariance shape to be '
f'{(diff_eq.x_dimension, diff_eq.x_dimension)} but got '
f'{cov.shape}')
self._means_and_covs = deepcopy(means_and_covs)
if multipliers is not None:
if len(multipliers) != diff_eq.y_dimension:
raise ValueError(
f'length of multipliers ({len(multipliers)}) must match '
f'number of y dimensions ({diff_eq.y_dimension})')
self._multipliers = copy(multipliers)
else:
self._multipliers = [1.] * diff_eq.y_dimension
super(GaussianInitialCondition, self).__init__(cp, self._y_0)
def _y_0(self, x: Optional[np.ndarray]) -> np.ndarray:
"""
Calculates and returns the values of the multivariate Gaussian PDFs
corresponding to each element of y_0 at x.
:param x: the spatial coordinates
:return: the initial value of y at the coordinates
"""
cartesian_x = self._convert_coordinates_to_cartesian(x)
y_0 = np.empty((len(x), self._cp.differential_equation.y_dimension))
for i in range(self._cp.differential_equation.y_dimension):
mean, cov = self._means_and_covs[i]
multiplier = self._multipliers[i]
y_0_i = multivariate_normal.pdf(cartesian_x, mean=mean, cov=cov)
y_0[:, i] = multiplier * y_0_i
return y_0
[docs]class BetaInitialCondition(ContinuousInitialCondition):
"""
An initial condition defined explicitly by Beta probability density
functions.
"""
def __init__(
self,
cp: ConstrainedProblem,
alpha_and_betas: Sequence[Tuple[float, float]]):
"""
:param cp: the constrained problem to turn into an initial value
problem by providing the initial conditions for it
:param alpha_and_betas: a sequence of tuples containing the two
parameters defining the beta distributions corresponding to each
element of y_0
"""
diff_eq = cp.differential_equation
if diff_eq.x_dimension != 1:
raise ValueError('constrained problem must be a 1D PDE')
if len(alpha_and_betas) != diff_eq.y_dimension:
raise ValueError(
f'number of alphas and betas ({len(alpha_and_betas)}) must '
f'match number of y dimensions ({diff_eq.y_dimension})')
self._alpha_and_betas = copy(alpha_and_betas)
super(BetaInitialCondition, self).__init__(cp, self._y_0)
def _y_0(self, x: Optional[np.ndarray]) -> np.ndarray:
"""
Calculates and returns the values of the beta PDFs corresponding to
each element of y_0 at x.
:param x: the spatial coordinates
:return: the initial value of y at the coordinates
"""
return np.concatenate([
beta.pdf(x, a, b) for a, b in self._alpha_and_betas
], axis=-1)
[docs]def vectorize_ic_function(
ic_function: Callable[[Optional[Sequence[float]]], Sequence[float]]
) -> VectorizedInitialConditionFunction:
"""
Vectorizes an initial condition function that operates on a single
coordinate sequence so that it can operate on an array of coordinate
sequences.
The implementation of the vectorized function is nothing more than a for
loop over the rows of coordinate sequences in the x argument.
:param ic_function: the non-vectorized initial condition function
:return: the vectorized initial condition function
"""
def vectorized_ic_function(x: Optional[np.ndarray]) -> np.ndarray:
if x is None:
return np.array(ic_function(None))
values = []
for i in range(len(x)):
values.append(ic_function(x[i]))
return np.array(values)
return vectorized_ic_function