Source code for pysensors.reconstruction._sspor

import warnings

import numpy as np
from scipy.linalg import lstsq, solve
from sklearn.base import BaseEstimator
from sklearn.utils.validation import check_is_fitted

from ..basis import Identity
from ..optimizers import CCQR, QR, TPGR
from ..utils import validate_input

INT_DTYPES = (int, np.int64, np.int32, np.int16, np.int8)


[docs] class SSPOR(BaseEstimator): """ Sparse Sensor Placement Optimization for Reconstruction: a model for selecting the best sensor locations for state reconstruction. Given a basis in which to represent the state (e.g. PCA modes) along with measurement data, a :class:`SSPOR` instance produces a list of sensor locations (a permutation of the numbers 0, 1, ..., :code:`n_input_features` - 1) ranked in descending order of importance. One can then select the top k sensors and take future measurements at that limited set of locations. The overall time complexity of fitting a :class:`SSPOR` object is :code:`O(n_basis_modes * n_input_features * n_input_features)` plus the cost for fitting the basis. Different bases have different complexities. The space complexity is :code:`O(n_basis_modes * n_input_features)`. Parameters ---------- basis: basis object, optional (default :class:`pysensors.basis.Identity`) Basis in which to represent the data. Default is the identity basis (i.e. raw features). optimizer: optimizer object, optional \ (default :class:`pysensors.optimizers.QR`) Optimization method used to rank sensor locations. n_sensors: int, optional (default n_input_features) Number of sensors to select. Note that ``s = SSPOR(n_sensors=10); s.fit(x)`` is equivalent to ``s = SSPOR(); s.fit(x); s.set_number_of_sensors(10)``. Attributes ---------- n_basis_modes: int Number of basis modes considered during fitting. basis_matrix_: np.ndarray Internal representation of the basis. ranked_sensors_: np.ndarray Sensor locations ranked in descending order of importance. singular_values: np.ndarray Normalized singular values of the fitted data Examples -------- >>> import numpy as np >>> from pysensors import SSPOR >>> >>> x = np.linspace(0, 1, 501) >>> monomials = np.vander(x, 15).T >>> >>> model = SSPOR(n_sensors=5) >>> model.fit(monomials) SSPOR(basis=Identity(n_basis_modes=15), n_sensors=5, optimizer=QR()) >>> print(model.selected_sensors) [500 377 0 460 185] >>> print(x[model.selected_sensors]) [1. 0.754 0. 0.92 0.37 ] >>> model.set_n_sensors(7) >>> print(x[model.selected_sensors]) [1. 0.754 0. 0.92 0.37 0.572 0.134] >>> f = np.sin(3*x) >>> f_pred = model.predict(f[model.selected_sensors], method='unregularized') >>> print(np.linalg.norm(f - f_pred)) 0.022405698005838044 """ def __init__(self, basis=None, optimizer=None, n_sensors=None): if basis is None: basis = Identity() self.basis = basis if optimizer is None: optimizer = QR() self.optimizer = optimizer if n_sensors is None: self.n_sensors = None elif isinstance(n_sensors, INT_DTYPES) and n_sensors > 0: self.n_sensors = int(n_sensors) else: raise ValueError("n_sensors must be a positive integer.") self.n_basis_modes = None
[docs] def fit(self, x, quiet=False, prefit_basis=False, seed=None, **optimizer_kws): """ Fit the SSPOR model, determining which sensors are relevant. Parameters ---------- x: array-like, shape (n_samples, n_input_features) Training data. quiet: boolean, optional (default False) Whether or not to suppress warnings during fitting. prefit_basis: boolean, optional (default False) Whether or not the basis has already been fit to x. For example, you may have already fit and experimented with a ``SVD`` object to determine the optimal number of modes. This option allows you to avoid an unnecessary SVD. seed: int, optional (default None) Seed for the random number generator used to shuffle sensors after the ``self.basis.n_basis_modes`` sensor. Most optimizers only rank the top ``self.basis.n_basis_modes`` sensors, leaving the rest virtually untouched. As a result the remaining samples are randomly permuted. optimizer_kws: dict, optional Keyword arguments to be passed to the ``get_sensors`` method of the optimizer. Returns ------- self: a fitted :class:`SSPOR` instance """ # Fit basis functions to data if prefit_basis: check_is_fitted(self.basis, "basis_matrix_") else: x = validate_input(x) if quiet: with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=UserWarning) self.basis.fit(x) else: self.basis.fit(x) # Get matrix representation of basis self.basis_matrix_ = self.basis.matrix_representation( n_basis_modes=self.n_basis_modes ) # Check that n_sensors doesn't exceed dimension of basis vectors and # that it doesn't exceed the number of samples when using the CCQR optimizer. self._validate_n_sensors() # Calculate the normalized singular values X_proj = x @ self.basis_matrix_ self.singular_values = np.linalg.norm(X_proj, axis=0) / np.sqrt(x.shape[0]) # Find sparse sensor locations if isinstance(self.optimizer, TPGR): self.ranked_sensors_ = self.optimizer.fit( self.basis_matrix_, self.singular_values, **optimizer_kws ).get_sensors() else: self.ranked_sensors_ = self.optimizer.fit( self.basis_matrix_, **optimizer_kws ).get_sensors() # Randomly shuffle sensors after self.basis.n_basis_modes rng = np.random.default_rng(seed) n_basis_modes = self.basis_matrix_.shape[1] self.ranked_sensors_[n_basis_modes:] = rng.permutation( self.ranked_sensors_[n_basis_modes:] ) return self
[docs] def predict(self, x, method=None, prior="decreasing", noise=None, **solve_kws): """ Predict values at all positions given measurements at sensor locations. Parameters ---------- x: array-like, shape (n_samples, n_sensors) Measurements from which to form prediction. The measurements should be taken at the sensor locations specified by ``self.get_selected_sensors()``. method : {'unregularized', None}, optional If None (default), performs regularized reconstruction using the prior and noise. If 'unregularized', uses direct or least-squares inversion depending on matrix shape. prior: str or np.ndarray, shape (n_basis_modes,), optional (default='decreasing') Prior Covariance Vector, typically a scaled identity vector or a vector containing normalized singular values. If 'decreasing', normalized singular values are used. noise: float (default None) Magnitude of the gaussian uncorrelated sensor measurement noise. If None, noise will default to the average of the computed prior. solve_kws: dict, optional keyword arguments to be passed to the linear solver used to invert the basis matrix. Returns ------- y: numpy array, shape (n_samples, n_features) Predicted values at every location. """ check_is_fitted(self, "ranked_sensors_") x = validate_input(x, self.ranked_sensors_[: self.n_sensors]).T # For efficiency we may want to factor # self.basis_matrix_[self.ranked_sensors_, :] # in case predict is called multiple times. # Although if the user changes the number of sensors between calls # the factorization will be wasted. if self.n_sensors > self.basis_matrix_.shape[0] and method == "unregularized": warnings.warn( "n_sensors exceeds dimension of basis modes. Performance may be poor " "for unregularized reconstruction. Consider using regularized " "reconstruction" ) if method is None: if isinstance(prior, str) and prior == "decreasing": computed_prior = self.singular_values elif isinstance(prior, np.ndarray): if prior.ndim != 1: raise ValueError("prior must be a 1D array") if prior.shape[0] != self.basis_matrix_.shape[1]: raise ValueError( f"prior must be of shape {(self.basis_matrix_.shape[1],)}," f" but got {prior.shape}" ) computed_prior = prior else: raise ValueError( "Invalid prior: must be 'decreasing' or a 1D " "ndarray of appropriate length." ) if noise is None: warnings.warn( "noise is None. noise will be set to the " "average of the computed prior" ) noise = computed_prior.mean() return self._regularized_reconstruction(x, computed_prior, noise) elif method == "unregularized": # Square matrix if self.n_sensors == self.basis_matrix_.shape[1]: return self._square_predict( x, self.ranked_sensors_[: self.n_sensors], **solve_kws ) # Rectangular matrix else: return self._rectangular_predict( x, self.ranked_sensors_[: self.n_sensors], **solve_kws ) else: raise NotImplementedError("Method not implemented")
def _regularized_reconstruction(self, x, prior, noise): """ Reconstruct the state using regularized reconstruction See the following reference for more information Klishin, Andrei A., et. al. Data-Induced Interactions of Sparse Sensors. 2023. arXiv:2307.11838 [cond-mat.stat-mech] x: numpy array, shape (n_features, n_sensors) Measurements prior: numpy array (n_basis_modes,) Prior variance noise: float (default None) Magnitude of the gaussian uncorrelated sensor measurement noise """ prior_cov = 1 / (prior**2) low_rank_selection_matrix = self.basis_matrix_[self.selected_sensors, :] composite_matrix = np.diag(prior_cov) + ( low_rank_selection_matrix.T @ low_rank_selection_matrix ) / (noise**2) rhs = low_rank_selection_matrix.T @ x reconstructed_state = self.basis_matrix_ @ np.linalg.solve( composite_matrix, rhs / noise**2 ) return reconstructed_state.T def _square_predict(self, x, sensors, **solve_kws): """Get prediction when the problem is square.""" return np.dot( self.basis_matrix_, solve(self.basis_matrix_[sensors, :], x, **solve_kws) ).T def _rectangular_predict(self, x, sensors, **solve_kws): """Get prediction when the problem is rectangular.""" return np.dot( self.basis_matrix_, lstsq(self.basis_matrix_[sensors, :], x, **solve_kws)[0] ).T
[docs] def get_selected_sensors(self): """ Get the indices of the sensors chosen by the model. Returns ------- sensors: numpy array, shape (n_sensors,) Indices of the sensors chosen by the model (i.e. the sensor locations) ranked in descending order of importance. """ check_is_fitted(self, "ranked_sensors_") return self.ranked_sensors_[: self.n_sensors]
@property def selected_sensors(self): """ Get the indices of the sensors chosen by the model. Returns ------- sensors: numpy array, shape (n_sensors,) Indices of the sensors chosen by the model (i.e. the sensor locations) ranked in descending order of importance. """ return self.get_selected_sensors()
[docs] def get_all_sensors(self): """ Get a ranked list consisting of all the sensors. The sensors are given in descending order of importance. Returns ------- sensors: numpy array, shape (n_features,) Indices of sensors in descending order of importance. """ return self.all_sensors
@property def all_sensors(self): """ Get a ranked list consisting of all the sensors. The sensors are given in descending order of importance. Returns ------- sensors: numpy array, shape (n_features,) Indices of sensors in descending order of importance. """ check_is_fitted(self, "ranked_sensors_") return self.ranked_sensors_
[docs] def set_number_of_sensors(self, n_sensors): """ Set ``n_sensors``, the number of sensors to be used for prediction. Parameters ---------- n_sensors: int The number of sensors. Must be a positive integer. Cannot exceed the number of available sensors (n_features). """ check_is_fitted(self, "ranked_sensors_") if not isinstance(n_sensors, INT_DTYPES): raise ValueError("n_sensors must be a positive integer") elif n_sensors <= 0: raise ValueError("n_sensors must be a positive integer") elif n_sensors > len(self.ranked_sensors_): raise ValueError( "n_sensors cannot exceed number of available sensors: " "{}".format(len(self.ranked_sensors_)) ) else: self.n_sensors = n_sensors
[docs] def set_n_sensors(self, n_sensors): """ A convenience function accomplishing the same thing as :meth:`set_number_of_sensors`. Set ``n_sensors``, the number of sensors to be used for prediction. Parameters ---------- n_sensors: int The number of sensors. Must be a positive integer. Cannot exceed the number of available sensors (n_features). """ self.set_number_of_sensors(n_sensors)
[docs] def update_n_basis_modes(self, n_basis_modes, x=None, quiet=False): """ Re-fit the :class:`SSPOR` object using a different value of ``n_basis_modes``. This method allows one to relearn sensor locations for a different number of basis modes _without_ re-fitting the basis in many cases. Specifically, if ``n_basis_modes <= self.basis.n_basis_modes`` then the basis does not need to be refit. Otherwise this function does not save any computational resources. Parameters ---------- n_basis_modes: positive int, optional (default None) Number of basis modes to be used during fit. Must be less than or equal to ``n_samples``. x: numpy array, shape (n_examples, n_features), optional (default None) Only used if ``n_basis_modes`` exceeds the number of available basis modes for the already fit basis. quiet: boolean, optional (default False) Whether or not to suppress warnings during refitting. """ if not isinstance(n_basis_modes, INT_DTYPES) or n_basis_modes <= 0: raise ValueError("n_basis_modes must be a positive integer") # No need to refit basis; only refit sensors if ( hasattr(self.basis, "basis_matrix_") and n_basis_modes <= self.basis.n_basis_modes ): self.n_basis_modes = n_basis_modes self.fit(x, prefit_basis=True, quiet=quiet) elif x is None: raise ValueError( "x cannot be None when n_basis_modes exceeds number of available modes" ) elif n_basis_modes > x.shape[0]: raise ValueError( "n_basis_modes cannot exceed the number of examples, x.shape[0]" ) else: self.n_basis_modes = n_basis_modes self.basis.n_basis_modes = n_basis_modes self.fit(x, prefit_basis=False, quiet=quiet)
[docs] def score(self, x, y=None, score_function=None, score_kws={}, solve_kws={}): """ Compute the reconstruction error for a given set of measurements. Parameters ---------- x: numpy array, shape (n_examples, n_features) Measurements with which to compute the score. Note that ``x`` should consist of measurements at every location, not just the recommended sensor location, i.e. its shape should be (n_examples, n_features) rather than (n_examples, n_sensors). y: None Dummy input to maintain compatibility with Scikit-learn. score_function: callable, optional (default None) Function used to compute the score. Should have the call signature ``score_function(y_true, y_pred, **score_kws)``. Default is the negative of the root mean squared error (sklearn expects higher scores to correspond to better performance). score_kws: dict, optional Keyword arguments to be passed to score_function. Ignored if score_function is None. solve_kws: dict, optional Keyword arguments to be passed to the predict method. Returns ------- score: float The score. """ check_is_fitted(self, "ranked_sensors_") n_input_features = len(x) if np.ndim(x) == 1 else x.shape[1] n_expected_features = len(self.ranked_sensors_) if n_expected_features != n_input_features: raise ValueError( f"x has {n_input_features} features (columns), " f"but should have {n_expected_features}" ) sensors = self.get_selected_sensors() if score_function is None: return -np.sqrt( np.mean( ( self.predict(x[:, sensors], method="unregularized", **solve_kws) - x ) ** 2 ) ) else: return score_function( x, self.predict(x[:, sensors], method="unregularized", **solve_kws), **score_kws, )
[docs] def reconstruction_error(self, x_test, sensor_range=None, score=None, **solve_kws): """ Compute the reconstruction error for different numbers of sensors. Parameters ---------- x_test: numpy array, shape (n_examples, n_features) Measurements to be reconstructed. sensor_range: 1D numpy array, optional (default None) Numbers of sensors at which to compute the reconstruction error. If None, will be set to [1, 2, ... , min(``n_sensors``, ``basis.n_basis_modes``)]. score: callable, optional (default None) Function used to compute the reconstruction error. Should have the signature ``score(x, x_pred)``. If None, the root mean squared error is used. solve_kws: dict, optional Keyword arguments to be passed to the linear solver. Returns ------- error: numpy array, shape (len(sensor_range),) Reconstruction scores for each number of sensors in ``sensor_range``. """ check_is_fitted(self, "ranked_sensors_") x_test = validate_input(x_test, self.get_all_sensors()).T basis_mode_dim, n_basis_modes = self.basis_matrix_.shape if sensor_range is None: sensor_range = np.arange(1, min(self.n_sensors, basis_mode_dim) + 1) if sensor_range[-1] > basis_mode_dim: warnings.warn( f"Performance may be poor when using more than {basis_mode_dim} sensors" ) if score is None: def score(x, y): return np.sqrt(np.mean((x - y) ** 2)) error = np.zeros_like(sensor_range, dtype=np.float64) for k, n_sensors in enumerate(sensor_range): if n_sensors == n_basis_modes: error[k] = score( self._square_predict( x_test[self.ranked_sensors_[:n_sensors]], self.ranked_sensors_[:n_sensors], **solve_kws, ), x_test.T, ) else: error[k] = score( self._rectangular_predict( x_test[self.ranked_sensors_[:n_sensors]], self.ranked_sensors_[:n_sensors], **solve_kws, ), x_test.T, ) return error
def _validate_n_sensors(self): """ Check that number of sensors does not exceed the maximimum number allowed by the chosen basis. Also check for potential conflicts between number of sensors and the optimizer. """ check_is_fitted(self, "basis_matrix_") # Maximum number of sensors (= dimension of basis vectors) max_sensors = self.basis_matrix_.shape[0] if self.n_sensors is None: self.n_sensors = max_sensors elif self.n_sensors > max_sensors: raise ValueError( "n_sensors cannot exceed number of available sensors: {}".format( max_sensors ) ) if ( isinstance(self.optimizer, CCQR) ) and self.n_sensors > self.basis_matrix_.shape[1]: warnings.warn( "Number of sensors exceeds number of samples, which may cause CCQR to " "select sensors in constrained regions." )
[docs] def std(self, prior, noise=None): """ Compute standard deviation of noise in each pixel of the reconstructed state. See the following reference for more information Klishin, Andrei A., et. al. Data-Induced Interactions of Sparse Sensors. 2023. arXiv:2307.11838 [cond-mat.stat-mech] Parameters ---------- prior: str or np.ndarray, shape (n_basis_modes,), optional (default='decreasing') Prior Covariance Vector, typically a scaled identity vector or a vector containing normalized singular values. If 'decreasing', normalized singular values are used. noise: float (default None) Magnitude of the gaussian uncorrelated sensor measurement noise. If None, noise will default to the average of the computed prior. Returns ------- sigma: numpy array, shape (n_features,) Level of uncertainty of each pixel of the reconstructed state """ check_is_fitted(self, "basis_matrix_") if isinstance(prior, str) and prior == "decreasing": computed_prior = self.singular_values elif isinstance(prior, np.ndarray): if prior.ndim != 1: raise ValueError("prior must be a 1D array") if prior.shape[0] != self.basis_matrix_.shape[1]: raise ValueError( f"prior must be of shape {(self.basis_matrix_.shape[1],)}," f" but got {prior.shape}" ) computed_prior = prior else: raise ValueError( "Invalid prior: must be 'decreasing' or a 1D " "ndarray of appropriate length." ) if noise is None: warnings.warn( "noise is None. noise will be set to the average of the computed prior" ) noise = computed_prior.mean() sq_inv_prior = 1.0 / (computed_prior**2) low_rank_selection_matrix = self.basis_matrix_[self.selected_sensors, :] composite_matrix = np.diag(sq_inv_prior) + ( low_rank_selection_matrix.T @ low_rank_selection_matrix ) / (noise**2) diag_cov_matrix = ( self.basis_matrix_
[docs] @ np.linalg.inv(composite_matrix) @ low_rank_selection_matrix.T / (noise**2) ) sigma = noise * np.sqrt(np.sum(diag_cov_matrix**2, axis=1)) return sigma
def one_pt_energy_landscape(self, prior="decreasing", noise=None): """ Compute the one-point energy landscape of the sensors See the following reference for more information Klishin, Andrei A., et. al. Data-Induced Interactions of Sparse Sensors. 2023. arXiv:2307.11838 [cond-mat.stat-mech] Parameters ---------- prior: str or np.ndarray, shape (n_basis_modes,), optional (default='decreasing') Prior Covariance Vector, typically a scaled identity vector or a vector containing normalized singular values. If 'decreasing', normalized singular values are used. noise: float (default None) Magnitude of the gaussian uncorrelated sensor measurement noise. If None, noise will default to the average of the computed prior. Returns ------- np.ndarray, shape (n_features,) """ if isinstance(self.optimizer, TPGR): check_is_fitted(self, "optimizer") else: raise TypeError( "Energy landscapes can only be computed if TPGR optimizer is used." ) if isinstance(prior, str) and prior == "decreasing": computed_prior = self.singular_values elif isinstance(prior, np.ndarray): if prior.ndim != 1: raise ValueError("prior must be a 1D array") if prior.shape[0] != self.basis_matrix_.shape[1]: raise ValueError( f"prior must be of shape {(self.basis_matrix_.shape[1],)}," f" but got {prior.shape}" ) computed_prior = prior else: raise ValueError( "Invalid prior: must be 'decreasing' or a 1D " "ndarray of appropriate length." ) if noise is None: warnings.warn( "noise is None. noise will be set to the average of the computed prior" ) noise = computed_prior.mean() G = self.basis_matrix_ @ np.diag(computed_prior) return -np.log(1 + np.einsum("ij,ij->i", G, G) / noise**2)
[docs] def two_pt_energy_landscape(self, selected_sensors, prior="decreasing", noise=None): """ Compute the two-point energy landscape of the sensors. If selected_sensors is a singular sensor, the landscape will be the two point energy interations of that sensor with the remaining sensors. If selected_sensors is a list of sensors, the landscape will be the sum of two point energy interactions of the selected sensors with the remaining sensors. See the following reference for more information Klishin, Andrei A., et. al. Data-Induced Interactions of Sparse Sensors. 2023. arXiv:2307.11838 [cond-mat.stat-mech] Parameters ---------- prior: str or np.ndarray, shape (n_basis_modes,), optional (default='decreasing') Prior Covariance Vector, typically a scaled identity vector or a vector containing normalized singular values. If 'decreasing', normalized singular values are used. noise: float (default None) Magnitude of the gaussian uncorrelated sensor measurement noise. If None, noise will default to the average of the computed prior. selected_sensors: list Indices of selected sensors for two point energy computation. Returns ------- np.ndarray, shape (n_features,) """ if isinstance(self.optimizer, TPGR): check_is_fitted(self, "optimizer") else: raise TypeError( "Energy landscapes can only be computed if TPGR optimizer is used." ) check_is_fitted(self, "optimizer") if isinstance(prior, str) and prior == "decreasing": computed_prior = self.singular_values elif isinstance(prior, np.ndarray): if prior.ndim != 1: raise ValueError("prior must be a 1D array") if prior.shape[0] != self.basis_matrix_.shape[1]: raise ValueError( f"prior must be of shape {(self.basis_matrix_.shape[1],)}," f" but got {prior.shape}" ) computed_prior = prior else: raise ValueError( "Invalid prior: must be 'decreasing' or a 1D " "ndarray of appropriate length." ) if noise is None: warnings.warn( "noise is None. noise will be set to the average of the computed prior" ) noise = computed_prior.mean() G = self.basis_matrix_ @ np.diag(computed_prior) mask = np.ones(G.shape[0], dtype=bool) mask[selected_sensors] = False G_selected = G[selected_sensors, :] if len(selected_sensors) == 1: G_selected.reshape(-1, 1) G_remaining = G[mask, :] J = 0.5 * np.sum( ((G_remaining @ G_selected.T) ** 2) / ( np.outer( 1 + (np.sum(G_remaining**2, axis=1)) / noise**2, 1 + (np.sum(G_selected**2, axis=1)) / noise**2, ) * noise**4 ), axis=1, ) J_full = np.full(G.shape[0], np.nan) J_full[mask] = J return J_full