# Copyright Contributors to the Pyro project.
# SPDX-License-Identifier: Apache-2.0
# The implementation follows the design in PyTorch: torch.distributions.distribution.py
#
# Copyright (c) 2016- Facebook, Inc (Adam Paszke)
# Copyright (c) 2014- Facebook, Inc (Soumith Chintala)
# Copyright (c) 2011-2014 Idiap Research Institute (Ronan Collobert)
# Copyright (c) 2012-2014 Deepmind Technologies (Koray Kavukcuoglu)
# Copyright (c) 2011-2012 NEC Laboratories America (Koray Kavukcuoglu)
# Copyright (c) 2011-2013 NYU (Clement Farabet)
# Copyright (c) 2006-2010 NEC Laboratories America (Ronan Collobert, Leon Bottou, Iain Melvin, Jason Weston)
# Copyright (c) 2006 Idiap Research Institute (Samy Bengio)
# Copyright (c) 2001-2004 Idiap Research Institute (Ronan Collobert, Samy Bengio, Johnny Mariethoz)
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
from collections import OrderedDict
from contextlib import contextmanager
import functools
import inspect
import warnings
import numpy as np
import jax
from jax import lax, tree_util
import jax.numpy as jnp
from jax.scipy.special import logsumexp
from numpyro.distributions.transforms import AbsTransform, ComposeTransform, Transform
from numpyro.distributions.util import (
lazy_property,
promote_shapes,
sum_rightmost,
validate_sample,
)
from numpyro.util import find_stack_level, not_jax_tracer
from . import constraints
_VALIDATION_ENABLED = False
[docs]
def enable_validation(is_validate=True):
"""
Enable or disable validation checks in NumPyro. Validation checks provide useful warnings and
errors, e.g. NaN checks, validating distribution arguments and support values, etc. which is
useful for debugging.
.. note:: This utility does not take effect under JAX's JIT compilation or vectorized
transformation :func:`jax.vmap`.
:param bool is_validate: whether to enable validation checks.
"""
global _VALIDATION_ENABLED
_VALIDATION_ENABLED = is_validate
Distribution.set_default_validate_args(is_validate)
[docs]
@contextmanager
def validation_enabled(is_validate=True):
"""
Context manager that is useful when temporarily enabling/disabling validation checks.
:param bool is_validate: whether to enable validation checks.
"""
distribution_validation_status = _VALIDATION_ENABLED
try:
enable_validation(is_validate)
yield
finally:
enable_validation(distribution_validation_status)
COERCIONS = []
class DistributionMeta(type):
def __init__(cls, *args, **kwargs):
signature = inspect.signature(functools.partial(cls.__init__, None))
cls.__signature__ = signature
return super().__init__(*args, **kwargs)
def __call__(cls, *args, **kwargs):
for coerce_ in COERCIONS:
result = coerce_(cls, args, kwargs)
if result is not None:
return result
return super().__call__(*args, **kwargs)
[docs]
class Distribution(metaclass=DistributionMeta):
"""
Base class for probability distributions in NumPyro. The design largely
follows from :mod:`torch.distributions`.
:param batch_shape: The batch shape for the distribution. This designates
independent (possibly non-identical) dimensions of a sample from the
distribution. This is fixed for a distribution instance and is inferred
from the shape of the distribution parameters.
:param event_shape: The event shape for the distribution. This designates
the dependent dimensions of a sample from the distribution. These are
collapsed when we evaluate the log probability density of a batch of
samples using `.log_prob`.
:param validate_args: Whether to enable validation of distribution
parameters and arguments to `.log_prob` method.
As an example:
.. doctest::
>>> import jax.numpy as jnp
>>> import numpyro.distributions as dist
>>> d = dist.Dirichlet(jnp.ones((2, 3, 4)))
>>> d.batch_shape
(2, 3)
>>> d.event_shape
(4,)
"""
arg_constraints = {}
support = None
has_enumerate_support = False
reparametrized_params = []
_validate_args = False
pytree_data_fields = ()
pytree_aux_fields = ("_batch_shape", "_event_shape")
# register Distribution as a pytree
# ref: https://github.com/google/jax/issues/2916
def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
tree_util.register_pytree_node(cls, cls.tree_flatten, cls.tree_unflatten)
[docs]
@classmethod
def gather_pytree_data_fields(cls):
bases = inspect.getmro(cls)
all_pytree_data_fields = ()
for base in bases:
if issubclass(base, Distribution):
all_pytree_data_fields += base.__dict__.get(
"pytree_data_fields",
tuple(base.__dict__.get("arg_constraints", {}).keys()),
)
# remove duplicates
all_pytree_data_fields = tuple(set(all_pytree_data_fields))
return all_pytree_data_fields
[docs]
@classmethod
def gather_pytree_aux_fields(cls) -> tuple:
bases = inspect.getmro(cls)
all_pytree_aux_fields = ("_validate_args",)
for base in bases:
if issubclass(base, Distribution):
all_pytree_aux_fields += base.__dict__.get("pytree_aux_fields", ())
# remove duplicates
all_pytree_aux_fields = tuple(set(all_pytree_aux_fields))
return all_pytree_aux_fields
[docs]
def tree_flatten(self):
all_pytree_data_fields_names = type(self).gather_pytree_data_fields()
all_pytree_data_fields_vals = tuple(
# getattr(self, attr_name) for attr_name in all_pytree_data_fields_names
self.__dict__.get(attr_name)
for attr_name in all_pytree_data_fields_names
)
all_pytree_aux_fields_names = type(self).gather_pytree_aux_fields()
all_pytree_aux_fields_vals = tuple(
# getattr(self, attr_name) for attr_name in all_pytree_aux_fields_names
self.__dict__.get(attr_name)
for attr_name in all_pytree_aux_fields_names
)
return (
all_pytree_data_fields_vals,
all_pytree_aux_fields_vals,
)
[docs]
@classmethod
def tree_unflatten(cls, aux_data, params):
pytree_data_fields = cls.gather_pytree_data_fields()
pytree_aux_fields = cls.gather_pytree_aux_fields()
pytree_data_fields_dict = dict(zip(pytree_data_fields, params))
pytree_aux_fields_dict = dict(zip(pytree_aux_fields, aux_data))
d = cls.__new__(cls)
for k, v in pytree_data_fields_dict.items():
if v is not None or not isinstance(getattr(cls, k, None), lazy_property):
setattr(d, k, v)
for k, v in pytree_aux_fields_dict.items():
if v is not None or not isinstance(getattr(cls, k, None), lazy_property):
setattr(d, k, v)
# disable args validation during `tree_unflatten` it is called by jax with
# placeholder attributes that would make validation fail
d._validate_args = False
Distribution.__init__(
d,
pytree_aux_fields_dict["_batch_shape"],
pytree_aux_fields_dict["_event_shape"],
)
d._validate_args = pytree_aux_fields_dict["_validate_args"]
return d
[docs]
@staticmethod
def set_default_validate_args(value):
if value not in [True, False]:
raise ValueError
Distribution._validate_args = value
def __init__(self, batch_shape=(), event_shape=(), *, validate_args=None):
self._batch_shape = batch_shape
self._event_shape = event_shape
if validate_args is not None:
self._validate_args = validate_args
if self._validate_args:
self.validate_args(strict=False)
super(Distribution, self).__init__()
[docs]
def get_args(self) -> dict:
"""
Get arguments of the distribution.
"""
return {
param: getattr(self, param)
for param in self.arg_constraints
if param in self.__dict__
or not isinstance(getattr(type(self), param), lazy_property)
}
[docs]
def validate_args(self, strict: bool = True) -> None:
"""
Validate the arguments of the distribution.
:param strict: Require strict validation, raising an error if the function is
called inside jitted code.
"""
for param, value in self.get_args().items():
constraint = self.arg_constraints[param]
if constraints.is_dependent(constraint):
continue # skip constraints that cannot be checked
is_valid = constraint(value)
if not_jax_tracer(is_valid):
if not np.all(is_valid):
raise ValueError(
"{} distribution got invalid {} parameter.".format(
self.__class__.__name__, param
)
)
elif strict:
raise RuntimeError("Cannot validate arguments inside jitted code.")
@property
def batch_shape(self):
"""
Returns the shape over which the distribution parameters are batched.
:return: batch shape of the distribution.
:rtype: tuple
"""
return self._batch_shape
@property
def event_shape(self):
"""
Returns the shape of a single sample from the distribution without
batching.
:return: event shape of the distribution.
:rtype: tuple
"""
return self._event_shape
@property
def event_dim(self):
"""
:return: Number of dimensions of individual events.
:rtype: int
"""
return len(self.event_shape)
@property
def has_rsample(self):
return set(self.reparametrized_params) == set(self.arg_constraints)
[docs]
def rsample(self, key, sample_shape=()):
if self.has_rsample:
return self.sample(key, sample_shape=sample_shape)
raise NotImplementedError
[docs]
def shape(self, sample_shape=()):
"""
The tensor shape of samples from this distribution.
Samples are of shape::
d.shape(sample_shape) == sample_shape + d.batch_shape + d.event_shape
:param tuple sample_shape: the size of the iid batch to be drawn from the
distribution.
:return: shape of samples.
:rtype: tuple
"""
return sample_shape + self.batch_shape + self.event_shape
[docs]
def sample(self, key, sample_shape=()):
"""
Returns a sample from the distribution having shape given by
`sample_shape + batch_shape + event_shape`. Note that when `sample_shape` is non-empty,
leading dimensions (of size `sample_shape`) of the returned sample will
be filled with iid draws from the distribution instance.
:param jax.random.PRNGKey key: the rng_key key to be used for the distribution.
:param tuple sample_shape: the sample shape for the distribution.
:return: an array of shape `sample_shape + batch_shape + event_shape`
:rtype: numpy.ndarray
"""
raise NotImplementedError
[docs]
def log_prob(self, value):
"""
Evaluates the log probability density for a batch of samples given by
`value`.
:param value: A batch of samples from the distribution.
:return: an array with shape `value.shape[:-self.event_shape]`
:rtype: numpy.ndarray
"""
raise NotImplementedError
@property
def mean(self):
"""
Mean of the distribution.
"""
raise NotImplementedError
@property
def variance(self):
"""
Variance of the distribution.
"""
raise NotImplementedError
def _validate_sample(self, value):
mask = self.support(value)
if not_jax_tracer(mask):
if not np.all(mask):
warnings.warn(
"Out-of-support values provided to log prob method. "
"The value argument should be within the support.",
stacklevel=find_stack_level(),
)
return mask
def __call__(self, *args, **kwargs):
key = kwargs.pop("rng_key")
sample_intermediates = kwargs.pop("sample_intermediates", False)
if sample_intermediates:
return self.sample_with_intermediates(key, *args, **kwargs)
return self.sample(key, *args, **kwargs)
[docs]
def to_event(self, reinterpreted_batch_ndims=None):
"""
Interpret the rightmost `reinterpreted_batch_ndims` batch dimensions as
dependent event dimensions.
:param reinterpreted_batch_ndims: Number of rightmost batch dims to
interpret as event dims.
:return: An instance of `Independent` distribution.
:rtype: numpyro.distributions.distribution.Independent
"""
if reinterpreted_batch_ndims is None:
reinterpreted_batch_ndims = len(self.batch_shape)
if reinterpreted_batch_ndims == 0:
return self
return Independent(self, reinterpreted_batch_ndims)
[docs]
def enumerate_support(self, expand=True):
"""
Returns an array with shape `len(support) x batch_shape`
containing all values in the support.
"""
raise NotImplementedError
[docs]
def entropy(self):
"""
Returns the entropy of the distribution.
"""
raise NotImplementedError
[docs]
def expand(self, batch_shape):
"""
Returns a new :class:`ExpandedDistribution` instance with batch
dimensions expanded to `batch_shape`.
:param tuple batch_shape: batch shape to expand to.
:return: an instance of `ExpandedDistribution`.
:rtype: :class:`ExpandedDistribution`
"""
batch_shape = tuple(batch_shape)
if batch_shape == self.batch_shape:
return self
return ExpandedDistribution(self, batch_shape)
[docs]
def expand_by(self, sample_shape):
"""
Expands a distribution by adding ``sample_shape`` to the left side of
its :attr:`~numpyro.distributions.distribution.Distribution.batch_shape`.
To expand internal dims of ``self.batch_shape`` from 1 to something
larger, use :meth:`expand` instead.
:param tuple sample_shape: The size of the iid batch to be drawn
from the distribution.
:return: An expanded version of this distribution.
:rtype: :class:`ExpandedDistribution`
"""
return self.expand(tuple(sample_shape) + self.batch_shape)
[docs]
def mask(self, mask):
"""
Masks a distribution by a boolean or boolean-valued array that is
broadcastable to the distributions
:attr:`Distribution.batch_shape` .
:param mask: A boolean or boolean valued array (`True` includes
a site, `False` excludes a site).
:type mask: bool or jnp.ndarray
:return: A masked copy of this distribution.
:rtype: :class:`MaskedDistribution`
**Example:**
.. doctest::
>>> from jax import random
>>> import jax.numpy as jnp
>>> import numpyro
>>> import numpyro.distributions as dist
>>> from numpyro.distributions import constraints
>>> from numpyro.infer import SVI, Trace_ELBO
>>> def model(data, m):
... f = numpyro.sample("latent_fairness", dist.Beta(1, 1))
... with numpyro.plate("N", data.shape[0]):
... # only take into account the values selected by the mask
... masked_dist = dist.Bernoulli(f).mask(m)
... numpyro.sample("obs", masked_dist, obs=data)
>>> def guide(data, m):
... alpha_q = numpyro.param("alpha_q", 5., constraint=constraints.positive)
... beta_q = numpyro.param("beta_q", 5., constraint=constraints.positive)
... numpyro.sample("latent_fairness", dist.Beta(alpha_q, beta_q))
>>> data = jnp.concatenate([jnp.ones(5), jnp.zeros(5)])
>>> # select values equal to one
>>> masked_array = jnp.where(data == 1, True, False)
>>> optimizer = numpyro.optim.Adam(step_size=0.05)
>>> svi = SVI(model, guide, optimizer, loss=Trace_ELBO())
>>> svi_result = svi.run(random.PRNGKey(0), 300, data, masked_array)
>>> params = svi_result.params
>>> # inferred_mean is closer to 1
>>> inferred_mean = params["alpha_q"] / (params["alpha_q"] + params["beta_q"])
"""
if mask is True:
return self
return MaskedDistribution(self, mask)
[docs]
@classmethod
def infer_shapes(cls, *args, **kwargs):
r"""
Infers ``batch_shape`` and ``event_shape`` given shapes of args to
:meth:`__init__`.
.. note:: This assumes distribution shape depends only on the shapes
of tensor inputs, not in the data contained in those inputs.
:param \*args: Positional args replacing each input arg with a
tuple representing the sizes of each tensor input.
:param \*\*kwargs: Keywords mapping name of input arg to tuple
representing the sizes of each tensor input.
:returns: A pair ``(batch_shape, event_shape)`` of the shapes of a
distribution that would be created with input args of the given
shapes.
:rtype: tuple
"""
if cls.support.event_dim > 0:
raise NotImplementedError
# Convert args to kwargs.
try:
arg_names = cls._arg_names
except AttributeError:
sig = inspect.signature(cls.__init__)
arg_names = cls._arg_names = tuple(sig.parameters)[1:]
kwargs.update(zip(arg_names, args))
# Assumes distribution is univariate.
batch_shapes = []
for name, shape in kwargs.items():
if shape is not None:
event_dim = cls.arg_constraints.get(name, constraints.real).event_dim
batch_shapes.append(shape[: len(shape) - event_dim])
batch_shape = lax.broadcast_shapes(*batch_shapes) if batch_shapes else ()
event_shape = ()
return batch_shape, event_shape
[docs]
def cdf(self, value):
"""
The cumulative distribution function of this distribution.
:param value: samples from this distribution.
:return: output of the cumulative distribution function evaluated at `value`.
"""
raise NotImplementedError
[docs]
def icdf(self, q):
"""
The inverse cumulative distribution function of this distribution.
:param q: quantile values, should belong to [0, 1].
:return: the samples whose cdf values equals to `q`.
"""
raise NotImplementedError
@property
def is_discrete(self):
return self.support.is_discrete
[docs]
class ExpandedDistribution(Distribution):
arg_constraints = {}
pytree_data_fields = ("base_dist",)
pytree_aux_fields = (
"_expanded_sizes",
"_interstitial_sizes",
)
def __init__(self, base_dist, batch_shape=()):
if isinstance(base_dist, ExpandedDistribution):
batch_shape, _, _ = self._broadcast_shape(
base_dist.batch_shape, batch_shape
)
base_dist = base_dist.base_dist
self.base_dist = base_dist
# adjust batch shape
# Do basic validation. e.g. we should not "unexpand" distributions even if that is possible.
new_shape, _, _ = self._broadcast_shape(base_dist.batch_shape, batch_shape)
# Record interstitial and expanded dims/sizes w.r.t. the base distribution
new_shape, expanded_sizes, interstitial_sizes = self._broadcast_shape(
base_dist.batch_shape, new_shape
)
self._expanded_sizes = expanded_sizes
self._interstitial_sizes = interstitial_sizes
super().__init__(new_shape, base_dist.event_shape)
@staticmethod
def _broadcast_shape(existing_shape, new_shape):
if len(new_shape) < len(existing_shape):
raise ValueError(
"Cannot broadcast distribution of shape {} to shape {}".format(
existing_shape, new_shape
)
)
reversed_shape = list(reversed(existing_shape))
expanded_sizes, interstitial_sizes = [], []
for i, size in enumerate(reversed(new_shape)):
if i >= len(reversed_shape):
reversed_shape.append(size)
expanded_sizes.append((-i - 1, size))
elif reversed_shape[i] == 1:
if size != 1:
reversed_shape[i] = size
interstitial_sizes.append((-i - 1, size))
elif reversed_shape[i] != size:
raise ValueError(
"Cannot broadcast distribution of shape {} to shape {}".format(
existing_shape, new_shape
)
)
return (
tuple(reversed(reversed_shape)),
OrderedDict(reversed(expanded_sizes)),
OrderedDict(interstitial_sizes),
)
@property
def has_enumerate_support(self):
return self.base_dist.has_enumerate_support
@property
def has_rsample(self):
return self.base_dist.has_rsample
def _sample(self, sample_fn, key, sample_shape=()):
interstitial_sizes = tuple(self._interstitial_sizes.values())
expanded_sizes = tuple(self._expanded_sizes.values())
batch_shape = expanded_sizes + interstitial_sizes
# shape = sample_shape + expanded_sizes + interstitial_sizes + base_dist.shape()
samples, intermediates = sample_fn(key, sample_shape=sample_shape + batch_shape)
if not interstitial_sizes:
return samples, intermediates
interstitial_dims = tuple(self._interstitial_sizes.keys())
event_dim = len(self.event_shape)
batch_ndims = jnp.ndim(samples) - event_dim
interstitial_dims = tuple(batch_ndims + i for i in interstitial_dims)
interstitial_idx = len(sample_shape) + len(expanded_sizes)
interstitial_sample_dims = range(
interstitial_idx, interstitial_idx + len(interstitial_dims)
)
permutation = list(range(batch_ndims))
for dim1, dim2 in zip(interstitial_dims, interstitial_sample_dims):
permutation[dim1], permutation[dim2] = permutation[dim2], permutation[dim1]
def reshape_sample(x):
"""
Reshapes samples and intermediates to ensure that the output
shape is correct: This implicitly replaces the interstitial dims
of size 1 in the original batch_shape of base_dist with those
in the expanded dims.
"""
x = jnp.transpose(x, permutation + list(range(batch_ndims, jnp.ndim(x))))
event_shape = jnp.shape(x)[batch_ndims:]
return x.reshape(sample_shape + self.batch_shape + event_shape)
intermediates = jax.tree.map(reshape_sample, intermediates)
samples = reshape_sample(samples)
return samples, intermediates
[docs]
def rsample(self, key, sample_shape=()):
return self._sample(
lambda *args, **kwargs: (self.base_dist.rsample(*args, **kwargs), []),
key,
sample_shape,
)[0]
@property
def support(self):
return self.base_dist.support
[docs]
def sample(self, key, sample_shape=()):
return self.sample_with_intermediates(key, sample_shape)[0]
[docs]
def log_prob(self, value, intermediates=None):
# TODO: utilize `intermediates`
shape = lax.broadcast_shapes(
self.batch_shape,
jnp.shape(value)[: max(jnp.ndim(value) - self.event_dim, 0)],
)
log_prob = self.base_dist.log_prob(value)
return jnp.broadcast_to(log_prob, shape)
[docs]
def enumerate_support(self, expand=True):
samples = self.base_dist.enumerate_support(expand=False)
enum_shape = samples.shape[:1]
samples = samples.reshape(enum_shape + (1,) * len(self.batch_shape))
if expand:
samples = samples.expand(enum_shape + self.batch_shape)
return samples
@property
def mean(self):
return jnp.broadcast_to(
self.base_dist.mean, self.batch_shape + self.event_shape
)
@property
def variance(self):
return jnp.broadcast_to(
self.base_dist.variance, self.batch_shape + self.event_shape
)
[docs]
def entropy(self):
return jnp.broadcast_to(self.base_dist.entropy(), self.batch_shape)
[docs]
class Independent(Distribution):
"""
Reinterprets batch dimensions of a distribution as event dims by shifting
the batch-event dim boundary further to the left.
From a practical standpoint, this is useful when changing the result of
:meth:`log_prob`. For example, a univariate Normal distribution can be
interpreted as a multivariate Normal with diagonal covariance:
.. doctest::
>>> import numpyro.distributions as dist
>>> normal = dist.Normal(jnp.zeros(3), jnp.ones(3))
>>> [normal.batch_shape, normal.event_shape]
[(3,), ()]
>>> diag_normal = dist.Independent(normal, 1)
>>> [diag_normal.batch_shape, diag_normal.event_shape]
[(), (3,)]
:param numpyro.distribution.Distribution base_distribution: a distribution instance.
:param int reinterpreted_batch_ndims: the number of batch dims to reinterpret as event dims.
"""
arg_constraints = {}
pytree_data_fields = ("base_dist",)
pytree_aux_fields = ("reinterpreted_batch_ndims",)
def __init__(self, base_dist, reinterpreted_batch_ndims, *, validate_args=None):
if reinterpreted_batch_ndims > len(base_dist.batch_shape):
raise ValueError(
"Expected reinterpreted_batch_ndims <= len(base_distribution.batch_shape), "
"actual {} vs {}".format(
reinterpreted_batch_ndims, len(base_dist.batch_shape)
)
)
shape = base_dist.batch_shape + base_dist.event_shape
event_dim = reinterpreted_batch_ndims + len(base_dist.event_shape)
batch_shape = shape[: len(shape) - event_dim]
event_shape = shape[len(shape) - event_dim :]
self.base_dist = base_dist
self.reinterpreted_batch_ndims = reinterpreted_batch_ndims
super(Independent, self).__init__(
batch_shape, event_shape, validate_args=validate_args
)
@property
def support(self):
return constraints.independent(
self.base_dist.support, self.reinterpreted_batch_ndims
)
@property
def has_enumerate_support(self):
return self.base_dist.has_enumerate_support
@property
def reparametrized_params(self):
return self.base_dist.reparametrized_params
@property
def mean(self):
return self.base_dist.mean
@property
def variance(self):
return self.base_dist.variance
@property
def has_rsample(self):
return self.base_dist.has_rsample
[docs]
def rsample(self, key, sample_shape=()):
return self.base_dist.rsample(key, sample_shape=sample_shape)
[docs]
def sample(self, key, sample_shape=()):
return self.base_dist(rng_key=key, sample_shape=sample_shape)
[docs]
def log_prob(self, value):
log_prob = self.base_dist.log_prob(value)
return sum_rightmost(log_prob, self.reinterpreted_batch_ndims)
[docs]
def expand(self, batch_shape):
base_batch_shape = (
batch_shape + self.event_shape[: self.reinterpreted_batch_ndims]
)
return self.base_dist.expand(base_batch_shape).to_event(
self.reinterpreted_batch_ndims
)
[docs]
def entropy(self):
axes = range(-self.reinterpreted_batch_ndims, 0)
return self.base_dist.entropy().sum(axes)
[docs]
class MaskedDistribution(Distribution):
"""
Masks a distribution by a boolean array that is broadcastable to the
distribution's :attr:`Distribution.batch_shape`.
In the special case ``mask is False``, computation of :meth:`log_prob` , is skipped,
and constant zero values are returned instead.
:param mask: A boolean or boolean-valued array.
:type mask: jnp.ndarray or bool
"""
arg_constraints = {}
pytree_data_fields = ("base_dist", "_mask")
pytree_aux_fields = ("_mask",)
def __init__(self, base_dist, mask):
if isinstance(mask, bool):
self._mask = mask
else:
batch_shape = lax.broadcast_shapes(
jnp.shape(mask), tuple(base_dist.batch_shape)
)
if mask.shape != batch_shape:
mask = jnp.broadcast_to(mask, batch_shape)
if base_dist.batch_shape != batch_shape:
base_dist = base_dist.expand(batch_shape)
self._mask = mask.astype("bool")
self.base_dist = base_dist
super().__init__(base_dist.batch_shape, base_dist.event_shape)
@property
def has_enumerate_support(self):
return self.base_dist.has_enumerate_support
@property
def has_rsample(self):
return self.base_dist.has_rsample
[docs]
def rsample(self, key, sample_shape=()):
return self.base_dist.rsample(key, sample_shape=sample_shape)
@property
def support(self):
return self.base_dist.support
[docs]
def sample(self, key, sample_shape=()):
return self.base_dist(rng_key=key, sample_shape=sample_shape)
[docs]
def log_prob(self, value):
if self._mask is False:
shape = lax.broadcast_shapes(
tuple(self.base_dist.batch_shape),
jnp.shape(value)[: max(jnp.ndim(value) - len(self.event_shape), 0)],
)
return jnp.zeros(shape)
if self._mask is True:
return self.base_dist.log_prob(value)
try:
default_value = self.base_dist.support.feasible_like(value)
except NotImplementedError:
pass
else:
mask = jnp.reshape(
self._mask, jnp.shape(self._mask) + (1,) * self.event_dim
)
value = jnp.where(mask, value, default_value)
return jnp.where(self._mask, self.base_dist.log_prob(value), 0.0)
[docs]
def enumerate_support(self, expand=True):
return self.base_dist.enumerate_support(expand=expand)
@property
def mean(self):
return self.base_dist.mean
@property
def variance(self):
return self.base_dist.variance
[docs]
def tree_flatten(self):
data, aux = super().tree_flatten()
_mask_data_idx = type(self).gather_pytree_data_fields().index("_mask")
_mask_aux_idx = type(self).gather_pytree_aux_fields().index("_mask")
if isinstance(self._mask, bool):
data = list(data)
data[_mask_data_idx] = None
data = tuple(data)
else:
aux = list(aux)
aux[_mask_aux_idx] = None
aux = tuple(aux)
return data, aux
[docs]
@classmethod
def tree_unflatten(cls, aux_data, params):
d = super().tree_unflatten(aux_data, params)
_mask_data_idx = cls.gather_pytree_data_fields().index("_mask")
_mask_aux_idx = cls.gather_pytree_aux_fields().index("_mask")
if aux_data[_mask_aux_idx] is None:
setattr(d, "_mask", params[_mask_data_idx])
else:
setattr(d, "_mask", aux_data[_mask_aux_idx])
return d
[docs]
class FoldedDistribution(TransformedDistribution):
"""
Equivalent to ``TransformedDistribution(base_dist, AbsTransform())``,
but additionally supports :meth:`log_prob` .
:param Distribution base_dist: A univariate distribution to reflect.
"""
support = constraints.positive
def __init__(self, base_dist, *, validate_args=None):
if base_dist.event_shape:
raise ValueError("Only univariate distributions can be folded.")
super().__init__(base_dist, AbsTransform(), validate_args=validate_args)
@validate_sample
def log_prob(self, value):
dim = max(len(self.batch_shape), jnp.ndim(value))
plus_minus = jnp.array([1.0, -1.0]).reshape((2,) + (1,) * dim)
return logsumexp(self.base_dist.log_prob(plus_minus * value), axis=0)
[docs]
class Delta(Distribution):
arg_constraints = {
"v": constraints.dependent(is_discrete=False),
"log_density": constraints.real,
}
reparametrized_params = ["v", "log_density"]
def __init__(self, v=0.0, log_density=0.0, event_dim=0, *, validate_args=None):
if event_dim > jnp.ndim(v):
raise ValueError(
"Expected event_dim <= v.dim(), actual {} vs {}".format(
event_dim, jnp.ndim(v)
)
)
batch_dim = jnp.ndim(v) - event_dim
batch_shape = jnp.shape(v)[:batch_dim]
event_shape = jnp.shape(v)[batch_dim:]
self.v = v
# NB: following Pyro implementation, log_density should be broadcasted to batch_shape
self.log_density = promote_shapes(log_density, shape=batch_shape)[0]
super(Delta, self).__init__(
batch_shape, event_shape, validate_args=validate_args
)
@constraints.dependent_property
def support(self):
return constraints.independent(constraints.real, self.event_dim)
[docs]
def sample(self, key, sample_shape=()):
if not sample_shape:
return self.v
shape = sample_shape + self.batch_shape + self.event_shape
return jnp.broadcast_to(self.v, shape)
@validate_sample
def log_prob(self, value):
log_prob = jnp.log(value == self.v)
log_prob = sum_rightmost(log_prob, len(self.event_shape))
return log_prob + self.log_density
@property
def mean(self):
return self.v
@property
def variance(self):
return jnp.zeros(self.batch_shape + self.event_shape)
[docs]
def entropy(self):
return -jnp.broadcast_to(self.log_density, self.batch_shape)
[docs]
class Unit(Distribution):
"""
Trivial nonnormalized distribution representing the unit type.
The unit type has a single value with no data, i.e. ``value.size == 0``.
This is used for :func:`numpyro.factor` statements.
"""
arg_constraints = {"log_factor": constraints.real}
support = constraints.real
def __init__(self, log_factor, *, validate_args=None):
batch_shape = jnp.shape(log_factor)
event_shape = (0,) # This satisfies .size == 0.
self.log_factor = log_factor
super(Unit, self).__init__(
batch_shape, event_shape, validate_args=validate_args
)
[docs]
def sample(self, key, sample_shape=()):
return jnp.empty(sample_shape + self.batch_shape + self.event_shape)
[docs]
def log_prob(self, value):
shape = lax.broadcast_shapes(self.batch_shape, jnp.shape(value)[:-1])
return jnp.broadcast_to(self.log_factor, shape)