Source code for cuqi.legacy.sampler._pcn

import numpy as np
import cuqi
from cuqi.legacy.sampler import Sampler

[docs] class pCN(Sampler): #Samples target*proposal #TODO. Check proposal, needs to be Gaussian and zero mean. """Preconditioned Crank-Nicolson sampler Parameters ---------- target : `cuqi.distribution.Posterior` or tuple of likelihood and prior objects If target is of type cuqi.distribution.Posterior, it represents the posterior distribution. If target is a tuple of (cuqi.likelihood.Likelihood, cuqi.distribution.Distribution) objects, the first element is considered the likelihood and the second is considered the prior. scale : int x0 : `np.ndarray` Initial point for the sampler callback : callable, *Optional* If set this function will be called after every sample. The signature of the callback function is `callback(sample, sample_index)`, where `sample` is the current sample and `sample_index` is the index of the sample. An example is shown in demos/demo31_callback.py. Example ------- This uses a custom logpdf and sample function. .. code-block:: python # Parameters dim = 5 # Dimension of distribution mu = np.arange(dim) # Mean of Gaussian std = 1 # standard deviation of Gaussian # Logpdf function of likelihood logpdf_func = lambda x: -1/(std**2)*np.sum((x-mu)**2) # sample function of prior N(0,I) sample_func = lambda : 0 + 1*np.random.randn(dim,1) # Define as UserDefinedDistributions likelihood = cuqi.likelihood.UserDefinedLikelihood(dim=dim, logpdf_func=logpdf_func) prior = cuqi.distribution.UserDefinedDistribution(dim=dim, sample_func=sample_func) # Set up sampler sampler = cuqi.legacy.sampler.pCN((likelihood,prior), scale = 0.1) # Sample samples = sampler.sample(5000) Example ------- This uses CUQIpy distributions. .. code-block:: python # Parameters dim = 5 # Dimension of distribution mu = np.arange(dim) # Mean of Gaussian std = 1 # standard deviation of Gaussian # Define as UserDefinedDistributions model = cuqi.model.Model(lambda x: x, range_geometry=dim, domain_geometry=dim) likelihood = cuqi.distribution.Gaussian(mean=model, cov=np.ones(dim)).to_likelihood(mu) prior = cuqi.distribution.Gaussian(mean=np.zeros(dim), cov=1) target = cuqi.distribution.Posterior(likelihood, prior) # Set up sampler sampler = cuqi.legacy.sampler.pCN(target, scale = 0.1) # Sample samples = sampler.sample(5000) """
[docs] def __init__(self, target, scale=None, x0=None, **kwargs): super().__init__(target, x0=x0, dim=None, **kwargs) self.scale = scale
@property def prior(self): if isinstance(self.target, cuqi.distribution.Posterior): return self.target.prior elif isinstance(self.target,tuple) and len(self.target)==2: return self.target[1] @property def likelihood(self): if isinstance(self.target, cuqi.distribution.Posterior): return self.target.likelihood elif isinstance(self.target,tuple) and len(self.target)==2: return self.target[0] @Sampler.target.setter def target(self, value): if isinstance(value, cuqi.distribution.Posterior): self._target = value self._loglikelihood = lambda x : self.likelihood.logd(x) elif isinstance(value,tuple) and len(value)==2 and \ (isinstance(value[0], cuqi.likelihood.Likelihood) or isinstance(value[0], cuqi.likelihood.UserDefinedLikelihood)) and \ isinstance(value[1], cuqi.distribution.Distribution): self._target = value self._loglikelihood = lambda x : self.likelihood.logd(x) else: raise ValueError(f"To initialize an object of type {self.__class__}, 'target' need to be of type 'cuqi.distribution.Posterior'.") #TODO: #if not isinstance(self.prior,(cuqi.distribution.Gaussian, cuqi.distribution.Normal)): # raise ValueError("The prior distribution of the target need to be Gaussian") @property def dim(self): if hasattr(self,'target') and hasattr(self.target,'dim'): self._dim = self.target.dim elif hasattr(self,'target') and isinstance(self.target,tuple) and len(self.target)==2: self._dim = self.target[0].dim return self._dim def _sample(self, N, Nb): if self.scale is None: raise ValueError("Scale must be set to sample without adaptation. Consider using sample_adapt instead.") Ns = N+Nb # number of simulations # allocation samples = np.empty((self.dim, Ns)) loglike_eval = np.empty(Ns) acc = np.zeros(Ns, dtype=int) # initial state samples[:, 0] = self.x0 loglike_eval[0] = self._loglikelihood(self.x0) acc[0] = 1 # run MCMC for s in range(Ns-1): # run component by component samples[:, s+1], loglike_eval[s+1], acc[s+1] = self.single_update(samples[:, s], loglike_eval[s]) self._print_progress(s+2,Ns) #s+2 is the sample number, s+1 is index assuming x0 is the first sample self._call_callback(samples[:, s+1], s+1) # remove burn-in samples = samples[:, Nb:] loglike_eval = loglike_eval[Nb:] accave = acc[Nb:].mean() print('\nAverage acceptance rate:', accave, '\n') # return samples, loglike_eval, accave def _sample_adapt(self, N, Nb): # Set intial scale if not set if self.scale is None: self.scale = 0.1 Ns = N+Nb # number of simulations # allocation samples = np.empty((self.dim, Ns)) loglike_eval = np.empty(Ns) acc = np.zeros(Ns) # initial state samples[:, 0] = self.x0 loglike_eval[0] = self._loglikelihood(self.x0) acc[0] = 1 # initial adaptation params Na = int(0.1*N) # iterations to adapt hat_acc = np.empty(int(np.floor(Ns/Na))) # average acceptance rate of the chains lambd = self.scale star_acc = 0.44 # target acceptance rate RW i, idx = 0, 0 # run MCMC for s in range(Ns-1): # run component by component samples[:, s+1], loglike_eval[s+1], acc[s+1] = self.single_update(samples[:, s], loglike_eval[s]) # adapt prop spread using acc of past samples if ((s+1) % Na == 0): # evaluate average acceptance rate hat_acc[i] = np.mean(acc[idx:idx+Na]) # d. compute new scaling parameter zeta = 1/np.sqrt(i+1) # ensures that the variation of lambda(i) vanishes lambd = np.exp(np.log(lambd) + zeta*(hat_acc[i]-star_acc)) # update parameters self.scale = min(lambd, 1) # update counters i += 1 idx += Na # display iterations if ((s+1) % (max(Ns//100,1))) == 0 or (s+1) == Ns-1: print("\r",'Sample', s+1, '/', Ns, end="") self._call_callback(samples[:, s+1], s+1) print("\r",'Sample', s+2, '/', Ns) # remove burn-in samples = samples[:, Nb:] loglike_eval = loglike_eval[Nb:] accave = acc[Nb:].mean() print('\nAverage acceptance rate:', accave, 'MCMC scale:', self.scale, '\n') return samples, loglike_eval, accave
[docs] def single_update(self, x_t, loglike_eval_t): # propose state xi = self.prior.sample(1).flatten() # sample from the prior x_star = np.sqrt(1-self.scale**2)*x_t + self.scale*xi # pCN proposal # evaluate target loglike_eval_star = self._loglikelihood(x_star) # ratio and acceptance probability ratio = loglike_eval_star - loglike_eval_t # proposal is symmetric alpha = min(0, ratio) # accept/reject u_theta = np.log(np.random.rand()) if (u_theta <= alpha): x_next = x_star loglike_eval_next = loglike_eval_star acc = 1 else: x_next = x_t loglike_eval_next = loglike_eval_t acc = 0 return x_next, loglike_eval_next, acc