Source code for jetset.mcmc_ultranest

"""UltraNest backend built on top of the JetSeT MCMC sampler interface."""

__author__ = "Andrea Tramacere"

import os
import time
import shutil
import subprocess
from pathlib import Path
import numpy as np

try:
    from ultranest import ReactiveNestedSampler
    from ultranest.stepsampler import SliceSampler
    from ultranest.stepsampler import generate_mixture_random_direction
    ultranest_installed = True
except Exception:
    on_rtd = os.environ.get('READTHEDOCS', None) == 'True'
    if on_rtd:
        ReactiveNestedSampler = object
        ultranest_installed = False
    else:
        ultranest_installed = False

from .mcmc import McmcSampler, emcee_log_like

__all__ = ['UltraNestSampler', 'run_open_mpi']


[docs] class UltraNestSampler(McmcSampler): """UltraNest backend that reuses most of ``McmcSampler`` post-processing. Notes ----- The class keeps the parameter/bounds workflow from :class:`~jetset.mcmc.McmcSampler` and swaps the sampling engine with ``ultranest.ReactiveNestedSampler``. Posterior samples are stored in ``self.samples`` with the same shape conventions used by ``McmcSampler`` so existing corner/model plotting helpers continue to work. """ def __init__(self, model_minimizer, build_mcmc_parameters=True): """Create a new `UltraNestSampler` instance. Parameters ---------- model_minimizer : object Initialized model-minimizer object. build_mcmc_parameters : bool, optional If ``True``, initialize JetSeT MCMC-style parameter metadata. """ if ultranest_installed is not True: raise ImportError( 'IMPORTANT: UltraNest support requires ultranest>=4.0.\n' 'IMPORTANT: h5py is also required.\n' 'Install with pip: pip install "ultranest>=4.0" h5py\n' 'Install with conda: conda install -c conda-forge "ultranest>=4.0" h5py\n' 'More info: https://johannesbuchner.github.io/UltraNest/' ) super(UltraNestSampler, self).__init__( model_minimizer=model_minimizer, build_mcmc_parameters=build_mcmc_parameters, ) self.result = None self.log_evidence = None self.log_evidence_err = None self.posterior_weighted_points = None self.posterior_weights = None self.posterior_weighted_logl = None self._ultranest_param_names = None def _check_ultranest_bounds(self): missing = [] invalid = [] for par in self._par_array_sampler: low = par.mcmc_bound_min high = par.mcmc_bound_max if low is None or high is None: missing.append(f'{par.model.name}.{par.name}') continue try: low = float(low) high = float(high) if not np.isfinite(low) or not np.isfinite(high) or low >= high: invalid.append(f'{par.model.name}.{par.name}: [{low}, {high}]') except Exception: invalid.append(f'{par.model.name}.{par.name}: [{low}, {high}]') if missing or invalid: err_msg = [ 'UltraNest requires finite [min, max] bounds for all sampling parameters.' ] if missing: err_msg.append('Missing bounds: ' + ', '.join(missing)) if invalid: err_msg.append('Invalid bounds: ' + ', '.join(invalid)) raise RuntimeError('\n'.join(err_msg)) self._build_sampler_bounds() def _build_ultranest_parameter_names(self): names = [] used = set() for idx, par in enumerate(self._par_array_sampler): name = par.name if name in used: name = f'{par.name}_{par.model.name}' if name in used: name = f'{name}_{idx}' names.append(name) used.add(name) return names def _prior_transform(self, cube): cube = np.asarray(cube, dtype=float) theta = np.zeros(self.ndim, dtype=float) for idx, bounds in enumerate(self._bounds_sampler): low, high = bounds theta[idx] = low + cube[idx] * (high - low) return theta def _extract_posterior_samples(self, posterior_samples_size=None, rnd_seed=0): result = self.result if isinstance(self.result, dict) else {} weighted = result.get('weighted_samples', {}) points = None weights = None logl = None if isinstance(weighted, dict): points = np.asarray(weighted.get('points', [])) weights = np.asarray(weighted.get('weights', [])) logl = np.asarray(weighted.get('logl', [])) if points is not None and points.ndim == 2 and points.shape[1] == self.ndim: self.posterior_weighted_points = points if weights is not None and weights.ndim == 1 and weights.shape[0] == points.shape[0]: weights = np.asarray(weights, dtype=float) if np.any(~np.isfinite(weights)) or np.sum(weights) <= 0: weights = np.ones(points.shape[0], dtype=float) / float(points.shape[0]) else: weights = weights / np.sum(weights) self.posterior_weights = weights if logl is not None and logl.ndim == 1 and logl.shape[0] == points.shape[0]: self.posterior_weighted_logl = logl else: self.posterior_weighted_logl = None if posterior_samples_size is None: posterior_samples_size = points.shape[0] posterior_samples_size = max(1, int(posterior_samples_size)) rng = np.random.default_rng(rnd_seed) draw_idx = rng.choice( np.arange(points.shape[0]), size=posterior_samples_size, replace=True, p=weights, ) samples = points[draw_idx] if self.posterior_weighted_logl is not None: samples_logl = self.posterior_weighted_logl[draw_idx] else: samples_logl = np.full(samples.shape[0], np.nan) return samples, samples_logl if logl is not None and logl.ndim == 1 and logl.shape[0] == points.shape[0]: return points, logl return points, np.full(points.shape[0], np.nan) samples = np.asarray(result.get('samples', [])) if samples.ndim != 2 or samples.shape[1] != self.ndim: raise RuntimeError( 'unable to build posterior samples from UltraNest output; ' 'please check result content' ) return samples, np.full(samples.shape[0], np.nan)
[docs] def run_sampler( self, min_num_live_points=400, dlogz=0.5, min_ess=None, frac_remain=0.01, max_ncalls=None, use_UL=False, loglog=False, resume='subfolder', ultranest_output_dir=None, show_status=True, posterior_samples_size=None, rnd_seed=0, use_stepsampler=True, nsteps=2, **run_kwargs, ): """Run posterior sampling with UltraNest. Parameters ---------- min_num_live_points : int, optional Minimum number of live points. dlogz : float, optional Stopping criterion on remaining log-evidence. min_ess : int, optional Minimum effective sample size target. frac_remain : float, optional Fractional prior volume stopping criterion. max_ncalls : int, optional Maximum number of likelihood calls. use_UL : bool, optional If ``True``, include upper-limit terms in the likelihood. loglog : bool, optional If ``True``, evaluate model and data in log10 space. resume : str, optional UltraNest resume mode. ultranest_output_dir : str, optional This is where UltraNest writes its run files. If ``None`` ultranest_output_dir = f'ultranest_{self.model.name}' show_status : bool, optional If ``True``, show UltraNest progress/status. posterior_samples_size : int, optional Number of equal-weight posterior samples to draw from weighted nested-sampling samples. If ``None``, use all weighted points. rnd_seed : int, optional Seed for posterior resampling. use_stepsampler : bool, optional If ``True``, attach a ``SliceSampler`` to the UltraNest sampler. nsteps : int, optional Slice-sampler steps per parameter dimension. The effective value is ``nsteps * self.ndim``. Default is ``2``. **run_kwargs : dict Additional keyword arguments passed to ``ReactiveNestedSampler.run``. Notes ----- On completion, posterior samples are stored in ``self.samples`` and model parameters are reset to the mcmc best-fit point. """ self._check_ultranest_bounds() self.calls = 0 self.calls_OK = 0 self.use_UL = use_UL self.ndim = len(self._par_array_sampler) if ultranest_output_dir is None: ultranest_output_dir = f'ultranest_{self.model.name}' calls_counter = {'count': 0} def _loglike(theta): calls_counter['count'] += 1 return emcee_log_like( theta, self.model, self.data, use_UL, self._par_array_sampler, loglog, ) self._ultranest_param_names = self._build_ultranest_parameter_names() self.sampler = ReactiveNestedSampler( self._ultranest_param_names, _loglike, self._prior_transform, log_dir=ultranest_output_dir, resume=resume, ) if use_stepsampler: self.sampler.stepsampler = SliceSampler(nsteps=nsteps *self.ndim, generate_direction=generate_mixture_random_direction) run_args = { 'min_num_live_points': min_num_live_points, 'dlogz': dlogz, 'frac_remain': frac_remain, 'show_status': show_status, } if min_ess is not None: run_args['min_ess'] = min_ess if max_ncalls is not None: run_args['max_ncalls'] = max_ncalls run_args.update(run_kwargs) print('ultranest run starting') print('') start = time.time() self.result = self.sampler.run(**run_args) end = time.time() comp_time = end - start print('ultranest run done, took %2.2f seconds' % comp_time) self.samples, self.samples_log_prob = self._extract_posterior_samples( posterior_samples_size=posterior_samples_size, rnd_seed=rnd_seed, ) self.chain = self.samples[np.newaxis, :, :] self.log_prob_chain = self.samples_log_prob[np.newaxis, :] self.burnin = 0 if isinstance(self.result, dict): self.log_evidence = self.result.get('logz') self.log_evidence_err = self.result.get('logzerr') self.calls = calls_counter['count'] self.calls_OK = self.calls self.calls_tot = self.calls if max_ncalls is None else max_ncalls self.acceptance_fraction = np.nan self.reset_to_mcmc_best_fit()
[docs] def reset_to_mcmc_best_fit(self, verbose=True): """Reset model parameters to the UltraNest best-fit point.""" theta = None if isinstance(self.result, dict): ml = self.result.get('maximum_likelihood', {}) if isinstance(ml, dict): theta = ml.get('point') if theta is None: if ( hasattr(self, 'samples_log_prob') and self.samples_log_prob is not None and np.any(np.isfinite(self.samples_log_prob)) ): theta = self.samples[np.nanargmax(self.samples_log_prob)] else: theta = np.median(self.samples, axis=0) if theta is None: raise RuntimeError('unable to determine best-fit point from UltraNest output') theta = np.asarray(theta, dtype=float) if theta.shape[0] != len(self._par_array_sampler): raise RuntimeError( 'best-fit point dimensionality mismatch: ' f'{theta.shape[0]} != {len(self._par_array_sampler)}' ) if verbose: print('----------------------------') print('UltraNest best fit solution') for idx, par in enumerate(self._par_array_sampler): par.val = theta[idx] self.model.parameters.set_par( model_name=par.model.name, par_name=par.name, val=par.val, ) par.best_fit_mcmc_val = par.val quantiles = self.get_par_quantiles( par_name=par.name, comp_name=par.model.name, quantiles=(0.16, 0.5, 0.84), ) par.q_16 = quantiles[0] par.q_50 = quantiles[1] par.q_84 = quantiles[2] if verbose: print( 'comp: %s par: %s ultranest best fit val: %s ' 'quantiles(0.16,0.5,0.84): %s' % (par.model.name, par.name, par.val, quantiles) ) if verbose: if self.log_evidence is not None: print( 'logZ=%s logZerr=%s' % (str(self.log_evidence), str(self.log_evidence_err)) ) print('----------------------------')
@property def logz(self): """Return the estimated Bayesian log-evidence.""" return self.log_evidence @property def logzerr(self): """Return the uncertainty on the Bayesian log-evidence.""" return self.log_evidence_err
[docs] def run_open_mpi(sampler, n_proc=8, ultranest_output_dir=None, min_num_live_points=64, nsteps=1, max_num_improvement_loops=1, min_ess=100, num_c_threads=1, extra_mpirun_args=''): """Run ``UltraNestSampler.run_sampler`` through ``mpirun``. Parameters ---------- sampler : UltraNestSampler Configured sampler instance to serialize and execute. n_proc : int, optional Number of MPI ranks passed to ``mpirun -np``. Default is ``8``. ultranest_output_dir : str, optional Name passed to ``run_sampler``, and resolved within run_mpi/ directory min_num_live_points : int, optional Minimum live points forwarded to ``run_sampler``. nsteps : int, optional Slice-sampler step multiplier forwarded to ``run_sampler``. max_num_improvement_loops : int, optional Value forwarded to ``run_sampler``. min_ess : int, optional Minimum effective sample size target forwarded to ``run_sampler``. num_c_threads : int, optional Thread count forwarded to ``mcmc.model.set_num_c_threads`` in the generated MPI worker script. extra_mpirun_args : str, optional Extra command-line arguments inserted after ``mpirun`` and before ``-np`` (for example hostfile or binding flags). Returns ------- UltraNestSampler Sampler reloaded from ``run_mpi/sampler.pkl`` after MPI completion. Notes ----- The function deletes and recreates ``./run_mpi`` in the current working directory, writes a helper Python script there, and temporarily sets ``OMP_NUM_THREADS`` to ``n_proc`` for the parent process while the MPI command runs. """ CURRENT_DIR = Path.cwd().resolve() run_dir = CURRENT_DIR / 'run_mpi' script_name = 'run_jetset_ultranest.py' OMP_NUM_THREADS_INITIAL=os.getenv("OMP_NUM_THREADS") os.environ["OMP_NUM_THREADS"] = "%s"%str(int(n_proc)) command = f"mpirun {extra_mpirun_args} -np {n_proc} python {script_name}" if ultranest_output_dir is None: ultranest_output_dir = f'ultranest_{sampler.model.name}' try: if run_dir.exists(): shutil.rmtree(run_dir) run_dir.mkdir(parents=True, exist_ok=True) sampler_path = run_dir / 'sampler.pkl' sampler.save(str(sampler_path)) #(run_dir / out_dir).mkdir(parents=True, exist_ok=True) sampler_run = f""" mcmc.model.set_num_c_threads({int(num_c_threads)}) mcmc.run_sampler( min_num_live_points={int(min_num_live_points)}, nsteps={int(nsteps)}, max_num_improvement_loops={int(max_num_improvement_loops)}, min_ess={float(min_ess)}, ultranest_output_dir={repr(str(ultranest_output_dir))}, ) """ script = "from jetset.mcmc_ultranest import UltraNestSampler\n" script += "mcmc=UltraNestSampler.load('sampler.pkl')\n" script += f"{sampler_run}\n" script += "mcmc.save('sampler.pkl')\n" print("====== ultranest script ========") print(script) script_path = run_dir / script_name with open(script_path, 'w') as f: f.write(script) print("================================") print("executing", command) subprocess.run(command, shell=True, check=True, cwd=run_dir) mcmc= UltraNestSampler.load(str(sampler_path)) except Exception as e: raise RuntimeError(e) finally: if OMP_NUM_THREADS_INITIAL is not None: os.environ["OMP_NUM_THREADS"] = "%s"%str(int(OMP_NUM_THREADS_INITIAL)) else: os.unsetenv("OMP_NUM_THREADS") return mcmc