Source code for cuqi.legacy.sampler._cwmh

import numpy as np
import cuqi
from cuqi.legacy.sampler import ProposalBasedSampler


class CWMH(ProposalBasedSampler):
    """Component-wise Metropolis-Hastings sampler.

    Allows sampling of a target distribution by component-wise random-walk sampling of a proposal distribution along with an accept/reject step.

    Parameters
    ----------
    target : `cuqi.distribution.Distribution` or lambda function
        The target distribution to sample. Custom logpdfs are supported by using a :class:`cuqi.distribution.UserDefinedDistribution`.

    proposal : `cuqi.distribution.Distribution` or callable method
        The proposal to sample from. If a callable method, it should provide a single independent sample from the proposal distribution. Defaults to a Gaussian proposal. *Optional*.

    scale : float
        Scale parameter used to define correlation between previous and proposed sample in the random walk. *Optional*.

    x0 : ndarray
        Initial parameters. *Optional*.

    dim : int
        Dimension of parameter space. Required if target and proposal are callable functions. *Optional*.

    callback : callable, *Optional*
        If set, this function will be called after every sample.
        The signature of the callback function is `callback(sample, sample_index)`, where `sample` is the current sample and `sample_index` is the index of the sample.
        An example is shown in demos/demo31_callback.py.

    Example
    -------
    .. code-block:: python

        import numpy as np
        import cuqi

        # Parameters
        dim = 5            # Dimension of distribution
        mu = np.arange(dim)  # Mean of Gaussian
        std = 1            # Standard deviation of Gaussian

        # Logpdf function
        logpdf_func = lambda x: -1/(std**2)*np.sum((x-mu)**2)

        # Define distribution from logpdf as UserDefinedDistribution
        # (sample and gradient functions are also supported as inputs to UserDefinedDistribution)
        target = cuqi.distribution.UserDefinedDistribution(dim=dim, logpdf_func=logpdf_func)

        # Set up sampler
        sampler = cuqi.legacy.sampler.CWMH(target, scale=1)

        # Sample
        samples = sampler.sample(2000)

    """
    def __init__(self, target, proposal=None, scale=1, x0=None, dim=None, **kwargs):
        super().__init__(target, proposal=proposal, scale=scale, x0=x0, dim=dim, **kwargs)
    @ProposalBasedSampler.proposal.setter
    def proposal(self, value):
        fail_msg = "Proposal should be either None, cuqi.distribution.Distribution conditioned only on 'location' and 'scale', lambda function, or cuqi.distribution.Normal conditioned only on 'mean' and 'std'"

        if value is None:
            self._proposal = cuqi.distribution.Normal(mean=lambda location: location, std=lambda scale: scale, geometry=self.dim)
        elif isinstance(value, cuqi.distribution.Distribution) and sorted(value.get_conditioning_variables()) == ['location', 'scale']:
            self._proposal = value
        elif isinstance(value, cuqi.distribution.Normal) and sorted(value.get_conditioning_variables()) == ['mean', 'std']:
            self._proposal = value(mean=lambda location: location, std=lambda scale: scale)
        elif not isinstance(value, cuqi.distribution.Distribution) and callable(value):
            self._proposal = value
        else:
            raise ValueError(fail_msg)

    def _sample(self, N, Nb):
        Ns = N + Nb  # total number of simulations (burn-in + samples)

        # allocation
        samples = np.empty((self.dim, Ns))
        target_eval = np.empty(Ns)
        acc = np.zeros((self.dim, Ns), dtype=int)

        # initial state
        samples[:, 0] = self.x0
        target_eval[0] = self.target.logd(self.x0)
        acc[:, 0] = np.ones(self.dim)

        # run MCMC
        for s in range(Ns-1):
            # run component by component
            samples[:, s+1], target_eval[s+1], acc[:, s+1] = self.single_update(samples[:, s], target_eval[s])

            self._print_progress(s+2, Ns)  # s+2 is the sample number, s+1 is index assuming x0 is the first sample
            self._call_callback(samples[:, s+1], s+1)

        # remove burn-in
        samples = samples[:, Nb:]
        target_eval = target_eval[Nb:]
        acccomp = acc[:, Nb:].mean(axis=1)
        print('\nAverage acceptance rate all components:', acccomp.mean(), '\n')

        return samples, target_eval, acccomp

    def _sample_adapt(self, N, Nb):
        # this follows the vanishing adaptation Algorithm 4 in:
        # Andrieu and Thoms (2008) - A tutorial on adaptive MCMC
        Ns = N + Nb  # total number of simulations (burn-in + samples)

        # allocation
        samples = np.empty((self.dim, Ns))
        target_eval = np.empty(Ns)
        acc = np.zeros((self.dim, Ns), dtype=int)

        # initial state
        samples[:, 0] = self.x0
        target_eval[0] = self.target.logd(self.x0)
        acc[:, 0] = np.ones(self.dim)

        # initial adaptation params
        Na = int(0.1*N)                                       # iterations between adaptations
        hat_acc = np.empty((self.dim, int(np.floor(Ns/Na))))  # average acceptance rate of the chains
        lambd = np.empty((self.dim, int(np.floor(Ns/Na)+1)))  # scaling parameter \in (0, 1)
        lambd[:, 0] = self.scale
        star_acc = 0.21/self.dim + 0.23                       # target acceptance rate for the random walk
        i, idx = 0, 0

        # run MCMC
        for s in range(Ns-1):
            # run component by component
            samples[:, s+1], target_eval[s+1], acc[:, s+1] = self.single_update(samples[:, s], target_eval[s])

            # adapt proposal spread of each component using acceptance of past samples
            if ((s+1) % Na == 0):
                # evaluate average acceptance rate
                hat_acc[:, i] = np.mean(acc[:, idx:idx+Na], axis=1)

                # compute new scaling parameter
                zeta = 1/np.sqrt(i+1)  # ensures that the variation of lambda(i) vanishes
                lambd[:, i+1] = np.exp(np.log(lambd[:, i]) + zeta*(hat_acc[:, i]-star_acc))

                # update parameters
                self.scale = np.minimum(lambd[:, i+1], np.ones(self.dim))

                # update counters
                i += 1
                idx += Na

            # display iterations
            self._print_progress(s+2, Ns)  # s+2 is the sample number, s+1 is index assuming x0 is the first sample
            self._call_callback(samples[:, s+1], s+1)

        # remove burn-in
        samples = samples[:, Nb:]
        target_eval = target_eval[Nb:]
        acccomp = acc[:, Nb:].mean(axis=1)
        print('\nAverage acceptance rate all components:', acccomp.mean(), '\n')

        return samples, target_eval, acccomp
    def single_update(self, x_t, target_eval_t):
        # draw component-wise proposals, either from the conditioned proposal distribution
        # or from a user-supplied callable of the form proposal(x_t, scale)
        if isinstance(self.proposal, cuqi.distribution.Distribution):
            x_i_star = self.proposal(location=x_t, scale=self.scale).sample()
        else:
            x_i_star = self.proposal(x_t, self.scale)
        x_star = x_t.copy()
        acc = np.zeros(self.dim)

        for j in range(self.dim):
            # propose state
            x_star[j] = x_i_star[j]

            # evaluate target
            target_eval_star = self.target.logd(x_star)

            # ratio and acceptance probability
            ratio = target_eval_star - target_eval_t  # proposal is symmetric
            alpha = min(0, ratio)

            # accept/reject
            u_theta = np.log(np.random.rand())
            if (u_theta <= alpha):
                x_t[j] = x_i_star[j]
                target_eval_t = target_eval_star
                acc[j] = 1
            else:
                pass
                # x_t[j] = x_t[j]
                # target_eval_t = target_eval_t
            x_star = x_t.copy()

        return x_t, target_eval_t, acc
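
The adaptation step in `_sample_adapt` above follows the vanishing-adaptation rule from Andrieu and Thoms (2008): each component's scale is updated in log-space by the gap between its observed and target acceptance rates, with a step size that decays over adaptation rounds. A minimal standalone sketch of that update, using illustrative numbers rather than anything taken from the module, could look as follows.

    import numpy as np

    dim = 5
    star_acc = 0.21/dim + 0.23                     # target acceptance rate, as in _sample_adapt
    lambd = np.full(dim, 0.1)                      # current per-component scales (illustrative)
    hat_acc = np.array([0.1, 0.2, 0.3, 0.4, 0.5])  # hypothetical observed acceptance rates
    i = 3                                          # adaptation round index (illustrative)

    zeta = 1/np.sqrt(i+1)                          # step size decays, so the adaptation "vanishes"
    lambd_new = np.exp(np.log(lambd) + zeta*(hat_acc - star_acc))

    # components that accepted too often get a larger scale (and vice versa), capped at 1
    scale = np.minimum(lambd_new, np.ones(dim))
    print(scale)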
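
The class docstring shows sampling with the default Gaussian proposal; `single_update` also accepts a plain callable proposal, invoked as `proposal(x_t, scale)`. A hedged usage sketch with such a callable might look like the following. It assumes the legacy `Sampler` base class exposes a `sample_adapt(N, Nb)` method that dispatches to `_sample_adapt`; the symmetric random-walk lambda and the chosen numbers are purely illustrative.

    import numpy as np
    import cuqi

    # target: the same user-defined Gaussian as in the class docstring
    dim = 5
    mu = np.arange(dim)
    std = 1
    logpdf_func = lambda x: -1/(std**2)*np.sum((x-mu)**2)
    target = cuqi.distribution.UserDefinedDistribution(dim=dim, logpdf_func=logpdf_func)

    # callable proposal: single_update calls it as proposal(x_t, scale), so it must return
    # one proposed vector; a symmetric perturbation keeps the log-ratio acceptance rule valid
    proposal = lambda x_t, scale: x_t + scale*np.random.standard_normal(dim)

    sampler = cuqi.legacy.sampler.CWMH(target, proposal=proposal, scale=0.5, x0=np.zeros(dim))

    # assumed base-class method: sample_adapt(N, Nb) runs _sample_adapt and discards Nb burn-in
    samples = sampler.sample_adapt(2000, 500)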