Source code for mics.mixtures

"""
.. module:: mixtures
   :platform: Unix, Windows
   :synopsis: a module for defining the class :class:`Mixture`.

.. moduleauthor:: Charlles R. A. Abreu <abreu@eq.ufrj.br>


"""

import numpy as np
import pandas as pd
from numpy.linalg import multi_dot

from mics.samples import pool
from mics.utils import InputError
from mics.utils import cases
from mics.utils import derivative
from mics.utils import genfunc
from mics.utils import info
from mics.utils import jacobian
from mics.utils import multimap
from mics.utils import overlapSampling


[docs]class mixture: """A mixture of independently collected samples (MICS) Args: samples (list or tuple): a list of samples. title (str, optional): a title. verbose (bool, optional): a verbosity tag. tol (float, optional): a tolerance. """ # ====================================================================================== def __define__(self, samples, title, verbose): np.set_printoptions(precision=4, threshold=15, edgeitems=4, suppress=True) self.title = title self.verbose = verbose self.method = type(self).__name__ verbose and info("Setting up %s case:" % self.method, title) m = self.m = len(samples) if m == 0: raise InputError("list of samples is empty") verbose and info("Number of samples:", m) if type(samples) is pool: self.samples = samples.samples self.label = samples.label else: self.samples = samples self.label = "" names = self.names = list(samples[0].dataset.columns) if any(list(s.dataset.columns) != names for s in samples): raise InputError("provided samples have distinct properties") verbose and info("Properties:", ", ".join(names)) n = self.n = np.array([s.dataset.shape[0] for s in samples]) verbose and info("Sample sizes:", str(self.n)) potentials = [s.potential for s in samples] self.u = [multimap(potentials, s.dataset) for s in samples] self.f = overlapSampling(self.u) verbose and info("Initial free-energy guess:", self.f) neff = self.neff = np.array([s.neff for s in samples]) self.frame = pd.DataFrame(index=np.arange(m) + 1) if self.label: self.states = ["%s = %s" % (self.label, s.label) for s in samples] self.frame[self.label] = [s.label for s in samples] else: self.states = ["state %d" % (i+1) for i in range(m)] return m, n, neff # ====================================================================================== def compute(self, functions, constants): if isinstance(functions, str): funcs = [genfunc(functions, self.names, constants)] else: funcs = [genfunc(f, self.names, constants) for f in functions] return [multimap(funcs, s.dataset) for s in self.samples] # ======================================================================================
[docs] def free_energies(self, reference=0): """ Returns a data frame containing the relative free energies of the datasetd samples of a `mixture`, as well as their standard errors. """ frame = self.frame.copy() frame["f"] = self.f - self.f[reference] T = self.Theta frame["df"] = np.sqrt(np.diag(T) - 2*T[:, reference] + T[reference, reference]) return frame
# ======================================================================================
[docs] def reweighting(self, potential, properties={}, derivatives={}, combinations={}, conditions=pd.DataFrame(), reference=0, **kwargs): """ Performs reweighting of the properties computed by `functions` from the mixture to the samples determined by the provided `potential` with all `parameter` values. Args: potential (string): properties (dict of strings): combinations (dict of strings): derivatives (dict of tuples): conditions (pandas.DataFrame): verbose (boolean): **kwargs: """ # TODO: look for duplicated names or reserved keyword 'f' in properties # TODO: look for duplicated names in properties, derivatives, and combinations # TODO: allow limited recursion in combinations names = ['f'] + list(properties.keys()) if self.verbose: info("Reweighting requested in %s case:" % self.method, self.title) info("Reduced potential:", potential) kwargs and info("Provided constants: ", kwargs) if not derivatives: try: y = self.compute(properties.values(), kwargs) properties_needed = False except (InputError, KeyError): properties_needed = True if (combinations): try: func, Jac = jacobian(combinations.values(), names, kwargs) jacobian_needed = False except InputError: jacobian_needed = True results = list() for constants in cases(conditions, kwargs, self.verbose): u = self.compute(potential, constants) if properties_needed: y = self.compute(properties.values(), constants) g, Theta = self.__reweight__(u, y, reference) dg = np.sqrt(np.diagonal(Theta)) if (combinations): if jacobian_needed: func, Jac = jacobian(combinations.values(), names, constants) h = func(g).flatten() J = Jac(g) dh = np.sqrt(np.diagonal(multi_dot([J, Theta, J.T]))) results.append(np.block([[g, h], [dg, dh]]).T.flatten()) else: results.append(np.stack([g, dg]).T.flatten()) header = sum([[x, "d"+x] for x in names + list(combinations.keys())], []) # if conditions.empty: # return dict(zip(header, results[0])) # else: # return pd.concat([conditions, pd.DataFrame(results, columns=header)], 1) return pd.concat([conditions, pd.DataFrame(results, columns=header)], 1) else: def dec(x): return "__%s__" % x parameters = list(conditions.columns) + list(kwargs.keys()) zyx = [(key, value[0], value[1]) for key, value in derivatives.items()] props = {} combs = {} for x in set(x for z, y, x in zyx): props[dec(x)] = derivative(potential, x, parameters) for z, y, x in zyx: if y == 'f': combs[z] = dec(x) else: dydx = derivative(properties[y], x, parameters) props[dec(z)] = "%s - (%s)*(%s)" % (dydx, props[dec(x)], properties[y]) combs[z] = "%s + (%s)*(%s)" % (dec(z), dec(x), y) unwanted = sum([[x, "d"+x] for x in props.keys()], []) return self.reweighting(potential, dict(properties, **props), {}, dict(combs, **combinations), conditions, reference, **kwargs).drop(unwanted, axis=1)
# ====================================================================================== def pmf(self, potential, property, bins=10, interval=None, **kwargs): self.verbose and info("PMF requested - %s case:" % self.method, self.title) self.verbose and info("Reduced potential:", potential) u = self.compute(potential, kwargs) z = self.compute(property, kwargs) if interval: (zmin, zmax) = interval else: zmin = min(np.amin(x[0, :]) for x in z) zmax = max(np.amax(x[0, :]) for x in z) delta = (zmax - zmin)/bins ibin = [np.floor((x[0:1, :] - zmin)/delta).astype(int) for x in z] results = list() for i in range(bins): zc = zmin + delta*(i + 0.5) self.verbose and info("Bin[%d]:" % (i + 1), "%s = %s" % (property, str(zc))) y = [np.equal(x, i).astype(np.float) for x in ibin] yu, Theta = self.__reweight__(u, y) if yu[1] > 0.0: dyu = np.sqrt(Theta[1, 1]) print([zc, -np.log(yu[1]), dyu/yu[1]]) results.append([zc, -np.log(yu[1]), dyu/yu[1]]) return pd.DataFrame(results, columns=[property, "pmf", "d_pmf"]) # ====================================================================================== def histograms(self, property="u0", bins=100, **kwargs): if property == "u0": y = self.u0 elif property == "state": w = np.arange(self.m) + 1 wsum = sum(w) y = [wsum*np.average(p, axis=0, weights=w) for p in self.P] elif property == "potential": y = [self.u[i][i, :] for i in range(self.m)] else: y = self.compute(property, kwargs) ymin = min([np.amin(x) for x in y]) ymax = max([np.amax(x) for x in y]) delta = (ymax - ymin)/bins center = [ymin + delta*(i + 0.5) for i in range(bins)] frame = pd.DataFrame({property: center}) for (i, s) in enumerate(self.states): frame[s] = np.histogram(y[i], bins, (ymin, ymax))[0] return frame