Source code for numpyro.infer.elbo

# Copyright Contributors to the Pyro project.
# SPDX-License-Identifier: Apache-2.0

from collections import OrderedDict, defaultdict
from functools import partial
from operator import itemgetter
import warnings

import jax
from jax import eval_shape, random, vmap
from jax.lax import stop_gradient
import jax.numpy as jnp
from jax.scipy.special import logsumexp

from numpyro.distributions import ExpandedDistribution, MaskedDistribution
from numpyro.distributions.kl import kl_divergence
from numpyro.distributions.util import scale_and_mask
from numpyro.handlers import replay, seed, substitute, trace
from numpyro.infer.util import (
    _without_rsample_stop_gradient,
    get_importance_trace,
    is_identically_one,
    log_density,
)
from numpyro.ops.provenance import eval_provenance
from numpyro.util import _validate_model, check_model_guide_match, find_stack_level


[docs]class ELBO: """ Base class for all ELBO objectives. Subclasses should implement either :meth:`loss` or :meth:`loss_with_mutable_state`. :param num_particles: The number of particles/samples used to form the ELBO (gradient) estimators. :param vectorize_particles: Whether to use `jax.vmap` to compute ELBOs over the num_particles-many particles in parallel. If False use `jax.lax.map`. Defaults to True. """ """ Determines whether the ELBO objective can support inference of discrete latent variables. Subclasses that are capable of inferring discrete latent variables should override to `True` """ can_infer_discrete = False def __init__(self, num_particles=1, vectorize_particles=True): self.num_particles = num_particles self.vectorize_particles = vectorize_particles
[docs] def loss(self, rng_key, param_map, model, guide, *args, **kwargs): """ Evaluates the ELBO with an estimator that uses num_particles many samples/particles. :param jax.random.PRNGKey rng_key: random number generator seed. :param dict param_map: dictionary of current parameter values keyed by site name. :param model: Python callable with NumPyro primitives for the model. :param guide: Python callable with NumPyro primitives for the guide. :param args: arguments to the model / guide (these can possibly vary during the course of fitting). :param kwargs: keyword arguments to the model / guide (these can possibly vary during the course of fitting). :return: negative of the Evidence Lower Bound (ELBO) to be minimized. """ return self.loss_with_mutable_state( rng_key, param_map, model, guide, *args, **kwargs )["loss"]
[docs] def loss_with_mutable_state( self, rng_key, param_map, model, guide, *args, **kwargs ): """ Like :meth:`loss` but also update and return the mutable state, which stores the values at :func:`~numpyro.mutable` sites. :param jax.random.PRNGKey rng_key: random number generator seed. :param dict param_map: dictionary of current parameter values keyed by site name. :param model: Python callable with NumPyro primitives for the model. :param guide: Python callable with NumPyro primitives for the guide. :param args: arguments to the model / guide (these can possibly vary during the course of fitting). :param kwargs: keyword arguments to the model / guide (these can possibly vary during the course of fitting). :return: dictionay containing ELBO loss and the mutable state """ raise NotImplementedError("This ELBO objective does not support mutable state.")
[docs]class Trace_ELBO(ELBO): """ A trace implementation of ELBO-based SVI. The estimator is constructed along the lines of references [1] and [2]. There are no restrictions on the dependency structure of the model or the guide. This is the most basic implementation of the Evidence Lower Bound, which is the fundamental objective in Variational Inference. This implementation has various limitations (for example it only supports random variables with reparameterized samplers) but can be used as a template to build more sophisticated loss objectives. For more details, refer to http://pyro.ai/examples/svi_part_i.html. **References:** 1. *Automated Variational Inference in Probabilistic Programming*, David Wingate, Theo Weber 2. *Black Box Variational Inference*, Rajesh Ranganath, Sean Gerrish, David M. Blei :param num_particles: The number of particles/samples used to form the ELBO (gradient) estimators. :param vectorize_particles: Whether to use `jax.vmap` to compute ELBOs over the num_particles-many particles in parallel. If False use `jax.lax.map`. Defaults to True. """
[docs] def loss_with_mutable_state( self, rng_key, param_map, model, guide, *args, **kwargs ): def single_particle_elbo(rng_key): params = param_map.copy() model_seed, guide_seed = random.split(rng_key) seeded_model = seed(model, model_seed) seeded_guide = seed(guide, guide_seed) guide_log_density, guide_trace = log_density( seeded_guide, args, kwargs, param_map ) mutable_params = { name: site["value"] for name, site in guide_trace.items() if site["type"] == "mutable" } params.update(mutable_params) seeded_model = replay(seeded_model, guide_trace) model_log_density, model_trace = log_density( seeded_model, args, kwargs, params ) check_model_guide_match(model_trace, guide_trace) _validate_model(model_trace, plate_warning="loose") mutable_params.update( { name: site["value"] for name, site in model_trace.items() if site["type"] == "mutable" } ) # log p(z) - log q(z) elbo_particle = model_log_density - guide_log_density if mutable_params: if self.num_particles == 1: return elbo_particle, mutable_params else: raise ValueError( "Currently, we only support mutable states with num_particles=1." ) else: return elbo_particle, None # Return (-elbo) since by convention we do gradient descent on a loss and # the ELBO is a lower bound that needs to be maximized. if self.num_particles == 1: elbo, mutable_state = single_particle_elbo(rng_key) return {"loss": -elbo, "mutable_state": mutable_state} else: rng_keys = random.split(rng_key, self.num_particles) if self.vectorize_particles: elbos, mutable_state = vmap(single_particle_elbo)(rng_keys) else: elbos, mutable_state = jax.lax.map(single_particle_elbo, rng_keys) return {"loss": -jnp.mean(elbos), "mutable_state": mutable_state}
def _get_log_prob_sum(site): if site["intermediates"]: log_prob = site["fn"].log_prob(site["value"], site["intermediates"]) else: log_prob = site["fn"].log_prob(site["value"]) log_prob = scale_and_mask(log_prob, site["scale"]) return jnp.sum(log_prob) def _check_mean_field_requirement(model_trace, guide_trace): """ Checks that the guide and model sample sites are ordered identically. This is sufficient but not necessary for correctness. """ model_sites = [ name for name, site in model_trace.items() if site["type"] == "sample" and name in guide_trace ] guide_sites = [ name for name, site in guide_trace.items() if site["type"] == "sample" and name in model_trace ] assert set(model_sites) == set(guide_sites) if model_sites != guide_sites: warnings.warn( "Failed to verify mean field restriction on the guide. " "To eliminate this warning, ensure model and guide sites " "occur in the same order.\n" + "Model sites:\n " + "\n ".join(model_sites) + "Guide sites:\n " + "\n ".join(guide_sites), stacklevel=find_stack_level(), ),
[docs]class TraceMeanField_ELBO(ELBO): """ A trace implementation of ELBO-based SVI. This is currently the only ELBO estimator in NumPyro that uses analytic KL divergences when those are available. .. warning:: This estimator may give incorrect results if the mean-field condition is not satisfied. The mean field condition is a sufficient but not necessary condition for this estimator to be correct. The precise condition is that for every latent variable `z` in the guide, its parents in the model must not include any latent variables that are descendants of `z` in the guide. Here 'parents in the model' and 'descendants in the guide' is with respect to the corresponding (statistical) dependency structure. For example, this condition is always satisfied if the model and guide have identical dependency structures. """
[docs] def loss_with_mutable_state( self, rng_key, param_map, model, guide, *args, **kwargs ): def single_particle_elbo(rng_key): params = param_map.copy() model_seed, guide_seed = random.split(rng_key) seeded_model = seed(model, model_seed) seeded_guide = seed(guide, guide_seed) subs_guide = substitute(seeded_guide, data=param_map) guide_trace = trace(subs_guide).get_trace(*args, **kwargs) mutable_params = { name: site["value"] for name, site in guide_trace.items() if site["type"] == "mutable" } params.update(mutable_params) subs_model = substitute(replay(seeded_model, guide_trace), data=params) model_trace = trace(subs_model).get_trace(*args, **kwargs) mutable_params.update( { name: site["value"] for name, site in model_trace.items() if site["type"] == "mutable" } ) check_model_guide_match(model_trace, guide_trace) _validate_model(model_trace, plate_warning="loose") _check_mean_field_requirement(model_trace, guide_trace) elbo_particle = 0 for name, model_site in model_trace.items(): if model_site["type"] == "sample": if model_site["is_observed"]: elbo_particle = elbo_particle + _get_log_prob_sum(model_site) else: guide_site = guide_trace[name] try: kl_qp = kl_divergence(guide_site["fn"], model_site["fn"]) kl_qp = scale_and_mask(kl_qp, scale=guide_site["scale"]) elbo_particle = elbo_particle - jnp.sum(kl_qp) except NotImplementedError: elbo_particle = ( elbo_particle + _get_log_prob_sum(model_site) - _get_log_prob_sum(guide_site) ) # handle auxiliary sites in the guide for name, site in guide_trace.items(): if site["type"] == "sample" and name not in model_trace: assert site["infer"].get("is_auxiliary") or site["is_observed"] elbo_particle = elbo_particle - _get_log_prob_sum(site) if mutable_params: if self.num_particles == 1: return elbo_particle, mutable_params else: raise ValueError( "Currently, we only support mutable states with num_particles=1." ) else: return elbo_particle, None if self.num_particles == 1: elbo, mutable_state = single_particle_elbo(rng_key) return {"loss": -elbo, "mutable_state": mutable_state} else: rng_keys = random.split(rng_key, self.num_particles) if self.vectorize_particles: elbos, mutable_state = vmap(single_particle_elbo)(rng_keys) else: elbos, mutable_state = jax.lax.map(single_particle_elbo, rng_keys) return {"loss": -jnp.mean(elbos), "mutable_state": mutable_state}
[docs]class RenyiELBO(ELBO): r""" An implementation of Renyi's :math:`\alpha`-divergence variational inference following reference [1]. In order for the objective to be a strict lower bound, we require :math:`\alpha \ge 0`. Note, however, that according to reference [1], depending on the dataset :math:`\alpha < 0` might give better results. In the special case :math:`\alpha = 0`, the objective function is that of the important weighted autoencoder derived in reference [2]. .. note:: Setting :math:`\alpha < 1` gives a better bound than the usual ELBO. :param float alpha: The order of :math:`\alpha`-divergence. Here :math:`\alpha \neq 1`. Default is 0. :param num_particles: The number of particles/samples used to form the objective (gradient) estimator. Default is 2. :param vectorize_particles: Whether to use `jax.vmap` to compute ELBOs over the num_particles-many particles in parallel. If False use `jax.lax.map`. Defaults to True. Example:: def model(data): with numpyro.plate("batch", 10000, subsample_size=100): latent = numpyro.sample("latent", dist.Normal(0, 1)) batch = numpyro.subsample(data, event_dim=0) numpyro.sample("data", dist.Bernoulli(logits=latent), obs=batch) def guide(data): w_loc = numpyro.param("w_loc", 1.) w_scale = numpyro.param("w_scale", 1.) with numpyro.plate("batch", 10000, subsample_size=100): batch = numpyro.subsample(data, event_dim=0) loc = w_loc * batch scale = jnp.exp(w_scale * batch) numpyro.sample("latent", dist.Normal(loc, scale)) elbo = RenyiELBO(num_particles=10) svi = SVI(model, guide, optax.adam(0.1), elbo) **References:** 1. *Renyi Divergence Variational Inference*, Yingzhen Li, Richard E. Turner 2. *Importance Weighted Autoencoders*, Yuri Burda, Roger Grosse, Ruslan Salakhutdinov """ def __init__(self, alpha=0, num_particles=2): if alpha == 1: raise ValueError( "The order alpha should not be equal to 1. Please use ELBO class" "for the case alpha = 1." ) self.alpha = alpha super().__init__(num_particles=num_particles) def _single_particle_elbo(self, model, guide, param_map, args, kwargs, rng_key): model_seed, guide_seed = random.split(rng_key) seeded_model = seed(model, model_seed) seeded_guide = seed(guide, guide_seed) model_trace, guide_trace = get_importance_trace( seeded_model, seeded_guide, args, kwargs, param_map ) check_model_guide_match(model_trace, guide_trace) _validate_model(model_trace, plate_warning="loose") site_plates = { name: {frame for frame in site["cond_indep_stack"]} for name, site in model_trace.items() if site["type"] == "sample" } # We will compute Renyi elbos separately across dimensions # defined in indep_plates. Then the final elbo is the sum # of those independent elbos. if site_plates: indep_plates = set.intersection(*site_plates.values()) else: indep_plates = set() for frame in set.union(*site_plates.values()): if frame not in indep_plates: subsample_size = frame.size size = model_trace[frame.name]["args"][0] if size > subsample_size: raise ValueError( "RenyiELBO only supports subsampling in plates that are common" " to all sample sites, e.g. a data plate that encloses the" " entire model." ) indep_plate_scale = 1.0 for frame in indep_plates: subsample_size = frame.size size = model_trace[frame.name]["args"][0] if size > subsample_size: indep_plate_scale = indep_plate_scale * size / subsample_size indep_plate_dims = {frame.dim for frame in indep_plates} log_densities = {} for trace_type, tr in {"guide": guide_trace, "model": model_trace}.items(): log_densities[trace_type] = 0.0 for site in tr.values(): if site["type"] != "sample": continue log_prob = site["log_prob"] squeeze_axes = () for dim in range(log_prob.ndim): neg_dim = dim - log_prob.ndim if neg_dim in indep_plate_dims: continue log_prob = jnp.sum(log_prob, axis=dim, keepdims=True) squeeze_axes = squeeze_axes + (dim,) log_prob = jnp.squeeze(log_prob, squeeze_axes) log_densities[trace_type] = log_densities[trace_type] + log_prob # log p(z) - log q(z) elbo = log_densities["model"] - log_densities["guide"] # Log probabilities at indep_plates dimensions are scaled to MC approximate # the "full size" log probabilities. Because we want to compute Renyi elbos # separately across indep_plates dimensions, we will remove such scale now. # We will apply such scale after getting those Renyi elbos. return elbo / indep_plate_scale, indep_plate_scale
[docs] def loss(self, rng_key, param_map, model, guide, *args, **kwargs): plate_key, rng_key = random.split(rng_key) model = seed( model, plate_key, hide_types=["sample", "prng_key", "control_flow"] ) guide = seed( guide, plate_key, hide_types=["sample", "prng_key", "control_flow"] ) single_particle_elbo = partial( self._single_particle_elbo, model, guide, param_map, args, kwargs ) rng_keys = random.split(rng_key, self.num_particles) if self.vectorize_particles: elbos, common_plate_scale = vmap(single_particle_elbo)(rng_keys) else: elbos, common_plate_scale = jax.lax.map(single_particle_elbo, rng_keys) assert common_plate_scale.shape == (self.num_particles,) assert elbos.shape[0] == self.num_particles scaled_elbos = (1.0 - self.alpha) * elbos avg_log_exp = logsumexp(scaled_elbos, axis=0) - jnp.log(self.num_particles) assert avg_log_exp.shape == elbos.shape[1:] weights = jnp.exp(scaled_elbos - avg_log_exp) renyi_elbo = avg_log_exp / (1.0 - self.alpha) weighted_elbo = (stop_gradient(weights) * elbos).mean(0) assert renyi_elbo.shape == elbos.shape[1:] assert weighted_elbo.shape == elbos.shape[1:] loss = -(stop_gradient(renyi_elbo - weighted_elbo) + weighted_elbo) # common_plate_scale should be the same across particles. return loss.sum() * common_plate_scale[0]
def _get_plate_stacks(trace): """ This builds a dict mapping site name to a set of plate stacks. Each plate stack is a list of :class:`CondIndepStackFrame`s corresponding to a :class:`plate`. This information is used by :class:`Trace_ELBO` and :class:`TraceGraph_ELBO`. """ return { name: [f for f in node["cond_indep_stack"]] for name, node in trace.items() if node["type"] == "sample" } class MultiFrameTensor(dict): """ A container for sums of Tensors among different :class:`plate` contexts. Used in :class:`~numpyro.infer.elbo.TraceGraph_ELBO` to simplify downstream cost computation logic. Example:: downstream_cost = MultiFrameTensor() for site in downstream_nodes: downstream_cost.add((site["cond_indep_stack"], site["log_prob"])) downstream_cost.add(*other_costs.items()) # add in bulk summed = downstream_cost.sum_to(target_site["cond_indep_stack"]) """ def __init__(self, *items): super().__init__() self.add(*items) def add(self, *items): """ Add a collection of (cond_indep_stack, tensor) pairs. Keys are ``cond_indep_stack``s, i.e. tuples of :class:`CondIndepStackFrame`s. Values are :class:`numpy.ndarray`s. """ for cond_indep_stack, value in items: frames = frozenset(f for f in cond_indep_stack) assert all(f.dim < 0 and -jnp.ndim(value) <= f.dim for f in frames) if frames in self: self[frames] = self[frames] + value else: self[frames] = value def sum_to(self, target_frames): total = None for frames, value in self.items(): for f in frames: if f not in target_frames and jnp.shape(value)[f.dim] != 1: value = value.sum(f.dim, keepdims=True) while jnp.shape(value) and jnp.shape(value)[0] == 1: value = value.squeeze(0) total = value if total is None else total + value return 0.0 if total is None else total def __repr__(self): return "%s(%s)" % ( type(self).__name__, ",\n\t".join(["({}, ...)".format(frames) for frames in self]), ) def _identify_dense_edges(trace): succ = {} for name, node in trace.items(): if node["type"] == "sample": succ[name] = set() for name, node in trace.items(): if node["type"] == "sample": for past_name, past_node in trace.items(): if past_node["type"] == "sample": if past_name == name: break # XXX: different from Pyro, we always add edge past_name -> name succ[past_name].add(name) return succ def _topological_sort(succ, reverse=False): """ Return a list of nodes (site names) in topologically sorted order. """ def dfs(site, visited): if site in visited: return for s in succ[site]: for node in dfs(s, visited): yield node visited.add(site) yield site visited = set() top_sorted = [] for s in succ: for node in dfs(s, visited): top_sorted.append(node) return top_sorted if reverse else list(reversed(top_sorted)) def _compute_downstream_costs(model_trace, guide_trace, non_reparam_nodes): model_successors = _identify_dense_edges(model_trace) guide_successors = _identify_dense_edges(guide_trace) # recursively compute downstream cost nodes for all sample sites in model and guide # (even though ultimately just need for non-reparameterizable sample sites) # 1. downstream costs used for rao-blackwellization # 2. model observe sites (as well as terms that arise from the model and guide having different # dependency structures) are taken care of via 'children_in_model' below topo_sort_guide_nodes = _topological_sort(guide_successors, reverse=True) topo_sort_guide_nodes = [ x for x in topo_sort_guide_nodes if guide_trace[x]["type"] == "sample" ] ordered_guide_nodes_dict = {n: i for i, n in enumerate(topo_sort_guide_nodes)} downstream_guide_cost_nodes = {} downstream_costs = {} stacks = _get_plate_stacks(model_trace) for node in topo_sort_guide_nodes: downstream_costs[node] = MultiFrameTensor( ( stacks[node], model_trace[node]["log_prob"] - guide_trace[node]["log_prob"], ) ) nodes_included_in_sum = set([node]) downstream_guide_cost_nodes[node] = set([node]) # make more efficient by ordering children appropriately (higher children first) children = [(k, -ordered_guide_nodes_dict[k]) for k in guide_successors[node]] sorted_children = sorted(children, key=itemgetter(1)) for child, _ in sorted_children: child_cost_nodes = downstream_guide_cost_nodes[child] downstream_guide_cost_nodes[node].update(child_cost_nodes) if nodes_included_in_sum.isdisjoint(child_cost_nodes): # avoid duplicates downstream_costs[node].add(*downstream_costs[child].items()) # XXX nodes_included_in_sum logic could be more fine-grained, possibly leading # to speed-ups in case there are many duplicates nodes_included_in_sum.update(child_cost_nodes) missing_downstream_costs = ( downstream_guide_cost_nodes[node] - nodes_included_in_sum ) # include terms we missed because we had to avoid duplicates for missing_node in missing_downstream_costs: downstream_costs[node].add( ( stacks[missing_node], model_trace[missing_node]["log_prob"] - guide_trace[missing_node]["log_prob"], ) ) # finish assembling complete downstream costs # (the above computation may be missing terms from model) for site in non_reparam_nodes: children_in_model = set() for node in downstream_guide_cost_nodes[site]: children_in_model.update(model_successors[node]) # remove terms accounted for above children_in_model.difference_update(downstream_guide_cost_nodes[site]) for child in children_in_model: assert model_trace[child]["type"] == "sample" downstream_costs[site].add((stacks[child], model_trace[child]["log_prob"])) downstream_guide_cost_nodes[site].update([child]) for k in non_reparam_nodes: downstream_costs[k] = downstream_costs[k].sum_to( guide_trace[k]["cond_indep_stack"] ) return downstream_costs, downstream_guide_cost_nodes def get_importance_log_probs(model, guide, args, kwargs, params): """ Returns log probabilities at each site for the guide and the model that is run against it. """ model_tr, guide_tr = get_importance_trace(model, guide, args, kwargs, params) model_log_probs = { name: site["log_prob"] for name, site in model_tr.items() if site["type"] == "sample" } guide_log_probs = { name: site["log_prob"] for name, site in guide_tr.items() if site["type"] == "sample" } return model_log_probs, guide_log_probs def _substitute_nonreparam(data, msg): if msg["name"] in data and not msg["fn"].has_rsample: value = msg["fn"](*msg["args"], **msg["kwargs"]) value = 0 * value + data[msg["name"]] return value def _get_latents(model, guide, args, kwargs, params): model = seed(substitute(model, data=params), rng_seed=0) guide = seed(substitute(guide, data=params), rng_seed=0) guide_tr = trace(guide).get_trace(*args, **kwargs) model_tr = trace(replay(model, guide_tr)).get_trace(*args, **kwargs) model_tr.update(guide_tr) return { name: site["value"] for name, site in model_tr.items() if site["type"] == "sample" and not site.get("is_observed", False) } def get_nonreparam_deps(model, guide, args, kwargs, param_map, latents=None): """Find dependencies on non-reparameterizable sample sites for each cost term in the model and the guide.""" if latents is None: latents = eval_shape( partial(_get_latents, model, guide, args, kwargs, param_map) ) def fn(**latents): subs_fn = partial(_substitute_nonreparam, latents) subs_model = substitute(seed(model, rng_seed=0), substitute_fn=subs_fn) subs_guide = substitute(seed(guide, rng_seed=0), substitute_fn=subs_fn) return get_importance_log_probs(subs_model, subs_guide, args, kwargs, param_map) model_deps, guide_deps = eval_provenance(fn, **latents) return model_deps, guide_deps
[docs]class TraceGraph_ELBO(ELBO): """ A TraceGraph implementation of ELBO-based SVI. The gradient estimator is constructed along the lines of reference [1] specialized to the case of the ELBO. It supports arbitrary dependency structure for the model and guide. Fine-grained conditional dependency information as recorded in the trace is used to reduce the variance of the gradient estimator. In particular provenance tracking [2] is used to find the ``cost`` terms that depend on each non-reparameterizable sample site. References [1] `Gradient Estimation Using Stochastic Computation Graphs`, John Schulman, Nicolas Heess, Theophane Weber, Pieter Abbeel [2] `Nonstandard Interpretations of Probabilistic Programs for Efficient Inference`, David Wingate, Noah Goodman, Andreas Stuhlmüller, Jeffrey Siskind """ can_infer_discrete = True def __init__(self, num_particles=1, vectorize_particles=True): super().__init__( num_particles=num_particles, vectorize_particles=vectorize_particles )
[docs] def loss(self, rng_key, param_map, model, guide, *args, **kwargs): """ Evaluates the ELBO with an estimator that uses num_particles many samples/particles. :param jax.random.PRNGKey rng_key: random number generator seed. :param dict param_map: dictionary of current parameter values keyed by site name. :param model: Python callable with NumPyro primitives for the model. :param guide: Python callable with NumPyro primitives for the guide. :param args: arguments to the model / guide (these can possibly vary during the course of fitting). :param kwargs: keyword arguments to the model / guide (these can possibly vary during the course of fitting). :return: negative of the Evidence Lower Bound (ELBO) to be minimized. """ def single_particle_elbo(rng_key): model_seed, guide_seed = random.split(rng_key) seeded_model = seed(model, model_seed) seeded_guide = seed(guide, guide_seed) model_trace, guide_trace = get_importance_trace( seeded_model, seeded_guide, args, kwargs, param_map ) check_model_guide_match(model_trace, guide_trace) _validate_model(model_trace, plate_warning="strict") latents = {} for name, site in guide_trace.items(): if site["type"] == "sample" and not site.get("is_observed", False): latents[name] = site["value"] model_deps, guide_deps = get_nonreparam_deps( model, guide, args, kwargs, param_map, latents=latents ) elbo = 0.0 # mapping from non-reparameterizable sample sites to cost terms influenced by each of them downstream_costs = defaultdict(lambda: MultiFrameTensor()) for name, site in model_trace.items(): if site["type"] == "sample": elbo = elbo + jnp.sum(site["log_prob"]) # add the log_prob to each non-reparam sample site upstream for key in model_deps[name]: downstream_costs[key].add( (site["cond_indep_stack"], site["log_prob"]) ) for name, site in guide_trace.items(): if site["type"] == "sample": log_prob_sum = jnp.sum(site["log_prob"]) if not site["fn"].has_rsample: log_prob_sum = stop_gradient(log_prob_sum) elbo = elbo - log_prob_sum # add the -log_prob to each non-reparam sample site upstream for key in guide_deps[name]: downstream_costs[key].add( (site["cond_indep_stack"], -site["log_prob"]) ) for node, downstream_cost in downstream_costs.items(): guide_site = guide_trace[node] downstream_cost = downstream_cost.sum_to(guide_site["cond_indep_stack"]) surrogate = jnp.sum( guide_site["log_prob"] * stop_gradient(downstream_cost) ) elbo = elbo + surrogate - stop_gradient(surrogate) return elbo # Return (-elbo) since by convention we do gradient descent on a loss and # the ELBO is a lower bound that needs to be maximized. if self.num_particles == 1: return -single_particle_elbo(rng_key) else: rng_keys = random.split(rng_key, self.num_particles) if self.vectorize_particles: return -jnp.mean(vmap(single_particle_elbo)(rng_keys)) else: return -jnp.mean(jax.lax.map(single_particle_elbo, rng_keys))
def get_importance_trace_enum( model, guide, args, kwargs, params, max_plate_nesting, model_deps, guide_desc ): """ (EXPERIMENTAL) Returns traces from the enumerated guide and the enumerated model that is run against it. The returned traces also store the log probability at each site and the log measure for measure vars. """ import funsor from numpyro.contrib.funsor import ( enum, plate_to_enum_plate, to_funsor, trace as _trace, ) with plate_to_enum_plate(), enum( first_available_dim=(-max_plate_nesting - 1) if max_plate_nesting else None ): guide = substitute(guide, data=params) with _without_rsample_stop_gradient(): guide_trace = _trace(guide).get_trace(*args, **kwargs) model = substitute(replay(model, guide_trace), data=params) model_trace = _trace(model).get_trace(*args, **kwargs) sum_vars = frozenset() for is_model, tr in zip((True, False), (model_trace, guide_trace)): for name, site in tr.items(): if site["type"] == "sample": value = site["value"] intermediates = site["intermediates"] dim_to_name = site["infer"]["dim_to_name"] # compute log factor if is_model and model_trace[name]["infer"].get("kl") == "analytic": if not model_deps[name].isdisjoint(guide_desc[name]): raise AssertionError( f"Expected that for use of analytic KL computation for the latent variable `{name}` its " "parents in the model do not include any non-reparameterizable latent variables that " f"are descendants of `{name}` in the guide. But found variable(s) " f"{[var for var in (model_deps[name] & guide_desc[name])]} both in the parents of " f"`{name}` in the model and in the descendants of `{name}` in the guide." ) if not model_deps[name].isdisjoint(sum_vars): raise AssertionError( f"Expected that for use of analytic KL computation for the latent variable `{name}` its " "parents in the model do not include any model-side enumerated latent variables, but " f"found enumerated variable(s) {[var for var in (model_deps[name] & sum_vars)]}." ) if name not in guide_trace: raise AssertionError( f"Expected that for use of analytic KL computation for the latent variable `{name}` it " "must be present both in the model and the guide traces, but not found in the guide trace." ) kl_qp = kl_divergence( guide_trace[name]["fn"], model_trace[name]["fn"] ) dim_to_name.update(guide_trace[name]["infer"]["dim_to_name"]) site["kl"] = to_funsor( kl_qp, output=funsor.Real, dim_to_name=dim_to_name ) elif not is_model and (model_trace[name].get("kl") is not None): # skip logq computation if analytic kl was computed pass else: if intermediates: log_prob = site["fn"].log_prob(value, intermediates) else: log_prob = site["fn"].log_prob(value) site["log_prob"] = to_funsor( log_prob, output=funsor.Real, dim_to_name=dim_to_name ) # compute log measure if not is_model or not (site["is_observed"] or (name in guide_trace)): if is_model: sum_vars |= frozenset([name]) # get rid of masking base_fn = site["fn"] batch_shape = base_fn.batch_shape while isinstance( base_fn, (MaskedDistribution, ExpandedDistribution) ): base_fn = base_fn.base_dist base_fn = base_fn.expand(batch_shape) if intermediates: log_measure = base_fn.log_prob(value, intermediates) else: log_measure = base_fn.log_prob(value) # dice factor if not site["infer"].get("enumerate") == "parallel": log_measure = log_measure - funsor.ops.detach(log_measure) site["log_measure"] = to_funsor( log_measure, output=funsor.Real, dim_to_name=dim_to_name ) return model_trace, guide_trace, sum_vars def _partition(model_sum_deps, sum_vars): # Construct a bipartite graph between model_sum_deps and the sum_vars neighbors = OrderedDict([(t, []) for t in model_sum_deps.keys()]) for key, deps in model_sum_deps.items(): for dim in deps: if dim in sum_vars: neighbors[key].append(dim) neighbors.setdefault(dim, []).append(key) # Partition the bipartite graph into connected components for contraction. components = [] while neighbors: v, pending = neighbors.popitem() component = OrderedDict([(v, None)]) # used as an OrderedSet for v in pending: component[v] = None while pending: v = pending.pop() for v in neighbors.pop(v): if v not in component: component[v] = None pending.append(v) # Split this connected component into factors and measures. # Append only if component_factors is non-empty component_factors = frozenset(v for v in component if v not in sum_vars) if component_factors: component_measures = frozenset(v for v in component if v in sum_vars) components.append((component_factors, component_measures)) return components def guess_max_plate_nesting(model, guide, args, kwargs, param_map): """Guess maximum plate nesting by performing jax shape inference.""" model_shapes, guide_shapes = eval_shape( partial( get_importance_log_probs, model, guide, args, kwargs, param_map, ) ) ndims = [ len(site.shape) for sites in (model_shapes, guide_shapes) for site in sites.values() ] max_plate_nesting = max(ndims) if ndims else 0 return max_plate_nesting
[docs]class TraceEnum_ELBO(ELBO): """ (EXPERIMENTAL) A TraceEnum implementation of ELBO-based SVI. The gradient estimator is constructed along the lines of reference [1] specialized to the case of the ELBO. It supports arbitrary dependency structure for the model and guide. Fine-grained conditional dependency information as recorded in the trace is used to reduce the variance of the gradient estimator. In particular provenance tracking [2] is used to find the ``cost`` terms that depend on each non-reparameterizable sample site. Enumerated variables are eliminated using the TVE algorithm for plated factor graphs [3]. References [1] `Storchastic: A Framework for General Stochastic Automatic Differentiation`, Emile van Kriekenc, Jakub M. Tomczak, Annette ten Teije [2] `Nonstandard Interpretations of Probabilistic Programs for Efficient Inference`, David Wingate, Noah Goodman, Andreas Stuhlmüller, Jeffrey Siskind [3] `Tensor Variable Elimination for Plated Factor Graphs`, Fritz Obermeyer, Eli Bingham, Martin Jankowiak, Justin Chiu, Neeraj Pradhan, Alexander M. Rush, Noah Goodman """ can_infer_discrete = True def __init__( self, num_particles=1, max_plate_nesting=float("inf"), vectorize_particles=True ): self.max_plate_nesting = max_plate_nesting super().__init__( num_particles=num_particles, vectorize_particles=vectorize_particles )
[docs] def loss(self, rng_key, param_map, model, guide, *args, **kwargs): def single_particle_elbo(rng_key): import funsor from numpyro.contrib.funsor import to_data model_seed, guide_seed = random.split(rng_key) if self.max_plate_nesting == float("inf"): seeded_model = seed(model, model_seed) seeded_guide = seed(guide, guide_seed) # XXX: We can extract abstract latents here such that they # can be reused in get_nonreparam_deps below. self.max_plate_nesting = guess_max_plate_nesting( seeded_model, seeded_guide, args, kwargs, param_map ) # get dependencies on nonreparametrizable variables model_deps, guide_deps = get_nonreparam_deps( model, guide, args, kwargs, param_map ) # get descendants of variables in the guide guide_desc = defaultdict(frozenset) for name, deps in guide_deps.items(): for d in deps: if name != d: guide_desc[d] |= frozenset([name]) seeded_model = seed(model, model_seed) seeded_guide = seed(guide, guide_seed) model_trace, guide_trace, sum_vars = get_importance_trace_enum( seeded_model, seeded_guide, args, kwargs, param_map, self.max_plate_nesting, model_deps, guide_desc, ) # TODO: fix the check of model/guide distribution shapes # check_model_guide_match(model_trace, guide_trace) _validate_model(model_trace, plate_warning="strict") model_vars = frozenset(model_deps) model_sum_deps = { k: v & sum_vars for k, v in model_deps.items() if k not in sum_vars } model_deps = { k: v - sum_vars for k, v in model_deps.items() if k not in sum_vars } # gather cost terms cost_terms = [] for group_names, group_sum_vars in _partition(model_sum_deps, sum_vars): if not group_sum_vars: # uncontracted logp cost term assert len(group_names) == 1 name = next(iter(group_names)) if model_trace[name].get("kl") is not None: cost = -model_trace[name]["kl"] scale = model_trace[name]["scale"] assert scale == guide_trace[name]["scale"] deps = (model_deps[name] | guide_deps[name]) - frozenset([name]) del guide_deps[name] else: cost = model_trace[name]["log_prob"] scale = model_trace[name]["scale"] deps = model_deps[name] else: # compute contracted cost term group_factors = tuple( model_trace[name]["log_prob"] for name in group_names ) group_factors += tuple( model_trace[var]["log_measure"] for var in group_sum_vars ) group_factor_vars = frozenset().union( *[f.inputs for f in group_factors] ) group_plates = group_factor_vars - model_vars outermost_plates = frozenset.intersection( *(frozenset(f.inputs) & group_plates for f in group_factors) ) elim_plates = group_plates - outermost_plates with funsor.interpretations.normalize: cost = funsor.sum_product.sum_product( funsor.ops.logaddexp, funsor.ops.add, group_factors, plates=group_plates, eliminate=group_sum_vars | elim_plates, ) # TODO: add memoization cost = funsor.optimizer.apply_optimizer(cost) # incorporate the effects of subsampling and handlers.scale through a common scale factor scales_set = set() for name in group_names | group_sum_vars: site_scale = model_trace[name]["scale"] if site_scale is None: site_scale = 1.0 if isinstance(site_scale, jnp.ndarray): raise ValueError( "Enumeration only supports scalar handlers.scale" ) scales_set.add(float(site_scale)) if len(scales_set) != 1: raise ValueError( "Expected all enumerated sample sites to share a common scale, " f"but found {len(scales_set)} different scales." ) scale = next(iter(scales_set)) # combine deps deps = frozenset().union( *[model_deps[name] for name in group_names] ) # check model guide enumeration constraint for key in deps: site = guide_trace[key] if site["infer"].get("enumerate") == "parallel": for p in ( frozenset(site["log_measure"].inputs) & elim_plates ): raise ValueError( "Expected model enumeration to be no more global than guide enumeration, but found " f"model enumeration sites upstream of guide site '{key}' in plate('{p}')." "Try converting some model enumeration sites to guide enumeration sites." ) cost_terms.append((cost, scale, deps)) for name, deps in guide_deps.items(): # -logq cost term cost = -guide_trace[name]["log_prob"] scale = guide_trace[name]["scale"] cost_terms.append((cost, scale, deps)) # compute elbo elbo = 0.0 for cost, scale, deps in cost_terms: if deps: dice_factors = tuple( guide_trace[key]["log_measure"] for key in deps ) dice_factor_vars = frozenset().union( *[f.inputs for f in dice_factors] ) cost_vars = frozenset(cost.inputs) with funsor.interpretations.normalize: dice_factor = funsor.sum_product.sum_product( funsor.ops.logaddexp, funsor.ops.add, dice_factors, plates=(dice_factor_vars | cost_vars) - model_vars, eliminate=dice_factor_vars - cost_vars, ) # TODO: add memoization dice_factor = funsor.optimizer.apply_optimizer(dice_factor) cost = cost * funsor.ops.exp(dice_factor) if (scale is not None) and (not is_identically_one(scale)): cost = cost * scale elbo = elbo + cost.reduce(funsor.ops.add) return to_data(elbo) # Return (-elbo) since by convention we do gradient descent on a loss and # the ELBO is a lower bound that needs to be maximized. if self.num_particles == 1: return -single_particle_elbo(rng_key) else: rng_keys = random.split(rng_key, self.num_particles) if self.vectorize_particles: return -jnp.mean(vmap(single_particle_elbo)(rng_keys)) else: return -jnp.mean(jax.lax.map(single_particle_elbo, rng_keys))