Source code for pararealml.operators.ml.pidon.data_set

from __future__ import annotations

from collections.abc import Iterator
from typing import Optional, NamedTuple, Iterable

import numpy as np
import tensorflow as tf

from pararealml.constrained_problem import ConstrainedProblem
from pararealml.initial_condition import \
    VectorizedInitialConditionFunction
from pararealml.initial_value_problem import TemporalDomainInterval
from pararealml.operators.ml.pidon.collocation_point_sampler import \
    CollocationPointSampler


[docs]class DataSet: """ A generator and container of all the data necessary to train a physics-informed DeepONet with variable initial conditions. """ def __init__( self, cp: ConstrainedProblem, t_interval: TemporalDomainInterval, y_0_functions: Iterable[VectorizedInitialConditionFunction], point_sampler: CollocationPointSampler, n_domain_points: int, n_boundary_points: int = 0, vertex_oriented: bool = False): """ :param cp: the constrained problem to generate the spatial data about :param t_interval: the bounds of the temporal domain to generate data from :param y_0_functions: the initial condition functions :param point_sampler: a sampler instance for sampling the collocation points :param n_domain_points: the number of domain points to sample :param n_boundary_points: the number of boundary points to sample; if the constrained problem is an ODE, it should be 0 :param vertex_oriented: whether the initial condition collocation points should be the vertices or the cell centers of the mesh """ x_dimension = cp.differential_equation.x_dimension if n_domain_points <= 0: raise ValueError( f'number of domain points ({n_domain_points}) must be greater ' f'than 0') if n_boundary_points < 0: raise ValueError( f'number of boundary points ({n_boundary_points}) must be ' f'non-negative') if not x_dimension and n_boundary_points: raise ValueError('number of boundary points must be 0 for ODEs') self._cp = cp self._t_interval = t_interval self._y_0_functions = y_0_functions self._point_sampler = point_sampler self._n_domain_points = n_domain_points self._n_boundary_points = n_boundary_points self._vertex_oriented = vertex_oriented self._initial_value_data = self._create_initial_value_data() self._domain_collocation_data = self._create_domain_collocation_data() self._initial_collocation_data = \ self._create_initial_collocation_data() self._boundary_collocation_data = \ self._create_boundary_collocation_data() @property def constrained_problem(self) -> ConstrainedProblem: """ The constrained problem the data set is built around. """ return self._cp @property def initial_value_data(self) -> np.ndarray: """ The initial value data where each row is a different initial condition function and each column represents a component of the initial condition function evaluated over a point of the constrained problem's mesh. """ return self._initial_value_data @property def domain_collocation_data(self) -> np.ndarray: """ The domain collocation data points where the first column is t and all other columns are x. """ return self._domain_collocation_data @property def initial_collocation_data(self) -> np.ndarray: """ The initial collocation data points where the first column is t and all other columns are x. """ return self._initial_collocation_data @property def boundary_collocation_data(self) -> Optional[np.ndarray]: """ The boundary collocation data points where the columns represent t, x, y, the derivative of y with respect to the unit normal vector of the boundary, and finally the axis denoting the direction of the normal vector. """ return self._boundary_collocation_data
[docs] def get_iterator( self, n_batches: int, n_ic_repeats: int = 1, shuffle: bool = True) -> DataSetIterator: """ Returns an iterator over the data set to enable iterating over the Cartesian product of the initial value data and the collocation data batch by batch. :param n_batches: the number of batches to map the underlying data to :param n_ic_repeats: the number of times to repeat the initial collocation data samples in an epoch :param shuffle: whether to shuffle the data behind the iterator :return: the iterator over the data set """ return DataSetIterator(self, n_batches, n_ic_repeats, shuffle)
def _create_initial_value_data(self) -> np.ndarray: """ Creates the initial value data by evaluating the initial condition functions (over the centers of the mesh's cells in case the constrained problem is a PDE). """ if self._cp.differential_equation.x_dimension: x = self._cp.mesh.all_index_coordinates( self._vertex_oriented, flatten=True) initial_value_data = np.vstack([ y_0_func(x).flatten() for y_0_func in self._y_0_functions ]) else: initial_value_data = np.array([ y_0_func(None) for y_0_func in self._y_0_functions ]) initial_value_data.setflags(write=False) return initial_value_data def _create_domain_collocation_data(self) -> np.ndarray: """ Creates the domain collocation data by sampling collocation points from the space-time domain of the constrained problem combined with the time interval. """ if self._cp.differential_equation.x_dimension: domain_points = self._point_sampler.sample_domain_points( self._n_domain_points, self._t_interval, self._cp.mesh.x_intervals) domain_collocation_data = \ np.concatenate((domain_points.t, domain_points.x), axis=1) else: domain_points = self._point_sampler.sample_domain_points( self._n_domain_points, self._t_interval, None) domain_collocation_data = domain_points.t domain_collocation_data.setflags(write=False) return domain_collocation_data def _create_initial_collocation_data(self) -> np.ndarray: """ Creates the initial collocation data by combining the coordinates of the cell centers of the constrained problem's mesh (if the constrained problem is a PDE) with an array of zeros representing the time points. """ if self._cp.differential_equation.x_dimension: x = self._cp.mesh.all_index_coordinates( self._vertex_oriented, flatten=True) t = np.zeros((len(x), 1)) initial_collocation_data = np.hstack((t, x)) else: initial_collocation_data = np.zeros((1, 1)) initial_collocation_data.setflags(write=False) return initial_collocation_data def _create_boundary_collocation_data(self) -> Optional[np.ndarray]: """ Creates the boundary collocation data by sampling collocation points from the spatial boundaries of the space-time domain of the constrained problem combined with the time interval; if the constrained problem is a PDE, it also evaluates the boundary conditions (both Dirichlet and Neumann) and includes the constraints in the data set. """ diff_eq = self._cp.differential_equation x_dimension = diff_eq.x_dimension y_dimension = diff_eq.y_dimension if not x_dimension: return None all_boundary_points = self._point_sampler.sample_boundary_points( self._n_boundary_points, self._t_interval, self._cp.mesh.x_intervals) t = [] x = [] y = [] d_y_over_d_n = [] axes = [] for axis, (bc_pair, boundary_points_pair) in enumerate( zip(self._cp.boundary_conditions, all_boundary_points)): for bc, boundary_points in zip(bc_pair, boundary_points_pair): if boundary_points is None: continue for i in range(boundary_points.t.shape[0]): t_i = boundary_points.t[i] x_i = boundary_points.x[i:i + 1] y_i = bc.y_condition(x_i, t_i)[0] \ if bc.has_y_condition else [None] * y_dimension d_y_over_d_n_i = bc.d_y_condition(x_i, t_i)[0] \ if bc.has_d_y_condition else [None] * y_dimension t.append(t_i) x.append(x_i[0]) axes.append([axis]) y.append(y_i) d_y_over_d_n.append(d_y_over_d_n_i) boundary_collocation_data = np.concatenate([ np.array(t), np.array(x), np.array(y), np.array(d_y_over_d_n), np.array(axes) ], axis=1) boundary_collocation_data.setflags(write=False) return boundary_collocation_data
[docs]class DataSetIterator(Iterator): """ An iterator over a data set that computes the Cartesian products of the initial value data with the domain, initial, and boundary collocation data. """ def __init__( self, data_set: DataSet, n_batches: int, n_ic_repeats: int = 1, shuffle: bool = True): """ :param data_set: the data set to iterate over :param n_batches: the number of batches per epoch :param n_ic_repeats: the number of times to repeat the initial collocation data samples in an epoch :param shuffle: whether to shuffle the Cartesian product of the initial condition data and collocation data. """ self._data_set = data_set self._n_batches = n_batches self._n_ic_repeats = n_ic_repeats self._shuffle = shuffle self._iv_data_size = data_set.initial_value_data.shape[0] self._domain_data_size = data_set.domain_collocation_data.shape[0] self._initial_data_size = data_set.initial_collocation_data.shape[0] self._boundary_data_size = \ 0 if data_set.boundary_collocation_data is None \ else data_set.boundary_collocation_data.shape[0] self._total_domain_data_size = \ self._iv_data_size * self._domain_data_size self._total_initial_data_size = \ n_ic_repeats * self._iv_data_size * self._initial_data_size self._total_boundary_data_size = \ self._iv_data_size * self._boundary_data_size if self._total_domain_data_size % n_batches != 0 \ or self._total_initial_data_size % n_batches != 0 \ or self._total_boundary_data_size % n_batches != 0: raise ValueError( 'number of batches must be a common divisor of total domain ' f'data size ({self._total_domain_data_size}), total initial ' f'data size ({self._total_initial_data_size}), and total ' f'boundary data size ({self._total_boundary_data_size})') self._domain_batch_size = self._total_domain_data_size // n_batches self._initial_batch_size = self._total_initial_data_size // n_batches self._boundary_batch_size = self._total_boundary_data_size // n_batches self._domain_indices = self._create_cartesian_product_indices( self._iv_data_size, self._domain_data_size) self._initial_indices = np.tile( self._create_cartesian_product_indices( self._iv_data_size, self._initial_data_size), (n_ic_repeats, 1)) self._boundary_indices = self._create_cartesian_product_indices( self._iv_data_size, self._boundary_data_size) \ if self._boundary_data_size else None self._batch_counter = 0 def __iter__(self) -> DataSetIterator: self.reset() return self def __next__(self) -> DataBatch: if self._batch_counter >= self._n_batches: raise StopIteration batch = DataBatch( self._get_domain_batch(self._domain_batch_size), self._get_initial_batch(self._initial_batch_size), self._get_boundary_batch(self._boundary_batch_size)) self._batch_counter += 1 return batch @property def domain_batch_size(self) -> int: """ The domain data batch size used by the iterator. """ return self._domain_batch_size @property def initial_batch_size(self) -> int: """ The initial data batch size used by the iterator. """ return self._initial_batch_size @property def boundary_batch_size(self) -> int: """ The boundary data batch size used by the iterator. """ return self._boundary_batch_size
[docs] def get_full_batch(self) -> DataBatch: """ Returns the full Cartesian product of all the initial value data and all the domain, initial, and boundary collocation data contained within the underlying data set. """ self.reset() return DataBatch( self._get_domain_batch(self._total_domain_data_size), self._get_initial_batch( self._total_initial_data_size // self._n_ic_repeats), self._get_boundary_batch(self._total_boundary_data_size))
[docs] def reset(self): """ Resets the iterator so that the data set can be iterated over again. """ self._batch_counter = 0 if self._shuffle: np.random.shuffle(self._domain_indices) np.random.shuffle(self._initial_indices) if self._boundary_data_size: np.random.shuffle(self._boundary_indices)
def _get_domain_batch(self, domain_batch_size: int) -> DomainDataBatch: """ Returns a domain data batch of the specified size. :param domain_batch_size: the domain data batch size :return: the domain data batch """ start_ind = self._batch_counter * self._domain_batch_size domain_indices = \ self._domain_indices[start_ind:start_ind + domain_batch_size, :] domain_iv_data_indices = domain_indices[:, 0] domain_collocation_data_indices = domain_indices[:, 1] domain_iv_data = \ self._data_set.initial_value_data[domain_iv_data_indices] domain_collocation_data = self._data_set.domain_collocation_data[ domain_collocation_data_indices ] diff_eq = self._data_set.constrained_problem.differential_equation return DomainDataBatch( tf.convert_to_tensor(domain_iv_data, tf.float32), tf.convert_to_tensor(domain_collocation_data[:, :1], tf.float32), tf.convert_to_tensor(domain_collocation_data[:, 1:], tf.float32) if diff_eq.x_dimension else None) def _get_initial_batch( self, initial_batch_size: int) -> InitialDataBatch: """ Returns an initial condition data batch of the specified size. :param initial_batch_size: the initial condition data batch size :return: the initial condition data batch """ start_ind = self._batch_counter * self._initial_batch_size initial_indices = \ self._initial_indices[start_ind:start_ind + initial_batch_size, :] initial_iv_data_indices = initial_indices[:, 0] initial_collocation_data_indices = initial_indices[:, 1] initial_iv_data = \ self._data_set.initial_value_data[initial_iv_data_indices] initial_collocation_data = self._data_set.initial_collocation_data[ initial_collocation_data_indices ] initial_u_tensor = tf.convert_to_tensor(initial_iv_data, tf.float32) initial_t_tensor = tf.convert_to_tensor( initial_collocation_data[:, :1], tf.float32) diff_eq = self._data_set.constrained_problem.differential_equation if diff_eq.x_dimension: initial_x_tensor = tf.convert_to_tensor( initial_collocation_data[:, 1:], tf.float32) initial_y_tensor = tf.convert_to_tensor( initial_iv_data.reshape( (initial_batch_size, -1, diff_eq.y_dimension) )[ np.arange(initial_batch_size), initial_collocation_data_indices, : ], tf.float32) else: initial_x_tensor = None initial_y_tensor = initial_u_tensor return InitialDataBatch( initial_u_tensor, initial_t_tensor, initial_x_tensor, initial_y_tensor) def _get_boundary_batch( self, boundary_batch_size: int) -> Optional[BoundaryDataBatch]: """ Returns a boundary data batch of the specified size. :param boundary_batch_size: the boundary data batch size :return: the boundary data batch """ if boundary_batch_size == 0: return None start_ind = self._batch_counter * self._boundary_batch_size boundary_indices = self._boundary_indices[ start_ind:start_ind + boundary_batch_size, : ] boundary_iv_data_indices = boundary_indices[:, 0] boundary_collocation_data_indices = boundary_indices[:, 1] boundary_iv_data = \ self._data_set.initial_value_data[boundary_iv_data_indices] boundary_collocation_data = self._data_set.boundary_collocation_data[ boundary_collocation_data_indices ] diff_eq = self._data_set.constrained_problem.differential_equation x_offset = 1 y_offset = x_offset + diff_eq.x_dimension d_y_over_d_n_offset = y_offset + diff_eq.y_dimension axes_offset = d_y_over_d_n_offset + diff_eq.y_dimension return BoundaryDataBatch( tf.convert_to_tensor(boundary_iv_data, tf.float32), tf.convert_to_tensor( boundary_collocation_data[:, :x_offset], tf.float32), tf.convert_to_tensor( boundary_collocation_data[:, x_offset:y_offset], tf.float32), tf.convert_to_tensor( boundary_collocation_data[:, y_offset:d_y_over_d_n_offset], tf.float32), tf.convert_to_tensor( boundary_collocation_data[:, d_y_over_d_n_offset:axes_offset], tf.float32), tf.convert_to_tensor( boundary_collocation_data[:, axes_offset], tf.int32)) @staticmethod def _create_cartesian_product_indices( first_set_size: int, second_set_size: int) -> np.ndarray: """ Creates a 2D array of indices for the Cartesian product of two sets of data rows. The first column of the returned array is the first set's row indices while the second column is the second set's row indices. :param first_set_size: the number of rows in the first set :param second_set_size: the number of rows in the second set :return: a 2D array with two columns of data row indices """ first_set_indices = np.arange(0, first_set_size) second_set_indices = np.arange(0, second_set_size) cartesian_product_first_set_indices = np.repeat( first_set_indices, second_set_size, axis=0) cartesian_product_second_set_indices = np.tile( second_set_indices, (first_set_size,)) return np.stack( (cartesian_product_first_set_indices, cartesian_product_second_set_indices), axis=1)
[docs]class DataBatch(NamedTuple): """ A container for a data batch including domain data, initial value data, and optionally boundary data. """ domain: DomainDataBatch initial: InitialDataBatch boundary: Optional[BoundaryDataBatch]
[docs]class DomainDataBatch(NamedTuple): """ A container for domain batch data. """ u: tf.Tensor t: tf.Tensor x: Optional[tf.Tensor]
[docs]class InitialDataBatch(NamedTuple): """ A container for initial condition batch data. """ u: tf.Tensor t: tf.Tensor x: Optional[tf.Tensor] y: tf.Tensor
[docs]class BoundaryDataBatch(NamedTuple): """ A container for boundary batch data. """ u: tf.Tensor t: tf.Tensor x: tf.Tensor y: tf.Tensor d_y_over_d_n: tf.Tensor axes: tf.Tensor