import scipy as sp
from scipy.linalg.interpolative import estimate_spectral_norm
from scipy.sparse.linalg import LinearOperator as scipyLinearOperator
import numpy as np
import cuqi
from cuqi.solver import CGLS, FISTA, ADMM, ScipyLinearLSQ, ScipyMinimizer
from cuqi.sampler import Sampler
[docs]
class LinearRTO(Sampler):
"""
Linear RTO (Randomize-Then-Optimize) sampler.
Samples posterior related to the inverse problem with Gaussian likelihood and prior, and where the forward model is linear or more generally affine.
Parameters
------------
target : `cuqi.distribution.Posterior`, `cuqi.distribution.MultipleLikelihoodPosterior` or 5-dimensional tuple.
If target is of type cuqi.distribution.Posterior or cuqi.distribution.MultipleLikelihoodPosterior, it represents the posterior distribution.
If target is a 5-dimensional tuple, it assumes the following structure:
(data, model, L_sqrtprec, P_mean, P_sqrtrec)
Here:
data: is a m-dimensional numpy array containing the measured data.
model: is a m by n dimensional matrix, AffineModel or LinearModel representing the forward model.
L_sqrtprec: is the squareroot of the precision matrix of the Gaussian likelihood.
P_mean: is the prior mean.
P_sqrtprec: is the squareroot of the precision matrix of the Gaussian mean.
initial_point : `np.ndarray`
Initial point for the sampler. *Optional*.
maxit : int
Maximum number of iterations of the inner CGLS solver. *Optional*.
tol : float
Tolerance of the inner CGLS solver. *Optional*.
inner_initial_point : string or np.ndarray or cuqi.array.CUQIArray
Initial point for the inner optimization problem. Can be "previous_sample" (default), "MAP", or a specific numpy or cuqi array. *Optional*.
callback : callable, optional
A function that will be called after each sampling step. It can be useful for monitoring the sampler during sampling.
The function should take three arguments: the sampler object, the index of the current sampling step, the total number of requested samples. The last two arguments are integers. An example of the callback function signature is: `callback(sampler, sample_index, num_of_samples)`.
"""
[docs]
def __init__(self, target=None, initial_point=None, maxit=10, tol=1e-6, inner_initial_point="previous_sample", **kwargs):
super().__init__(target=target, initial_point=initial_point, **kwargs)
# Other parameters
self.maxit = maxit
self.tol = tol
self.inner_initial_point = inner_initial_point
def _initialize(self):
self._precompute()
self._compute_map()
@property
def inner_initial_point(self):
if isinstance(self._inner_initial_point, str):
if self._inner_initial_point == "previous_sample":
return self.current_point
elif self._inner_initial_point == "map":
return self._map
else:
return self._inner_initial_point
@inner_initial_point.setter
def inner_initial_point(self, value):
is_correct_string = (isinstance(value, str) and
(value.lower() == "previous_sample" or
value.lower() == "map"))
if is_correct_string:
self._inner_initial_point = value.lower()
elif isinstance(value, (np.ndarray, cuqi.array.CUQIarray)):
self._inner_initial_point = value
else:
raise ValueError("Invalid value for inner_initial_point. Choose either 'previous_sample', 'MAP', or provide a numpy array/cuqi array.")
@property
def prior(self):
return self.target.prior
@property
def likelihood(self):
return self.target.likelihood
@property
def likelihoods(self):
if isinstance(self.target, cuqi.distribution.Posterior):
return [self.target.likelihood]
elif isinstance(self.target, cuqi.distribution.MultipleLikelihoodPosterior):
return self.target.likelihoods
@property
def model(self):
return self.target.model
@property
def models(self):
if isinstance(self.target, cuqi.distribution.Posterior):
return [self.target.model]
elif isinstance(self.target, cuqi.distribution.MultipleLikelihoodPosterior):
return self.target.models
def _compute_map(self):
sim = CGLS(self.M, self.b_tild, self.current_point, self.maxit, self.tol)
self._map, _ = sim.solve()
def _precompute(self):
L1 = [likelihood.distribution.sqrtprec for likelihood in self.likelihoods]
L2 = self.prior.sqrtprec
L2mu = self.prior.sqrtprecTimesMean
# pre-computations
self.n = self.prior.dim
self.b_tild = np.hstack([L@(likelihood.data - model._shift) for (L, likelihood, model) in zip(L1, self.likelihoods, self.models)]+ [L2mu]) # With shift from AffineModel
callability = [callable(likelihood.model) for likelihood in self.likelihoods]
notcallability = [not c for c in callability]
if all(notcallability):
self.M = sp.sparse.vstack([L@likelihood.model for (L, likelihood) in zip(L1, self.likelihoods)] + [L2])
elif all(callability):
# in this case, model is a function doing forward and backward operations
def M(x, flag):
if flag == 1:
out1 = [L @ likelihood.model._forward_func_no_shift(x) for (L, likelihood) in zip(L1, self.likelihoods)] # Use forward function which excludes shift
out2 = L2 @ x
out = np.hstack(out1 + [out2])
elif flag == 2:
idx_start = 0
idx_end = 0
out1 = np.zeros(self.n)
for likelihood in self.likelihoods:
idx_end += len(likelihood.data)
out1 += likelihood.model._adjoint_func_no_shift(likelihood.distribution.sqrtprec.T@x[idx_start:idx_end])
idx_start = idx_end
out2 = L2.T @ x[idx_end:]
out = out1 + out2
return out
self.M = M
else:
raise TypeError("All likelihoods need to be callable or none need to be callable.")
[docs]
def step(self):
y = self.b_tild + np.random.randn(len(self.b_tild))
sim = CGLS(self.M, y, self.inner_initial_point, self.maxit, self.tol)
self.current_point, _ = sim.solve()
acc = 1
return acc
[docs]
def tune(self, skip_len, update_count):
pass
[docs]
def validate_target(self):
# Check target type
if not isinstance(self.target, (cuqi.distribution.Posterior, cuqi.distribution.MultipleLikelihoodPosterior)):
raise ValueError(f"To initialize an object of type {self.__class__}, 'target' need to be of type 'cuqi.distribution.Posterior' or 'cuqi.distribution.MultipleLikelihoodPosterior'.")
# Check Linear model and Gaussian likelihood(s)
if isinstance(self.target, cuqi.distribution.Posterior):
if not isinstance(self.model, cuqi.model.AffineModel):
raise TypeError("Model needs to be linear or more generally affine")
if not hasattr(self.likelihood.distribution, "sqrtprec"):
raise TypeError("Distribution in Likelihood must contain a sqrtprec attribute")
elif isinstance(self.target, cuqi.distribution.MultipleLikelihoodPosterior): # Elif used for further alternatives, e.g., stacked posterior
for likelihood in self.likelihoods:
if not isinstance(likelihood.model, cuqi.model.AffineModel):
raise TypeError("Model needs to be linear or more generally affine")
if not hasattr(likelihood.distribution, "sqrtprec"):
raise TypeError("Distribution in Likelihood must contain a sqrtprec attribute")
# Check Gaussian prior
if not hasattr(self.prior, "sqrtprec"):
raise TypeError("prior must contain a sqrtprec attribute")
if not hasattr(self.prior, "sqrtprecTimesMean"):
raise TypeError("Prior must contain a sqrtprecTimesMean attribute")
def _get_default_initial_point(self, dim):
""" Get the default initial point for the sampler. Defaults to an array of zeros. """
return np.zeros(dim)
[docs]
class RegularizedLinearRTO(LinearRTO):
"""
Regularized Linear RTO (Randomize-Then-Optimize) sampler.
Samples posterior related to the inverse problem with Gaussian likelihood and implicit Gaussian prior, and where the forward model is Linear.
The sampler works by repeatedly solving regularized linear least squares problems for perturbed data.
The solver for these optimization problems is chosen based on how the regularized is provided in the implicit Gaussian prior.
Currently we use the following solvers:
FISTA: [1] Beck, Amir, and Marc Teboulle. "A fast iterative shrinkage-thresholding algorithm for linear inverse problems." SIAM journal on imaging sciences 2.1 (2009): 183-202.
Used when prior.proximal is callable.
ADMM: [2] Boyd et al. "Distributed optimization and statistical learning via the alternating direction method of multipliers."Foundations and Trends® in Machine learning, 2011.
Used when prior.proximal is a list of penalty terms.
ScipyLinearLSQ: Wrapper for Scipy's lsq_linear for the Trust Region Reflective algorithm. Optionally used when the constraint is either "nonnegativity" or "box".
ScipyMinimizer: Wrapper for Scipy's minimize. Optionally used when the constraint is either "nonnegativity" or "box".
Parameters
------------
target : `cuqi.distribution.Posterior`
See `cuqi.sampler.LinearRTO`
initial_point : `np.ndarray`
Initial point for the sampler. *Optional*.
maxit : int
Maximum number of iterations of the FISTA/ADMM/ScipyLinearLSQ/ScipyMinimizer solver. *Optional*.
inner_max_it : int
Maximum number of iterations of the CGLS solver used within the ADMM solver. *Optional*.
stepsize : string or float
If stepsize is a string and equals either "automatic", then the stepsize is automatically estimated based on the spectral norm.
If stepsize is a float, then this stepsize is used.
penalty_parameter : int
Penalty parameter of the ADMM solver. *Optional*.
See [2] or `cuqi.solver.ADMM`
abstol : float
Absolute tolerance of the FISTA/ScipyLinearLSQ/ScipyMinimizer solver. *Optional*.
inner_abstol : float
Tolerance parameter for ScipyLinearLSQ's inner solve of the unbounded least-squares problem. *Optional*.
adaptive : bool
If True, FISTA is used as solver, otherwise ISTA is used. *Optional*.
solver : string
Options are "FISTA" (default for a single constraint or regularization), "ADMM" (default and the only option for multiple constraints or regularizations), "ScipyLinearLSQ" and "ScipyMinimizer". Note "ScipyLinearLSQ" and "ScipyMinimizer" can only be used with `RegularizedGaussian` of a single `box` or `nonnegativity` constraint. *Optional*.
inner_initial_point : string or np.ndarray or cuqi.array.CUQIArray
Initial point for the inner optimization problem. Can be "previous_sample" (default), "MAP", or a specific numpy or cuqi array. *Optional*.
callback : callable, optional
A function that will be called after each sampling step. It can be useful for monitoring the sampler during sampling.
The function should take three arguments: the sampler object, the index of the current sampling step, the total number of requested samples. The last two arguments are integers. An example of the callback function signature is: `callback(sampler, sample_index, num_of_samples)`.
"""
[docs]
def __init__(self, target=None, initial_point=None, maxit=100, inner_max_it=10, stepsize="automatic", penalty_parameter=10, abstol=1e-10, adaptive=True, solver=None, inner_abstol=None, inner_initial_point="previous_sample", **kwargs):
super().__init__(target=target, initial_point=initial_point, **kwargs)
# Other parameters
self.stepsize = stepsize
self.abstol = abstol
self.inner_abstol = inner_abstol
self.adaptive = adaptive
self.maxit = maxit
self.inner_max_it = inner_max_it
self.penalty_parameter = penalty_parameter
self.solver = solver
self.inner_initial_point = inner_initial_point
def _initialize(self):
super()._initialize()
if self.solver is None:
self.solver = "FISTA" if callable(self.proximal) else "ADMM"
if self.solver == "FISTA":
self._stepsize = self._choose_stepsize()
self._compute_map_regularized()
@property
def solver(self):
return self._solver
@solver.setter
def solver(self, value):
if value == "ScipyLinearLSQ" or value == "ScipyMinimizer":
if (self.target.prior.preset["constraint"] == "nonnegativity" or self.target.prior.preset["constraint"] == "box"):
self._solver = value
else:
raise ValueError("ScipyLinearLSQ and ScipyMinimizer only support RegularizedGaussian with box or nonnegativity constraint.")
else:
self._solver = value
@property
def proximal(self):
return self.target.prior.proximal
[docs]
def validate_target(self):
super().validate_target()
if not isinstance(self.target.prior, (cuqi.implicitprior.RegularizedGaussian, cuqi.implicitprior.RegularizedGMRF)):
raise TypeError("Prior needs to be RegularizedGaussian or RegularizedGMRF")
def _choose_stepsize(self):
if isinstance(self.stepsize, str):
if self.stepsize in ["automatic"]:
if not callable(self.M):
M_op = scipyLinearOperator(self.M.shape, matvec = lambda v: self.M@v, rmatvec = lambda w: self.M.T@w)
else:
M_op = scipyLinearOperator((len(self.b_tild), self.n), matvec = lambda v: self.M(v,1), rmatvec = lambda w: self.M(w,2))
_stepsize = 0.99/(estimate_spectral_norm(M_op)**2)
# print(f"Estimated stepsize for regularized Linear RTO: {_stepsize}")
else:
raise ValueError("Stepsize choice not supported")
else:
_stepsize = self.stepsize
return _stepsize
@property
def prior(self):
return self.target.prior.gaussian
def _compute_map_regularized(self):
self._map = self._customized_step(self.b_tild, self.initial_point)
def _customized_step(self, y, x0):
if self.solver == "FISTA":
sim = FISTA(self.M, y, self.proximal,
x0, maxit = self.maxit, stepsize = self._stepsize, abstol = self.abstol, adaptive = self.adaptive)
elif self.solver == "ADMM":
sim = ADMM(self.M, y, self.proximal,
x0, self.penalty_parameter, maxit = self.maxit, inner_max_it = self.inner_max_it, adaptive = self.adaptive)
elif self.solver == "ScipyLinearLSQ":
A_op = sp.sparse.linalg.LinearOperator((sum([llh.distribution.dim for llh in self.likelihoods])+self.target.prior.dim, self.target.prior.dim),
matvec=lambda x: self.M(x, 1),
rmatvec=lambda x: self.M(x, 2)
)
sim = ScipyLinearLSQ(A_op, y, self.target.prior._box_bounds,
max_iter = self.maxit,
lsmr_maxiter = self.inner_max_it,
tol = self.abstol,
lsmr_tol = self.inner_abstol)
elif self.solver == "ScipyMinimizer":
# Adapt bounds format, as scipy.minimize requires a bounds format
# different than that in scipy.lsq_linear.
bounds = [(self.target.prior._box_bounds[0][i], self.target.prior._box_bounds[1][i]) for i in range(self.target.prior.dim)]
# Note that the objective function is defined as 0.5*||Mx-y||^2,
# and the corresponding gradient (gradfunc) is given by M^T(Mx-y).
sim = ScipyMinimizer(lambda x: 0.5*np.sum((self.M(x, 1)-y)**2), x0, gradfunc=lambda x: self.M(self.M(x, 1) - y, 2), bounds=bounds, tol=self.abstol, options={"maxiter": self.maxit})
else:
raise ValueError("Choice of solver not supported.")
sol, _ = sim.solve()
return sol
[docs]
def step(self):
y = self.b_tild + np.random.randn(len(self.b_tild))
self.current_point = self._customized_step(y, self.inner_initial_point)
acc = 1
return acc