Source code for pararealml.differential_equation

from abc import ABC, abstractmethod
from copy import copy, deepcopy
from enum import Enum
from typing import Optional, Sequence, Dict, List, Union
from typing import Tuple

import numpy as np
from numpy.typing import NDArray
from sympy import symarray, Expr, Symbol


[docs]class Symbols: """ A class containing the symbols for expressing a coordinate system agnostic differential equation system with a specified number of unknown variables and spatial dimensions. """ def __init__(self, x_dimension: int, y_dimension: int): """ :param x_dimension: the number spatial dimensions :param y_dimension: the number of unknown variables """ self._t = Symbol('t') self._y = symarray('y', (y_dimension,)) self._x = None self._y_gradient = None self._y_hessian = None self._y_divergence = None self._y_curl = None self._y_laplacian = None self._y_vector_laplacian = None if x_dimension: self._x = symarray('x', (x_dimension,)) self._y_gradient = symarray( 'y-gradient', (y_dimension, x_dimension)) self._y_hessian = symarray( 'y-hessian', (y_dimension, x_dimension, x_dimension)) self._y_divergence = symarray( 'y-divergence', (y_dimension,) * x_dimension) if 2 <= x_dimension <= 3: self._y_curl = symarray( 'y-curl', ((y_dimension,) * x_dimension) + (x_dimension,) if x_dimension == 3 else ()) self._y_laplacian = symarray('y-laplacian', (y_dimension,)) self._y_vector_laplacian = symarray( 'y-vector-laplacian', ((y_dimension,) * x_dimension) + (x_dimension,)) @property def t(self) -> Symbol: """ A symbol denoting the temporal coordinate. """ return self._t @property def y(self) -> np.ndarray: """ An array of symbols denoting the elements of the solution of the differential equation. """ return copy(self._y) @property def x(self) -> Optional[np.ndarray]: """ An array of symbols denoting the elements of the spatial coordinates. """ return copy(self._x) @property def y_gradient(self) -> Optional[np.ndarray]: """ A 2D array of symbols denoting the first spatial derivatives of the solution where the first rank is the element of the solution and the second rank is the spatial axis. """ return copy(self._y_gradient) @property def y_hessian(self) -> Optional[np.ndarray]: """ A 3D array of symbols denoting the second spatial derivatives of the solution where the first rank is the element of the solution, the second rank is the first spatial axis, and the third rank is the second spatial axis. """ return copy(self._y_hessian) @property def y_divergence(self) -> Optional[np.ndarray]: """ A multidimensional array of symbols denoting the spatial divergence of the corresponding elements of the differential equation's solution. """ return copy(self._y_divergence) @property def y_curl(self) -> Optional[np.ndarray]: """ A multidimensional array of symbols denoting the spatial curl of the corresponding elements of the differential equation's solution. For two spatial dimensions, this corresponds to a scalar field. However, for three spatial dimensions, it corresponds to a vector field, therefore an additional axis is appended to this multidimensional array to allow for indexing the components of this vector field. For differential equations with less than two or more than three spatial dimensions, the curl is not defined. """ return copy(self._y_curl) @property def y_laplacian(self) -> Optional[np.ndarray]: """ An array of symbols denoting the spatial scalar Laplacian of the elements of the differential equation's solution. """ return copy(self._y_laplacian) @property def y_vector_laplacian(self) -> Optional[np.ndarray]: """ A multidimensional array of symbols denoting the spatial vector Laplacian of the corresponding elements of the differential equation's solution. An additional axis is appended to this multidimensional array to allow for indexing the components of the vector Laplacian. """ return copy(self._y_vector_laplacian)
[docs]class Lhs(Enum): """ An enumeration defining the types of the left-hand sides of symbolic equations making up systems of differential equations. """ D_Y_OVER_D_T = 0, Y = 1, Y_LAPLACIAN = 2
[docs]class SymbolicEquationSystem: """ A system of symbolic equations for defining differential equations. """ def __init__( self, rhs: Union[Sequence[Expr], NDArray[Expr]], lhs_types: Optional[Sequence[Lhs]] = None): """ :param rhs: the right-hand side of the symbolic equation system :param lhs_types: the types of the left-hand side of the symbolic equation system """ if len(rhs) < 1: raise ValueError('number of equations must be greater than 0') if lhs_types is None: lhs_types = [Lhs.D_Y_OVER_D_T] * len(rhs) if len(rhs) != len(lhs_types): raise ValueError( f'length of right-hand side ({len(rhs)}) must match length of ' f'left-hand side ({len(lhs_types)})') self._rhs = copy(rhs) self._lhs_types = copy(lhs_types) self._equation_indices_by_type: Dict[Lhs, List[int]] = \ {lhs_type: [] for lhs_type in Lhs} for i, (lhs_type, rhs_element) in enumerate(zip(lhs_types, rhs)): self._equation_indices_by_type[lhs_type].append(i) @property def rhs(self) -> Union[Sequence[Expr], NDArray[Expr]]: """ The right-hand side of the symbolic equation system. """ return copy(self._rhs) @property def lhs_types(self) -> Sequence[Lhs]: """ The types of the left-hand side of the symbolic equation system. """ return copy(self._lhs_types)
[docs] def equation_indices_by_type(self, lhs_type: Lhs) -> Sequence[int]: """ Returns a sequence of integers denoting the indices of all equations of the equation system with the specified type of left-hand side. :param lhs_type: the type of left-hand side :return: the sequence of indices """ return copy(self._equation_indices_by_type[lhs_type])
[docs]class DifferentialEquation(ABC): """ A representation of a time-dependent differential equation. """ def __init__( self, x_dimension: int, y_dimension: int, all_vector_field_indices: Optional[Sequence[Sequence[int]]] = None ): """ :param x_dimension: the number spatial dimensions :param y_dimension: the number of unknown variables :param all_vector_field_indices: an optional sequence of index sequences denoting the components of vector fields the solution contains """ if x_dimension < 0: raise ValueError( f'number of x dimensions ({x_dimension}) must be non-negative') if y_dimension < 1: raise ValueError( f'number of y dimensions ({y_dimension}) must be at least 1') if all_vector_field_indices: for indices in all_vector_field_indices: if len(indices) != x_dimension: raise ValueError( f'length of vector field indices {indices} must match ' f'x dimensions ({x_dimension})') for index in indices: if not (0 <= index < y_dimension): raise ValueError( 'all indices must be non-negative and less than ' f'the number of y dimensions ({y_dimension})') self._x_dimension = x_dimension self._y_dimension = y_dimension self._all_vector_field_indices = deepcopy(all_vector_field_indices) self._symbols = Symbols(x_dimension, y_dimension) self._validate_equations() @property def x_dimension(self) -> int: """ The dimension of the non-temporal domain of the differential equation's solution. If the differential equation is an ODE, it is 0. """ return self._x_dimension @property def y_dimension(self) -> int: """ The dimension of the image of the differential equation's solution. If the solution is not vector-valued, its dimension is 1. """ return self._y_dimension @property def symbols(self) -> Symbols: """ All valid symbols that can be used to define a differential equation of this many spatial dimensions and unknown variables. """ return self._symbols @property def all_vector_field_indices(self) -> Optional[Sequence[Sequence[int]]]: """ An optional sequence of index sequences denoting the components of vector fields the solution contains. """ return deepcopy(self._all_vector_field_indices) @property @abstractmethod def symbolic_equation_system(self) -> SymbolicEquationSystem: """ A system of symbolic equations defining the differential equation system. Every element of the right-hand side of the returned system defines the first time derivative, the direct value, or the spatial Laplacian of the respective element of the vector-valued solution of the differential equation system depending on the type of the left-hand side of the equation. """ def _validate_equations(self): """ Validates the symbolic equations defining the differential equation. """ equation_system = self.symbolic_equation_system if len(equation_system.rhs) != self._y_dimension: raise ValueError( f'number of equations ({len(equation_system.rhs)}) must match ' f'number of y dimensions ({self._y_dimension})') all_symbols = set() all_symbols.add(self._symbols.t) all_symbols.update(self._symbols.y) if self._x_dimension: all_symbols.update(self._symbols.x) all_symbols.update(self._symbols.y_gradient.flatten()) all_symbols.update(self._symbols.y_hessian.flatten()) all_symbols.update(self._symbols.y_divergence.flatten()) if 2 <= self._x_dimension <= 3: all_symbols.update(self._symbols.y_curl.flatten()) all_symbols.update(self._symbols.y_laplacian) all_symbols.update(self._symbols.y_vector_laplacian.flatten()) for i, rhs_element in enumerate(self.symbolic_equation_system.rhs): rhs_symbols = rhs_element.free_symbols if not rhs_symbols.issubset(all_symbols): raise ValueError( 'invalid symbol in right-hand side symbols ' f'({rhs_symbols}) of equation {i}') d_y_over_d_t_indices = \ equation_system.equation_indices_by_type(Lhs.D_Y_OVER_D_T) if self._x_dimension: if len(d_y_over_d_t_indices) == 0: raise ValueError( 'at least one equation\'s left-hand side must be of type ' 'D_Y_OVER_D_T') elif len(d_y_over_d_t_indices) != self._y_dimension: raise ValueError( 'ordinary differential equation systems can only contain ' 'equations with D_Y_OVER_D_T type left-hand sides')
[docs]class PopulationGrowthEquation(DifferentialEquation): """ A simple ordinary differential equation modelling the growth of a population over time. """ def __init__(self, r: float = .01): """ :param r: the population growth rate """ self._r = r super(PopulationGrowthEquation, self).__init__(0, 1) @property def symbolic_equation_system(self) -> SymbolicEquationSystem: return SymbolicEquationSystem([self._r * self._symbols.y[0]])
[docs]class LotkaVolterraEquation(DifferentialEquation): """ A system of two ordinary differential equations modelling the dynamics of populations of preys and predators. """ def __init__( self, alpha: float = 2., beta: float = .04, gamma: float = 1.06, delta: float = .02): """ :param alpha: the preys' birthrate :param beta: a coefficient of the decrease of the prey population :param gamma: the predators' mortality rate :param delta: a coefficient of the increase of the predator population """ if alpha < 0. or beta < 0. or gamma < 0. or delta < 0.: raise ValueError('all coefficients must be non-negative') self._alpha = alpha self._beta = beta self._gamma = gamma self._delta = delta super(LotkaVolterraEquation, self).__init__(0, 2) @property def symbolic_equation_system(self) -> SymbolicEquationSystem: r = self._symbols.y[0] p = self._symbols.y[1] return SymbolicEquationSystem([ self._alpha * r - self._beta * r * p, self._delta * r * p - self._gamma * p ])
[docs]class LorenzEquation(DifferentialEquation): """ A system of three ordinary differential equations modelling atmospheric convection. """ def __init__( self, sigma: float = 10., rho: float = 28., beta: float = 8. / 3.): """ :param sigma: the first system coefficient :param rho: the second system coefficient :param beta: the third system coefficient """ if sigma < .0 or rho < .0 or beta < .0: raise ValueError('all coefficients must be non-negative') self._sigma = sigma self._rho = rho self._beta = beta super(LorenzEquation, self).__init__(0, 3) @property def symbolic_equation_system(self) -> SymbolicEquationSystem: c = self._symbols.y[0] h = self._symbols.y[1] v = self._symbols.y[2] return SymbolicEquationSystem([ self._sigma * (h - c), c * (self._rho - v) - h, c * h - self._beta * v ])
[docs]class NBodyGravitationalEquation(DifferentialEquation): """ A system of ordinary differential equations modelling the motion of planetary objects. """ def __init__( self, n_dims: int, masses: Sequence[float], g: float = 6.6743e-11): """ :param n_dims: the spatial dimensionality the motion of the objects is to be considered in (must be either 2 or 3) :param masses: a list of the masses of the objects (kg) :param g: the gravitational constant (m^3 * kg^-1 * s^-2) """ if n_dims < 2 or n_dims > 3: raise ValueError( f'number of dimensions ({n_dims}) must be either 2 or 3') if len(masses) < 2: raise ValueError( f'number of masses ({len(masses)}) must be at least 2') if np.any(np.array(masses) <= 0.): raise ValueError(f'all masses ({masses}) must be greater than 0') self._dims = n_dims self._masses = tuple(masses) self._n_objects = len(masses) self._g = g super(NBodyGravitationalEquation, self).__init__( 0, 2 * len(masses) * n_dims) @property def spatial_dimension(self) -> int: """ Returns the number of spatial dimensions. """ return self._dims @property def masses(self) -> Tuple[float, ...]: """ Returns the masses of the planetary objects. """ return copy(self._masses) @property def n_objects(self) -> int: """ Returns the number of planetary objects. """ return self._n_objects @property def symbolic_equation_system(self) -> SymbolicEquationSystem: y = np.array(self._symbols.y, dtype=object) n_obj_by_dims = self._n_objects * self._dims d_y_over_d_t = np.empty(self._y_dimension, dtype=object) d_y_over_d_t[:n_obj_by_dims] = y[n_obj_by_dims:] forces_shape = (self._n_objects, self._n_objects, self._dims) forces = np.zeros(forces_shape, dtype=object) for i in range(self._n_objects): position_offset_i = i * self._dims position_i = y[position_offset_i:position_offset_i + self._dims] mass_i = self._masses[i] for j in range(i + 1, self._n_objects): position_offset_j = j * self._dims position_j = y[position_offset_j: position_offset_j + self._dims] mass_j = self._masses[j] displacement = position_j - position_i distance = np.power(np.power(displacement, 2).sum(axis=-1), .5) force = (self._g * mass_i * mass_j) * \ (displacement / np.power(distance, 3)) forces[i, j, :] = force forces[j, i, :] = -force acceleration = forces[i, :, :].sum(axis=0) / mass_i velocity_offset = n_obj_by_dims + position_offset_i d_y_over_d_t[velocity_offset:velocity_offset + self._dims] = \ acceleration return SymbolicEquationSystem(d_y_over_d_t)
[docs]class DiffusionEquation(DifferentialEquation): """ A partial differential equation modelling the diffusion of particles. """ def __init__(self, x_dimension: int, d: float = 1.): """ :param x_dimension: the dimensionality of the spatial domain of the differential equation's solution :param d: the diffusion coefficient """ if x_dimension <= 0: raise ValueError( f'number of x dimensions ({x_dimension}) must be at least 1') self._d = d super(DiffusionEquation, self).__init__(x_dimension, 1) @property def symbolic_equation_system(self) -> SymbolicEquationSystem: return SymbolicEquationSystem([self._d * self._symbols.y_laplacian[0]])
[docs]class ConvectionDiffusionEquation(DifferentialEquation): """ A partial differential equation modelling the convection and diffusion of particles. """ def __init__( self, x_dimension: int, velocity: Sequence[float], d: float = 1.): """ :param x_dimension: the dimensionality of the spatial domain of the differential equation's solution :param velocity: the convection velocity vector :param d: the diffusion coefficient """ if x_dimension <= 0: raise ValueError( f'number of x dimensions ({x_dimension}) must be at least 1') if len(velocity) != x_dimension: raise ValueError( f'length of the velocity vector ({len(velocity)}) must match ' f'number of x dimensions ({x_dimension})') self._velocity = copy(velocity) self._d = d super(ConvectionDiffusionEquation, self).__init__(x_dimension, 1) @property def symbolic_equation_system(self) -> SymbolicEquationSystem: return SymbolicEquationSystem([ self._d * self._symbols.y_laplacian[0] - np.dot(self._velocity, self._symbols.y_gradient[0, :]) ])
[docs]class WaveEquation(DifferentialEquation): """ A partial differential equation modelling the propagation of waves. """ def __init__(self, x_dimension: int, c: float = 1.): """ :param x_dimension: the dimensionality of the spatial domain of the differential equation's solution :param c: the propagation speed coefficient """ if x_dimension <= 0: raise ValueError( f'number of x dimensions ({x_dimension}) must be at least 1') self._c = c super(WaveEquation, self).__init__(x_dimension, 2) @property def symbolic_equation_system(self) -> SymbolicEquationSystem: return SymbolicEquationSystem([ self._symbols.y[1], (self._c ** 2) * self._symbols.y_laplacian[0] ])
[docs]class CahnHilliardEquation(DifferentialEquation): """ A partial differential equation modelling phase separation. """ def __init__(self, x_dimension: int, d: float = .1, gamma: float = .01): """ :param x_dimension: the dimensionality of the spatial domain of the differential equation's solution :param d: the potential diffusion coefficient :param gamma: the concentration diffusion coefficient """ if x_dimension <= 0: raise ValueError( f'number of x dimensions ({x_dimension}) must be at least 1') self._d = d self._gamma = gamma super(CahnHilliardEquation, self).__init__(x_dimension, 2) @property def symbolic_equation_system(self) -> SymbolicEquationSystem: sym = self._symbols return SymbolicEquationSystem( [ self._d * sym.y_laplacian[1], sym.y[0] ** 3 - sym.y[0] - self._gamma * sym.y_laplacian[0], ], [ Lhs.D_Y_OVER_D_T, Lhs.Y ] )
[docs]class BurgerEquation(DifferentialEquation): """ A system of partial differential equations providing a simplified model of fluid flow. """ def __init__(self, x_dimension: int, re: float = 4000.): """ :param x_dimension: the dimensionality of the spatial domain of the differential equation's solution :param re: the Reynolds number """ if x_dimension <= 0: raise ValueError( f'number of x dimensions ({x_dimension}) must be at least 1') self._re = re super(BurgerEquation, self).__init__( x_dimension, x_dimension, [tuple(range(x_dimension))]) @property def symbolic_equation_system(self) -> SymbolicEquationSystem: return SymbolicEquationSystem([ (1. / self._re) * self._symbols.y_laplacian[i] - np.dot(self._symbols.y, self._symbols.y_gradient[i, :]) for i in range(self._x_dimension) ])
[docs]class ShallowWaterEquation(DifferentialEquation): """ A system of partial differential equations providing a non-conservative model of fluid flow below a pressure surface. """ def __init__( self, h: float, b: float = .01, v: float = .1, f: float = 0., g: float = 9.80665): """ :param h: the mean height of the pressure surface :param b: the viscous drag coefficient :param v: the kinematic viscosity coefficient :param f: the Coriolis coefficient :param g: the gravitational acceleration coefficient """ self._h = h self._b = b self._v = v self._f = f self._g = g super(ShallowWaterEquation, self).__init__(2, 3, [(1, 2)]) @property def symbolic_equation_system(self) -> SymbolicEquationSystem: return SymbolicEquationSystem([ -self._h * self._symbols.y_divergence[1, 2] - self._symbols.y[0] * self._symbols.y_gradient[1, 0] - self._symbols.y[1] * self._symbols.y_gradient[0, 0] - self._symbols.y[0] * self._symbols.y_gradient[2, 1] - self._symbols.y[2] * self._symbols.y_gradient[0, 1], self._v * self._symbols.y_laplacian[1] - self._symbols.y[1] * self._symbols.y_gradient[1, 0] - self._symbols.y[2] * self._symbols.y_gradient[1, 1] - self._g * self._symbols.y_gradient[0, 0] - self._b * self._symbols.y[1] + self._f * self._symbols.y[2], self._v * self._symbols.y_laplacian[2] - self._symbols.y[1] * self._symbols.y_gradient[2, 0] - self._symbols.y[2] * self._symbols.y_gradient[2, 1] - self._g * self._symbols.y_gradient[0, 1] - self._b * self._symbols.y[2] - self._f * self._symbols.y[1] ])