Source code for cuqi.distribution._joint_distribution

from __future__ import annotations
from typing import List
from copy import copy
from cuqi.density import Density, EvaluatedDensity
from cuqi.distribution import Distribution, Posterior
from cuqi.likelihood import Likelihood
from cuqi.geometry import Geometry, _DefaultGeometry1D
import cuqi
import numpy as np # for splitting array. Can avoid.

[docs] class JointDistribution: """ Joint distribution of multiple variables. Parameters ---------- densities : RandomVariable or Density The densities to include in the joint distribution. Each density is passed as comma-separated arguments, and can be either a :class:'Density' such as :class:'Distribution' or :class:`RandomVariable`. Notes ----- The joint distribution allows conditioning on any variable of the distribution. This is useful for example when Gibbs sampling and the conditionals are needed. Conditioning essentially fixes the variable and stores its contribution to the log density function. Example ------- Consider defining the joint distribution: .. math:: p(y,x,z) = p(y \mid x)p(x \mid z)p(z) and conditioning on :math:`y=y_{obs}` leading to the posterior: .. math:: p(x,z \mid y_{obs}) = p(y_{obs} \mid x)p(x \mid z)p(z) .. code-block:: python import cuqi import numpy as np y_obs = np.random.randn(10) A = np.random.randn(10,3) # Define distributions for Bayesian model y = cuqi.distribution.Normal(lambda x: A@x, np.ones(10)) x = cuqi.distribution.Normal(np.zeros(3), lambda z:z) z = cuqi.distribution.Gamma(1, 1) # Joint distribution p(y,x,z) joint = cuqi.distribution.JointDistribution(y, x, z) # Posterior p(x,z | y_obs) posterior = joint(y=y_obs) """
[docs] def __init__(self, *densities: [Density, cuqi.experimental.algebra.RandomVariable]): """ Create a joint distribution from the given densities. """ # Check if all RandomVariables are simple (not-transformed) for density in densities: if isinstance(density, cuqi.experimental.algebra.RandomVariable) and density.is_transformed: raise ValueError(f"To be used in {self.__class__.__name__}, all RandomVariables must be untransformed.") # Convert potential random variables to their underlying distribution densities = [density.distribution if isinstance(density, cuqi.experimental.algebra.RandomVariable) else density for density in densities] # Ensure all densities have unique names names = [density.name for density in densities] if len(names) != len(set(names)): raise ValueError("All densities must have unique names.") self._densities = list(densities) # Make sure every parameter has a distribution (prior) cond_vars = self._get_conditioning_variables() if len(cond_vars) > 0: raise ValueError(f"Every density parameter must have a distribution (prior). Missing prior for {cond_vars}.") # Initialize finite difference gradient approximation settings self.disable_FD()
# --------- Public properties --------- @property def dim(self) -> List[int]: """ Returns the dimensions of the joint distribution. """ return [dist.dim for dist in self._distributions] @property def geometry(self) -> List[Geometry]: """ Returns the geometries of the joint distribution. """ return [dist.geometry for dist in self._distributions] @property def FD_enabled(self): """ Returns a dictionary of keys and booleans indicating for each parameter name (key) if finite difference approximation of the logd gradient is enabled. """ par_names = self.get_parameter_names() FD_enabled = { par_name: self.FD_epsilon[par_name] is not None for par_name in par_names } return FD_enabled @property def FD_epsilon(self): """ Returns a dictionary indicating for each parameter name the spacing for the finite difference approximation of the logd gradient.""" return self._FD_epsilon @FD_epsilon.setter def FD_epsilon(self, value): """ Set the spacing for the finite difference approximation of the logd gradient as a dictionary. The keys are the parameter names. The value for each key is either None (no FD approximation) or a float representing the FD step size. """ par_names = self.get_parameter_names() if value is None: self._FD_epsilon = {par_name: None for par_name in par_names} else: if set(value.keys()) != set(par_names): raise ValueError("Keys of FD_epsilon must match the parameter names of the distribution "+f" {par_names}") self._FD_epsilon = value # --------- Public methods ---------
[docs] def logd(self, *args, **kwargs): """ Evaluate the un-normalized log density function. """ kwargs = self._parse_args_add_to_kwargs(*args, **kwargs) # Check that all parameters are passed by matching parameter names with kwargs keys if set(self.get_parameter_names()) != set(kwargs.keys()): raise ValueError(f"To evaluate the log density function, all parameters must be passed. Received {kwargs.keys()} and expected {self.get_parameter_names()}.") # Evaluate the log density function for each density logd = 0 for density in self._densities: logd_kwargs = {key:value for (key,value) in kwargs.items() if key in density.get_parameter_names()} logd += density.logd(**logd_kwargs) return logd
def __call__(self, *args, **kwargs) -> JointDistribution: """ Condition the joint distribution on the given variables. """ return self._condition(*args, **kwargs) def _condition(self, *args, **kwargs): # Public through __call__ kwargs = self._parse_args_add_to_kwargs(*args, **kwargs) # Create new shallow copy of joint density new_joint = copy(self) # Shallow copy of self new_joint._densities = self._densities[:] # Shallow copy of densities # Condition each of the new densities on kwargs relevant to that density for i, density in enumerate(new_joint._densities): cond_kwargs = {key:value for (key,value) in kwargs.items() if key in density.get_parameter_names()} new_joint._densities[i] = density(**cond_kwargs) # Potentially reduce joint distribution to a single density # This happens if there is only a single parameter left. # Can reduce to Posterior, Likelihood or Distribution. return new_joint._reduce_to_single_density()
[docs] def enable_FD(self, epsilon=None): """ Enable finite difference approximation for logd gradient. Note that if enabled, the FD approximation will be used even if the _gradient method is implemented. By default, all parameters will have FD enabled with a step size of 1e-8. Parameters ---------- epsilon : dict, *optional* Dictionary indicating the spacing (step size) to use for finite difference approximation for logd gradient for each variable. Keys are variable names. Values are either a float to enable FD with the given value as the FD step size, or None to disable FD for that variable. Default is 1e-8 for all variables. """ if epsilon is None: epsilon = {par_name: 1e-8 for par_name in self.get_parameter_names()} self.FD_epsilon = epsilon
[docs] def disable_FD(self): """ Disable finite difference approximation for logd gradient. """ par_names = self.get_parameter_names() self.FD_epsilon = {par_name: None for par_name in par_names}
[docs] def get_parameter_names(self) -> List[str]: """ Returns the parameter names of the joint distribution. """ return [dist.name for dist in self._distributions]
[docs] def get_density(self, name) -> Density: """ Return a density with the given name. """ for density in self._densities: if density.name == name: return density raise ValueError(f"No density with name {name}.")
# --------- Private properties --------- @property def _distributions(self) -> List[Distribution]: """ Returns a list of the distributions (priors) in the joint distribution. """ return [dist for dist in self._densities if isinstance(dist, Distribution)] @property def _likelihoods(self) -> List[Likelihood]: """ Returns a list of the likelihoods in the joint distribution. """ return [likelihood for likelihood in self._densities if isinstance(likelihood, Likelihood)] @property def _evaluated_densities(self) -> List[EvaluatedDensity]: """ Returns a list of the evaluated densities in the joint distribution. """ return [eval_dens for eval_dens in self._densities if isinstance(eval_dens, EvaluatedDensity)] # --------- Private methods --------- def _get_conditioning_variables(self) -> List[str]: """ Return the conditioning variables of the joint distribution. """ joint_par_names = self.get_parameter_names() cond_vars = set() for density in self._densities: cond_vars.update([par_name for par_name in density.get_parameter_names() if par_name not in joint_par_names]) return list(cond_vars) def _get_fixed_variables(self) -> List[str]: """ Return the variables that have been conditioned on (fixed). """ # Extract names of Likelihoods and EvaluatedDensities return [density.name for density in self._densities if isinstance(density, Likelihood) or isinstance(density, EvaluatedDensity)] def _parse_args_add_to_kwargs(self, *args, **kwargs): """ Parse args and add to kwargs. The args are assumed to follow the order of the parameter names. """ if len(args)>0: ordered_keys = self.get_parameter_names() for index, arg in enumerate(args): if ordered_keys[index] in kwargs: raise ValueError(f"{ordered_keys[index]} passed as both argument and keyword argument.\nArguments follow the listed parameter names order: {ordered_keys}") kwargs[ordered_keys[index]] = arg return kwargs def _sum_evaluated_densities(self): """ Return the sum of the evaluated densities in the joint distribution """ return sum([density.logd() for density in self._evaluated_densities]) def _reduce_to_single_density(self): """ Reduce the joint distribution to a single density if possible. The single density is either a Posterior, Likelihood or Distribution. This method is a hack to allow our current samplers to work with the joint distribution. It should be removed in the future. """ # Count number of distributions and likelihoods n_dist = len(self._distributions) n_likelihood = len(self._likelihoods) reduced_FD_epsilon = {par_name:self.FD_epsilon[par_name] for par_name in self.get_parameter_names()} self.enable_FD(epsilon=reduced_FD_epsilon) # Cant reduce if there are multiple distributions or likelihoods if n_dist > 1: return self # If only evaluated densities left return joint to ensure logd method is available if n_dist == 0 and n_likelihood == 0: return self # Extract the parameter name of the distribution if n_dist == 1: par_name = self._distributions[0].name elif n_likelihood == 1: par_name = self._likelihoods[0].name else: par_name = None # If exactly one distribution and multiple likelihoods reduce if n_dist == 1 and n_likelihood > 1: reduced_distribution = MultipleLikelihoodPosterior(*self._densities) reduced_FD_epsilon = {par_name:self.FD_epsilon[par_name]} # If exactly one distribution and one likelihood its a Posterior if n_dist == 1 and n_likelihood == 1: # Ensure parameter names match, otherwise return the joint distribution if set(self._likelihoods[0].get_parameter_names()) != set(self._distributions[0].get_parameter_names()): return self reduced_distribution = Posterior(self._likelihoods[0], self._distributions[0]) reduced_distribution = self._add_constants_to_density(reduced_distribution) reduced_FD_epsilon = self.FD_epsilon[par_name] # If exactly one distribution and no likelihoods its a Distribution if n_dist == 1 and n_likelihood == 0: # Intentionally skip enabling FD here. If the user wants FD, they # can enable it for this particular distribution before forming # the joint distribution. return self._add_constants_to_density(self._distributions[0]) # If no distributions and exactly one likelihood its a Likelihood if n_likelihood == 1 and n_dist == 0: # This case seems to not happen in practice, but we include it for # completeness. reduced_distribution = self._likelihoods[0] reduced_FD_epsilon = self.FD_epsilon[par_name] if self.FD_enabled[par_name]: reduced_distribution.enable_FD(epsilon=reduced_FD_epsilon) return reduced_distribution def _add_constants_to_density(self, density: Density): """ Add the constants (evaluated densities) to a single density. Used when reducing to single density. """ if isinstance(density, EvaluatedDensity): raise ValueError("Cannot add the sum of all evaluated densities to an EvaluatedDensity.") density._constant += self._sum_evaluated_densities() return density def _as_stacked(self) -> _StackedJointDistribution: """ Return a stacked JointDistribution with the same densities. """ return _StackedJointDistribution(*self._densities) def __repr__(self): msg = f"JointDistribution(\n" msg += " Equation: \n\t" # Construct equation expression joint_par_names = ",".join(self.get_parameter_names()) fixed_par_names = ",".join(self._get_fixed_variables()) if len(joint_par_names) == 0: msg += "Constant number" else: # LHS of equation: p(x,y,z) = or p(x,y|z) ∝ msg += f"p({joint_par_names}" if len(fixed_par_names) > 0: msg += f"|{fixed_par_names}) ∝ " else: msg += ") = " # RHS of equation: product of densities for density in self._densities: par_names = ",".join([density.name]) cond_vars = ",".join(set(density.get_parameter_names())-set([density.name])) # Distributions are written as p(x|y) or p(x). Likelihoods are L(y|x). # x=par_names, y=cond_vars. if isinstance(density, Likelihood): msg += f"L({cond_vars}|{par_names})" elif isinstance(density, Distribution): msg += f"p({par_names}" if len(cond_vars) > 0: msg += f"|{cond_vars}" msg += ")" msg += "\n" msg += " Densities: \n" # Create "Bayesian model" equations for density in self._densities: msg += f"\t{density.name} ~ {density}\n" # Wrap up msg += " )" return msg
class _StackedJointDistribution(JointDistribution, Distribution): """ A joint distribution where all parameters are stacked into a single vector. This acts like a regular Distribution with a single parameter vector even though it is actually a joint distribution. This is intended to be used by samplers that are not able to handle joint distributions. A joint distribution can be converted to a stacked joint distribution by calling the :meth:`_as_stacked` method. See :class:`JointDistribution` for more details on the joint distribution. """ @property def dim(self): return sum(super().dim) @property def geometry(self): return _DefaultGeometry1D(self.dim) def logd(self, stacked_input): """ Return the un-normalized log density function stacked joint density. """ # Split the stacked input into individual inputs and call superclass split_indices = np.cumsum(super().dim) # list(accumulate(super().dim)) inputs = np.split(stacked_input, split_indices[:-1]) names = self.get_parameter_names() # Create keyword arguments kwargs = dict(zip(names, inputs)) return super().logd(**kwargs) def logpdf(self, stacked_input): return self.logd(stacked_input) def _sample(self, Ns=1): raise TypeError(f"{self.__class__.__name__} does not support sampling.") def __repr__(self): return "_Stacked"+super().__repr__()
[docs] class MultipleLikelihoodPosterior(JointDistribution, Distribution): """ A posterior distribution with multiple likelihoods and a single prior. Parameters ---------- densities : :class:`Distribution` or :class:`~cuqi.likelihood.Likelihood` The densities that make up the Posterior. Must include at least three densities. For a simple Likelihood and prior use :class:`Posterior` instead. Notes ----- This acts like a regular distribution with a single parameter vector. Behind-the-scenes it is a joint posterior distribution with multiple likelihoods and a single prior. This is mostly intended to be used by samplers that are not able to handle joint distributions. See :class:`JointDistribution` for more details on the joint distribution. """
[docs] def __init__(self, *densities: Density): super().__init__(*densities) self._check_densities_have_same_parameter()
@property def geometry(self): """ The geometry of the distribution. """ return self.prior.geometry @property def dim(self): """ Return the dimension of the distribution. """ return self.prior.dim @property def prior(self): """ Return the prior distribution of the posterior. """ return self._distributions[0] @property def likelihoods(self): """ Return the likelihoods of the posterior. """ return self._likelihoods @property def models(self): """ Return the forward models that make up the posterior. """ return [likelihood.model for likelihood in self.likelihoods]
[docs] def logpdf(self, *args, **kwargs): return self.logd(*args, **kwargs)
[docs] def gradient(self, *args, **kwargs): """ Return the gradient of the un-normalized log density function. """ return sum(density.gradient(*args, **kwargs) for density in self._densities)
def _sample(self, Ns=1): raise TypeError(f"{self.__class__.__name__} does not support direct sampling.") def _check_densities_have_same_parameter(self): """ Checks the densities if they are for one parameter only and that there are at least 3 densities. """ if len(self._densities) < 3: raise ValueError(f"{self.__class__.__name__} requires at least three densities. For a single likelihood and prior use Posterior instead.") if len(self.likelihoods) == 0: raise ValueError(f"{self.__class__.__name__} must have a likelihood and prior.") # Check that there is only a single parameter par_names = self.get_parameter_names() if len(set(par_names)) > 1: raise ValueError(f"{self.__class__.__name__} requires all densities to have the same parameter name.") def __repr__(self): # Remove first line of super repr and add class name to the start return f"{self.__class__.__name__}(\n" + "\n".join(super().__repr__().split("\n")[1:])