Source code for pararealml.operators.ml.pidon.auto_differentiator

from typing import Union

import tensorflow as tf

from pararealml.mesh import CoordinateSystem


[docs]class AutoDifferentiator(tf.GradientTape): """ A class providing various differential operators using TensorFlow's auto-differentiation capabilities. """ def __init__( self, persistent: bool = False, watch_accessed_variables: bool = True): """ :param persistent: whether the gradient tape should be persistent allowing for the calculation of multiple differential operators :param watch_accessed_variables: whether to automatically watch all accessed variables within the context of the differentiator """ super(AutoDifferentiator, self).__init__( persistent, watch_accessed_variables)
[docs] def batch_gradient( self, x: tf.Tensor, y: tf.Tensor, x_axis: Union[int, tf.Tensor], coordinate_system_type: CoordinateSystem = CoordinateSystem.CARTESIAN) -> tf.Tensor: """ Returns the element(s) of the gradient of y with respect to the element of x defined by x_axis. :param x: the input tensor :param y: the output tensor :param x_axis: the element of x to take the gradient with respect to :param coordinate_system_type: the type of the coordinate system x is from :return: the gradient of y with respect to the element of x defined by x_axis """ derivative = self._batch_derivative(x, y, x_axis) if coordinate_system_type == CoordinateSystem.CARTESIAN: return derivative elif coordinate_system_type == CoordinateSystem.SPHERICAL: r = x[:, :1] if x_axis == 0: return derivative elif x_axis == 1: phi = x[:, 2:] return derivative / (r * tf.math.sin(phi)) else: return derivative / r else: if x_axis == 1: r = x[:, :1] return derivative / r else: return derivative
[docs] def batch_hessian( self, x: tf.Tensor, y: tf.Tensor, x_axis1: int, x_axis2: int, coordinate_system_type: CoordinateSystem = CoordinateSystem.CARTESIAN) -> tf.Tensor: """ Returns the element(s) of the Hessian of y with respect to the elements of x defined by x_axis1 and x_axis2. :param x: the input tensor :param y: the output tensor :param x_axis1: the first element of x to take the Hessian with respect to :param x_axis2: the second element of x to take the Hessian with respect to :param coordinate_system_type: the type of the coordinate system x is from :return: the Hessian of y with respect to the elements of x defined by x_axis1 and x_axis2 """ second_derivative = self._batch_derivative( x, self._batch_derivative(x, y, x_axis1), x_axis2) if coordinate_system_type == CoordinateSystem.CARTESIAN: return second_derivative elif coordinate_system_type == CoordinateSystem.SPHERICAL: r = x[:, :1] phi = x[:, 2:] if x_axis1 == 0 and x_axis2 == 0: return second_derivative elif x_axis1 == 1 and x_axis2 == 1: d_y_over_d_r = self._batch_derivative(x, y, 0) d_y_over_d_phi = self._batch_derivative(x, y, 2) return ( d_y_over_d_r + ( second_derivative / tf.math.sin(phi) + tf.math.cos(phi) * d_y_over_d_phi ) / (r * tf.math.sin(phi)) ) / r elif x_axis1 == 2 and x_axis2 == 2: d_y_over_d_r = self._batch_derivative(x, y, 0) return (second_derivative / r + d_y_over_d_r) / r elif (x_axis1 == 0 and x_axis2 == 1) \ or (x_axis1 == 1 and x_axis2 == 0): d_y_over_d_theta = self._batch_derivative(x, y, 1) return ( second_derivative - d_y_over_d_theta / r ) / (r * tf.math.sin(phi)) elif (x_axis1 == 0 and x_axis2 == 2) \ or (x_axis1 == 2 and x_axis2 == 0): d_y_over_d_phi = self._batch_derivative(x, y, 2) return (second_derivative - d_y_over_d_phi / r) / r else: d_y_over_d_theta = self._batch_derivative(x, y, 1) return ( tf.math.sin(phi) * second_derivative - tf.math.cos(phi) * d_y_over_d_theta ) / (r * tf.math.sin(phi)) ** 2 else: r = x[:, :1] if (x_axis1 == 0 or x_axis1 == 2) \ and (x_axis2 == 0 or x_axis2 == 2): return second_derivative elif x_axis1 == 1 and x_axis2 == 1: d_y_over_d_r = self._batch_derivative(x, y, 0) return (second_derivative / r + d_y_over_d_r) / r elif (x_axis1 == 1 and x_axis2 == 0) \ or (x_axis1 == 0 and x_axis2 == 1): d_y_over_d_theta = self._batch_derivative(x, y, 1) return (second_derivative - d_y_over_d_theta / r) / r else: return second_derivative / r
[docs] def batch_divergence( self, x: tf.Tensor, y: tf.Tensor, coordinate_system_type: CoordinateSystem = CoordinateSystem.CARTESIAN) -> tf.Tensor: """ Returns the divergence of y. :param x: the input tensor :param y: the output tensor :param coordinate_system_type: the type of the coordinate system x is from :return: the divergence of y """ if y.shape[1] != x.shape[1]: raise ValueError( f'number of y dimensions ({y.shape[1]}) must match number of ' f'x dimensions ({x.shape[1]})') if coordinate_system_type == CoordinateSystem.CARTESIAN: return tf.math.reduce_sum( tf.stack([ self._batch_derivative(x, y[..., i:i + 1], i) for i in range(x.shape[-1]) ]), axis=0) elif coordinate_system_type == CoordinateSystem.SPHERICAL: r = x[:, :1] phi = x[:, 2:] y_r = y[..., :1] y_theta = y[..., 1:2] y_phi = y[..., 2:] d_y_r_over_d_r = self._batch_derivative(x, y_r, 0) d_y_theta_over_d_theta = self._batch_derivative(x, y_theta, 1) d_y_phi_over_d_phi = self._batch_derivative(x, y_phi, 2) return d_y_r_over_d_r + ( d_y_phi_over_d_phi + tf.math.multiply(y_r, 2.) + ( d_y_theta_over_d_theta + tf.math.cos(phi) * y_phi ) / tf.math.sin(phi) ) / r else: r = x[:, :1] y_r = y[..., :1] y_theta = y[..., 1:2] d_y_r_over_d_r = self._batch_derivative(x, y_r, 0) d_y_theta_over_d_theta = self._batch_derivative(x, y_theta, 1) div = d_y_r_over_d_r + (y_r + d_y_theta_over_d_theta) / r if coordinate_system_type == CoordinateSystem.POLAR: return div else: y_z = y[..., 2:] d_y_z_over_d_z = self._batch_derivative(x, y_z, 2) return div + d_y_z_over_d_z
[docs] def batch_curl( self, x: tf.Tensor, y: tf.Tensor, curl_ind: int = 0, coordinate_system_type: CoordinateSystem = CoordinateSystem.CARTESIAN) -> tf.Tensor: """ Returns the curl_ind-th component of the curl of y. :param x: the input tensor :param y: the output tensor :param curl_ind: the index of the component of the curl of y to compute; if y is a two dimensional vector field, it must be 0 :param coordinate_system_type: the type of the coordinate system x is from :return: the curl_ind-th component of the curl of y """ x_dimension = x.shape[1] if y.shape[1] != x_dimension: raise ValueError( f'number of y dimensions ({y.shape[1]}) must match number of ' f'x dimensions ({x_dimension})') if not (2 <= x_dimension <= 3): raise ValueError( f'number of x dimensions ({x_dimension}) must be 2 or 3') if x_dimension == 2 and curl_ind != 0: raise ValueError(f'curl index ({curl_ind}) must be 0 for 2D curl') if not (0 <= curl_ind < x_dimension): raise ValueError( f'curl index ({curl_ind}) must be non-negative and less than ' f'number of x dimensions ({x_dimension})') if coordinate_system_type == CoordinateSystem.CARTESIAN: if x_dimension == 2 or curl_ind == 2: return self._batch_derivative(x, y[..., 1:2], 0) - \ self._batch_derivative(x, y[..., :1], 1) elif curl_ind == 0: return self._batch_derivative(x, y[..., 2:], 1) - \ self._batch_derivative(x, y[..., 1:2], 2) else: return self._batch_derivative(x, y[..., :1], 2) - \ self._batch_derivative(x, y[..., 2:], 0) elif coordinate_system_type == CoordinateSystem.SPHERICAL: r = x[:, :1] phi = x[:, 2:] y_r = y[..., :1] y_theta = y[..., 1:2] y_phi = y[..., 2:] if curl_ind == 0: d_y_theta_over_d_phi = self._batch_derivative(x, y_theta, 2) d_y_phi_over_d_theta = self._batch_derivative(x, y_phi, 1) return ( d_y_theta_over_d_phi + ( tf.math.cos(phi) * y_theta - d_y_phi_over_d_theta ) / tf.math.sin(phi) ) / r elif curl_ind == 1: d_y_r_over_d_phi = self._batch_derivative(x, y_r, 2) d_y_phi_over_d_r = self._batch_derivative(x, y_phi, 0) return d_y_phi_over_d_r + (y_phi - d_y_r_over_d_phi) / r else: d_y_r_over_d_theta = self._batch_derivative(x, y_r, 1) d_y_theta_over_d_r = self._batch_derivative(x, y_theta, 0) return -d_y_theta_over_d_r + ( d_y_r_over_d_theta / tf.math.sin(phi) - y_theta ) / r else: r = x[:, :1] y_r = y[..., :1] y_theta = y[..., 1:2] if coordinate_system_type == CoordinateSystem.POLAR \ or curl_ind == 2: d_y_r_over_d_theta = self._batch_derivative(x, y_r, 1) d_y_theta_over_d_r = self._batch_derivative(x, y_theta, 0) return d_y_theta_over_d_r + (y_theta - d_y_r_over_d_theta) / r elif curl_ind == 0: y_z = y[..., 2:] d_y_theta_over_d_z = self._batch_derivative(x, y_theta, 2) d_y_z_over_d_theta = self._batch_derivative(x, y_z, 1) return d_y_z_over_d_theta / r - d_y_theta_over_d_z else: y_z = y[..., 2:] d_y_r_over_d_z = self._batch_derivative(x, y_r, 2) d_y_z_over_d_r = self._batch_derivative(x, y_z, 0) return d_y_r_over_d_z - d_y_z_over_d_r
[docs] def batch_laplacian( self, x: tf.Tensor, y: tf.Tensor, coordinate_system_type: CoordinateSystem = CoordinateSystem.CARTESIAN) -> tf.Tensor: """ Returns the element-wise scalar Laplacian of y. :param x: the input tensor :param y: the output tensor :param coordinate_system_type: the type of the coordinate system x is from :return: the Laplacian of y """ if coordinate_system_type == CoordinateSystem.CARTESIAN: return tf.math.reduce_sum( tf.stack([ self._batch_derivative( x, self._batch_derivative(x, y, i), i) for i in range(x.shape[-1]) ]), axis=0) elif coordinate_system_type == CoordinateSystem.SPHERICAL: r = x[:, :1] phi = x[:, 2:] d_y_over_d_r = self._batch_derivative(x, y, 0) d_y_over_d_theta = self._batch_derivative(x, y, 1) d_y_over_d_phi = self._batch_derivative(x, y, 2) d_sqr_y_over_d_r_sqr = self._batch_derivative(x, d_y_over_d_r, 0) d_sqr_y_over_d_theta_sqr = \ self._batch_derivative(x, d_y_over_d_theta, 1) d_sqr_y_over_d_phi_sqr = \ self._batch_derivative(x, d_y_over_d_phi, 2) return d_sqr_y_over_d_r_sqr + ( tf.math.multiply(d_y_over_d_r, 2.) + ( d_sqr_y_over_d_phi_sqr + ( tf.math.cos(phi) * d_y_over_d_phi + d_sqr_y_over_d_theta_sqr / tf.math.sin(phi) ) / tf.math.sin(phi) ) / r ) / r else: r = x[:, :1] d_y_over_d_r = self._batch_derivative(x, y, 0) d_y_over_d_theta = self._batch_derivative(x, y, 1) d_sqr_y_over_d_r_sqr = self._batch_derivative(x, d_y_over_d_r, 0) d_sqr_y_over_d_theta_sqr = \ self._batch_derivative(x, d_y_over_d_theta, 1) laplacian = d_sqr_y_over_d_r_sqr + \ (d_sqr_y_over_d_theta_sqr / r + d_y_over_d_r) / r if coordinate_system_type == CoordinateSystem.POLAR: return laplacian else: d_y_over_d_z = self._batch_derivative(x, y, 2) d_sqr_y_over_d_z_sqr = \ self._batch_derivative(x, d_y_over_d_z, 2) return laplacian + d_sqr_y_over_d_z_sqr
[docs] def batch_vector_laplacian( self, x: tf.Tensor, y: tf.Tensor, vector_laplacian_ind: int, coordinate_system_type: CoordinateSystem = CoordinateSystem.CARTESIAN) -> tf.Tensor: """ Returns the vector Laplacian of y. :param x: the input tensor :param y: the output tensor :param vector_laplacian_ind: the index of the component of the vector Laplacian of y to compute :param coordinate_system_type: the type of the coordinate system x is from :return: the vector Laplacian of y """ x_dimension = x.shape[1] if y.shape[1] != x_dimension: raise ValueError( f'number of y dimensions ({y.shape[1]}) must match number of ' f'x dimensions ({x_dimension})') if not (0 <= vector_laplacian_ind < x_dimension): raise ValueError( f'vector Laplacian index ({vector_laplacian_ind}) must be ' 'non-negative and less than number of x dimensions ' f'({x_dimension})') laplacian = self.batch_laplacian( x, y[:, vector_laplacian_ind:vector_laplacian_ind + 1]) if coordinate_system_type == CoordinateSystem.CARTESIAN: return laplacian elif coordinate_system_type == CoordinateSystem.SPHERICAL: r = x[:, :1] phi = x[:, 2:] y_r = y[:, :1] y_theta = y[:, 1:2] y_phi = y[:, 2:] if vector_laplacian_ind == 1: d_y_theta_over_d_theta = self._batch_derivative(x, y_theta, 1) d_y_phi_over_d_phi = self._batch_derivative(x, y_phi, 2) return laplacian - tf.math.multiply( y_r + d_y_phi_over_d_phi + ( tf.cos(phi) * y_phi + d_y_theta_over_d_theta ) / tf.sin(phi), 2. ) / r ** 2 elif vector_laplacian_ind == 2: d_y_r_over_d_theta = self._batch_derivative(x, y_r, 1) d_y_phi_over_d_theta = self._batch_derivative(x, y_phi, 1) return laplacian + tf.math.multiply( d_y_r_over_d_theta + ( tf.cos(phi) * d_y_phi_over_d_theta - tf.math.divide(y_theta, 2.) ) / tf.sin(phi), 2. ) / (tf.sin(phi) * r ** 2) else: d_y_r_over_d_phi = self._batch_derivative(x, y_r, 2) d_y_theta_over_d_theta = self._batch_derivative(x, y_theta, 1) return laplacian + tf.math.multiply( d_y_r_over_d_phi - ( tf.math.divide(y_phi, 2.) + tf.cos(phi) * d_y_theta_over_d_theta ) / tf.sin(phi) ** 2, 2. ) / r ** 2 else: r = x[:, :1] y_r = y[:, :1] y_theta = y[:, 1:2] if vector_laplacian_ind == 0: d_y_theta_over_d_theta = self._batch_derivative(x, y_theta, 1) return laplacian - ( y_r + tf.math.multiply(d_y_theta_over_d_theta, 2.) ) / r ** 2 elif vector_laplacian_ind == 1: d_y_r_over_d_theta = self._batch_derivative(x, y_r, 1) return laplacian - ( y_theta - tf.math.multiply(d_y_r_over_d_theta, 2.) ) / r ** 2 else: return laplacian
def _batch_derivative( self, x: tf.Tensor, y: tf.Tensor, x_axis: Union[int, tf.Tensor]) -> tf.Tensor: """ Returns the element(s) of the first derivative of y with respect to the element of x defined by x_axis. :param x: the input tensor :param y: the output tensor :param x_axis: the element of x to take the gradient with respect to :return: the first derivative of y with respect to the element of x defined by x_axis """ if x.shape[0] != y.shape[0]: raise ValueError( f'number of x instances ({x.shape[0]}) must match number of ' f'y instances ({y.shape[0]})') if isinstance(x_axis, int): if not (0 <= x_axis < x.shape[-1]): raise ValueError( f'x-axis ({x_axis}) must be non-negative and less than ' f'number of x dimensions ({x.shape[-1]})') elif isinstance(x_axis, tf.Tensor): if len(x_axis.shape) != 1: raise ValueError('x-axis must be a 1 dimensional array') if x_axis.shape[0] != x.shape[0]: raise ValueError( f'length of x-axis ({x_axis.shape[0]}) must match number ' f'of x instances ({x.shape[0]})') derivatives = self.batch_jacobian(y, x) return tf.gather(derivatives, x_axis, axis=2, batch_dims=1) \ if isinstance(x_axis, tf.Tensor) \ else derivatives[:, :, x_axis]