r"""
Bumps wrapper for PyMC models.
"""
from __future__ import print_function
__all__ = ["PyMCProblem"]
import numpy as np
from numpy import inf, array, asarray
# pyMC model attributes:
# - deterministics [Intermediate variables]
# - stochastics (with observed=False) [Fitted parameters]
# - data (stochastic variables with observed=True)
# - variables [stochastics+
# - potentials
# - containers
# - nodes
# - all_objects
# - status: Not useful for the Model base class, but may be used by subclasses.
[docs]
class PyMCProblem(object):
def __init__(self, input):
from pymc.Model import Model
from pymc.Node import ZeroProbability
self.model = Model(input)
# sort parameters by name
ordered_pairs = sorted((s.__name__, s) for s in self.model.stochastics)
# Record parameter, shape, size, offset as a list
pars = [v for k, v in ordered_pairs]
shape = [v.shape for v in pars]
size = [(np.prod(s) if s != () else 1) for s in shape]
offset = np.cumsum([0]+size[:-1])
self.pars = list(zip(pars, shape, size, offset))
# List of cost functions contains both parameter and data, but not
# intermediate deterministic points
self.costs = self.model.variables - self.model.deterministics
# Degrees of freedom is #data - #pars
points = sum((np.prod(p.shape) if p.shape != () else 1)
for p in self.costs)
self.dof = points - 2*offset[-1]
self.ZeroProbability = ZeroProbability
[docs]
def model_reset(self):
pass
[docs]
def chisq(self):
return self.nllf() # /self.dof
[docs]
def chisq_str(self):
return "%g"%self.chisq()
__call__ = chisq
[docs]
def nllf(self, pvec=None):
if pvec is not None: self.setp(pvec)
try:
return -sum(c.logp for c in self.costs)
except self.ZeroProbability:
return inf
[docs]
def setp(self, values):
for par, shape, size, offset in self.pars:
if shape == ():
par.value = values[offset]
offset += 1
else:
par.value = array(values[offset:offset+size]).reshape(shape)
offset += size
[docs]
def getp(self):
return np.hstack([(par.value.flatten() if shape != () else par.value)
for par, shape, size, offset in self.pars])
[docs]
def show(self):
# maybe print graph of model
print("[chisq=%g, nllf=%g]" % (self.chisq(), self.nllf()))
print(self.summarize())
[docs]
def summarize(self):
return "\n".join("%s=%s"%(par.__name__, par.value)
for par, _, _, _ in self.pars)
[docs]
def labels(self):
ret = []
for par, _, _, _ in self.pars:
ret.extend(_par_labels(par))
return ret
[docs]
def randomize(self, N=None):
if N is None:
self.model.draw_from_prior()
else:
data = []
for _ in range(N):
self.model.draw_from_prior()
data.append(self.getp())
return asarray(data)
[docs]
def bounds(self):
return np.vstack([_par_bounds(par) for par, _, _,_ in self.pars]).T
[docs]
def plot(self, p=None, fignum=None, figfile=None):
pass
def __deepcopy__(self, memo):
return self
def _par_bounds(par):
# Delay referencing
import pymc.distributions
UNBOUNDED = lambda p: (-inf, inf)
PYMC_BOUNDS = {
pymc.distributions.DiscreteUniform:
lambda p: (p['lower']-0.5, p['upper']+0.5),
pymc.distributions.Uniform:
lambda p: (p['lower'], p['upper']),
pymc.distributions.Exponential:
lambda p: (0, inf),
}
# pyMC doesn't provide info about bounds on the distributions
# so we need a big table.
bounds = PYMC_BOUNDS.get(par.__class__, UNBOUNDED)(par.parents)
ret = np.tile(bounds, par.shape).flatten().reshape(-1, 2)
return ret
def _par_labels(par):
name = par.__name__
dims = len(par.shape)
if dims == 0:
return [name]
elif dims == 1:
return ["%s[%d]"%(name, i)
for i in range(par.shape[0])]
elif dims == 2:
return ["%s[%d,%d]"%(name, i, j)
for i in range(par.shape[0])
for j in range(par.shape[1])]
elif dims == 3:
return ["%s[%d,%d,%d]"%(name, i, j, k)
for i in range(par.shape[0])
for j in range(par.shape[1])
for k in range(par.shape[2])]
else:
raise ValueError("limited to 3 dims for now")