Source code for jetset.minimizer

"""Model fitting and minimization utilities for JetSeT workflows."""


__author__ = "Andrea Tramacere"

from itertools import cycle


import scipy as sp

import numpy as np

import warnings

from scipy.stats import chi2

import dill as pickle

try:
    import iminuit
    from iminuit import Minuit
    minuit_installed=True
except Exception as e:
    print('not able to import iminuit:',e)
    minuit_installed=False


from tqdm.auto import tqdm


from .plot_sedfit import plt,heatmap,annotate_heatmap

from scipy.optimize import least_squares
from scipy import interpolate

from .output import section_separator,WorkPlace,makedir
from .utils import JetkerneltException

from .data_loader import ObsData,Data
from .data_loader import lin_to_log







__all__ = ['FitResults','fit_SED','Minimizer','LSBMinimizerScipy', 'MinuitMinimizer', 'ModelMinimizer']



[docs] class FitResults(object): """Container for fit statistics and parameter-report state. Notes ----- Instances are produced after minimization and collect convergence information, chi-square metrics (optionally including UL-free variants), and parameter tables used by reporting and serialization helpers. """ def __init__(self, name, mm, parameters, calls, mesg, success, chisq, dof, chisq_red, null_hyp_sig, wd, chisq_no_UL=None, dof_no_UL=None, chisq_red_no_UL=None, null_hyp_sig_no_UL=None): """Create a new `FitResults` instance. Parameters ---------- name : str Name identifier. mm : object Owning minimizer manager instance. parameters : object Parameter container associated with the fit/model. calls : object Number of function evaluations. mesg : object Backend optimizer status message/object. success : object Boolean convergence flag. chisq : object Chi-square value of the fit. dof : object Degrees of freedom. chisq_red : object Reduced chi-square. null_hyp_sig : object Null-hypothesis significance. wd : object Working directory path for outputs. chisq_no_UL : object, optional Chi-square computed excluding upper-limit contribution. dof_no_UL : object, optional Degrees of freedom excluding upper-limit points. chisq_red_no_UL : object, optional Reduced chi-square excluding upper-limit points. null_hyp_sig_no_UL : object, optional Null-hypothesis significance excluding upper-limit points. """ self.name=name self.parameters=parameters self.mm=mm self.calls=calls self.mesg=mesg self.success=success self.chisq=chisq self.dof=dof self.chisq_red=chisq_red self.null_hyp_sig=null_hyp_sig self.chisq_no_UL = chisq_no_UL self.dof_no_UL = dof_no_UL self.chisq_red_no_UL = chisq_red_no_UL self.null_hyp_sig_no_UL = null_hyp_sig_no_UL self.update_report() self.wd=wd
[docs] def update_report(self): # self.model_table=self.parameters._par_table """Refresh cached report tables from current parameter state. Notes ----- This rebuilds both full and best-fit parameter tables. """ self.parameters._build_best_fit_par_table() self.parameters._build_par_table()
def _show_report(self,): self.update_report() print('-------------------------------------------------------------------------') print("Fit report") print("") print("Model: %s"%self.name) self.parameters.show_pars() print("") print("converged=%s"%self.success) print("calls=%d"%self.calls) print("mesg=") if hasattr(self,'mesg'): try: from IPython.display import display display(self.mesg) except: print(self.mesg) print("dof=%d"%self.dof) print("chisq=%f, chisq/red=%f null hypothesis sig=%f"%(self.chisq,self.chisq_red,self.null_hyp_sig)) if self.dof_no_UL is not None: print("") print("stats without the UL") print("dof UL=%d" % self.dof_no_UL) print( "chisq=%f, chisq/red=%f null hypothesis sig=%f" % (self.chisq_no_UL, self.chisq_red_no_UL, self.null_hyp_sig_no_UL)) print("") print("") print("best fit pars") self.parameters.show_best_fit_pars() print('-------------------------------------------------------------------------') print("") @property def bestfit_table(self): """Bestfit table. Returns ------- object Requested value. """ return self.parameters.best_fit_par_table def _update_asymm_errors(self): if hasattr(self.mm): for pi in range(len(self.mm.fit_par_free)): if hasattr(self.mm.minimizer,'asymm_errors'): self.mm.fit_par_free[pi].err_p=self.mm.minimizer.asymm_errors[pi][0] self.mm.fit_par_free[pi].err_m=self.mm.minimizer.asymm_errors[pi][1]
[docs] def show_report(self): """Print the formatted fit report to stdout. Notes ----- Falls back to a plain warning if rich formatting fails. """ try: self._show_report() except: print('problem in formatting text for report')
#except Exception as e: # raise(RuntimeWarning,'problem in formatting text for report',e)
[docs] @classmethod def load_report(cls,file_name): """Load object state from disk. Parameters ---------- file_name : object Input/output file path. Returns ------- object Loaded object. """ c = pickle.load(open(file_name, "rb")) return c
[docs] def save_report(self,name=None): """Save object state to disk. Parameters ---------- name : object, optional Name identifier. """ if name is None: name = 'best_fit_report.pkl' mm =self.mm self.mm=None setattr(self, 'mm', mm) try: pickle.dump(self, open(name, 'wb'), protocol=pickle.HIGHEST_PROTOCOL) except: o = FitResults(name=self.name, mm=None, parameters=self.parameters, calls=self.calls, mesg=None, success=self.success, chisq=self.chisq, dof=self.dof, chisq_red=self.chisq_red, null_hyp_sig=self.null_hyp_sig, wd=self.wd, chisq_no_UL=self.chisq_no_UL, dof_no_UL=self.dof_no_UL, chisq_red_no_UL=self.chisq_red_no_UL, null_hyp_sig_no_UL=self.null_hyp_sig_no_UL) pickle.dump(o, open(name, 'wb'), protocol=pickle.HIGHEST_PROTOCOL)
[docs] class ModelMinimizer(object): """High-level entry point for model fitting workflows. Notes ----- Handles data preparation, fit-range filtering, backend minimizer selection (SciPy, Minuit, or Sherpa), and packaging of best-fit results. """ def __init__(self,minimizer_type): """Create a new `ModelMinimizer` instance. Parameters ---------- minimizer_type : object Backend minimizer identifier. """ __accepted__ = ['lsb', 'minuit', 'sherpa'] if minimizer_type=='lsb': self.minimizer=LSBMinimizerScipy(self) elif minimizer_type=='minuit': self.minimizer=MinuitMinimizer(self) elif minimizer_type == 'sherpa': from .sherpa_plugin import SherpaMinimizer self.minimizer = SherpaMinimizer(self) elif minimizer_type not in __accepted__: raise RuntimeError('minimizer ', minimizer_type, 'not accepted, please choose among', __accepted__) else: raise RuntimeError('minimizer factory failed') self.minimizer_type=minimizer_type #print('minimizer',minimizer_type)
[docs] def save_model(self, file_name): """Save object state to disk. Parameters ---------- file_name : object Input/output file path. """ _m= self.minimizer self.minimizer=self.minimizer_type pickle.dump(self, open(file_name, 'wb'), protocol=pickle.HIGHEST_PROTOCOL) self.minimizer =_m
[docs] @classmethod def load_model(cls, file_name): """Load object state from disk. Parameters ---------- file_name : object Input/output file path. Returns ------- object Loaded object. """ try: c = pickle.load(open(file_name, "rb")) if isinstance(c, ModelMinimizer): c.__init__(c.minimizer) if hasattr(c,'fit_model'): c.fit_model=c.fit_model._build_model(c.fit_model) return c else: raise RuntimeError('The model you loaded is not valid please check the file name') except Exception as e: raise RuntimeError(e)
def _handle_sed_data(self,sed_data,loglog=False,use_fake_err=False): if sed_data.data['dnuFnu_data'] is None: sed_data.data['dnuFnu_data'] = np.ones(sed_data.data['nu_data'].size) if loglog is False: nu_fit = sed_data.data['nu_data'] err_nu_fit = sed_data.data['dnu_data'] nuFnu_fit = sed_data.data['nuFnu_data'] if use_fake_err == False: err_nuFnu_fit = sed_data.data['dnuFnu_data'] else: err_nuFnu_fit = sed_data.data['dnuFnu_fake'] else: nu_fit = sed_data.data['nu_data_log'] err_nu_fit = sed_data.data['dnu_data_log'] nuFnu_fit = sed_data.data['nuFnu_data_log'] err_nuFnu_fit = sed_data.data['dnuFnu_data_log'] if use_fake_err == False: err_nuFnu_fit = sed_data.data['dnuFnu_data_log'] else: err_nuFnu_fit = sed_data.data['dnuFnu_fake_log'] UL = sed_data.data['UL'] data = np.zeros(nu_fit.size, dtype=[('x', nu_fit.dtype), ('dx',nu_fit.dtype), ('y', nuFnu_fit.dtype), ('dy', err_nuFnu_fit.dtype), ('UL', UL.dtype)]) data['x'] = nu_fit data['dx']= err_nu_fit data['y'] = nuFnu_fit data['dy'] = err_nuFnu_fit data['UL'] = UL return data def _handle_data_xy(self, data, loglog=False, use_fake_err=False): #if data['dy'] is None: # data['dy'] = np.ones(data['x'].size) if loglog is False: x = data.table['x'] dx= data.table['dx'] y = data.table['y'] if use_fake_err == False: dy = data.table['dy'] else: dy = data.table['dy_fake'] else: x,dx = lin_to_log(val=data.table['x'], err=data.table['dx']) y, dy = lin_to_log(val=data.table['y'], err=data.table['dy']) if use_fake_err == False: pass else: y, dy = lin_to_log(val=data.table['y'], err=data.table['dy_fake']) UL = data.table['UL'] _data = np.zeros(x.size, dtype=[('x', x.dtype), ('y', y.dtype), ('dy', dy.dtype), ('UL', UL.dtype)]) _data['x'] = x _data['dx'] = dx _data['y'] = y _data['dy'] = dy _data['UL'] = UL return _data def _prepare_fit(self, fit_model, data, nu_fit_start, nu_fit_stop, fitname=None, fit_workplace=None, loglog=False, silent=False, get_conf_int=False, use_fake_err=False, use_UL=False): #print('--> DEBUG WorkPlace.flag',WorkPlace.flag,fit_workplace) if fitname is None: fitname = fit_model.name #print('--> DEBUG ') if fit_workplace is None: fit_workplace = WorkPlace() out_dir = fit_workplace.out_dir + '/' + fitname + '/' else: out_dir = fit_workplace.out_dir + '/' + fitname + '/' makedir(out_dir) for model in fit_model.components.components_list: if model.model_type == 'jet': model.set_path(out_dir) if isinstance(data,ObsData): self.data=self._handle_sed_data(data,use_fake_err=use_fake_err,loglog=loglog) elif isinstance(data,Data): self.data= self._handle_data_xy(data,use_fake_err=use_fake_err,loglog=loglog) else: raise RuntimeError('data type must be Obs Data or Data') #if sed_data.data['dnuFnu_data'] is None: # sed_data.data['dnuFnu_data'] = np.ones(sed_data.data['nu_data'].size) # filter data points if loglog is True: nu_fit_start = np.log10(nu_fit_start) nu_fit_stop = np.log10(nu_fit_stop) msk1 = self.data['x'] > nu_fit_start msk2 = self.data['x'] < nu_fit_stop msk_zero_error = self.data['dy'] > 0.0 self.use_UL=use_UL if use_UL == True: msk = msk1 * msk2 * msk_zero_error else: msk = msk1 * msk2 * msk_zero_error*~self.data['UL'] self.data = self.data[msk] if silent == False: print("filtering data in fit range = [%e,%e]" % (nu_fit_start, nu_fit_stop)) print("data length", self.data.size) for par in fit_model.parameters.par_array: if get_conf_int == True: par.set_fit_initial_value(par.best_fit_val) else: par.set_fit_initial_value(par.val) fit_par_free = [par for par in fit_model.parameters.par_array if par.frozen is False and (par.linked is False ) and par._is_dependent is False] #remove duplicates and keeps the order fit_par_free = sorted(set(fit_par_free), key=fit_par_free.index) pinit = [par.get_fit_initial_value() for par in fit_par_free] # bounds free_pars = 0 for pi in range(len(fit_model.parameters.par_array)): if fit_model.parameters.par_array[pi].frozen == False: free_pars += 1 if silent == False: print(section_separator) print("*** start fit process ***") #print("initial pars: ") #fit_model.parameters.show_pars() print("----- ") self.out_dir=out_dir self.pinit=pinit self.pout=None self.free_pars=free_pars self.fit_par_free=fit_par_free self.fit_model=fit_model self.loglog=loglog if hasattr(self.fit_model,'nu_min_fit'): self.fit_model.nu_min_fit = nu_fit_start self.fit_model.nu_max_fit = nu_fit_stop if len(fit_par_free)>self.data['x'].size and isinstance(self.minimizer,LSBMinimizerScipy): m='number of data points: %d is lower than number of free pars: %d'%(self.data['x'], len(fit_par_free)) raise JetkerneltException(message=m)
[docs] def fit(self, fit_model, sed_data, nu_fit_start, nu_fit_stop, fitname=None, fit_workplace=None, loglog=False, silent=False, get_conf_int=False, max_ev=0, use_fake_err=False, use_UL=False, skip_minimizer=False, repeat=1): """Fit. Parameters ---------- fit_model : object Model instance used for fitting. sed_data : object Observational SED data container. nu_fit_start : float Lower bound of the fit range in Hz. nu_fit_stop : float Upper bound of the fit range in Hz. fitname : object, optional Name assigned to the fit run. fit_workplace : path, optional Workplace/output configuration object. loglog : bool, optional If ``True``, operate in log10 space. silent : bool, optional If ``True``, suppress informational output. get_conf_int : bool, optional If ``True``, use confidence-interval initialization strategy. max_ev : int, optional Maximum number of optimizer function evaluations. use_fake_err : bool, optional If ``True``, enable fake err. use_UL : bool, optional If ``True``, enable ul. skip_minimizer : bool, optional If ``True``, skip minimizer. repeat : int, optional Number of repeated minimization passes. Returns ------- object Computed value. """ self.silent=silent self._prepare_fit( fit_model, sed_data, nu_fit_start, nu_fit_stop, fitname=fitname, fit_workplace=fit_workplace, loglog=loglog, silent=silent, get_conf_int=get_conf_int, use_fake_err=use_fake_err,use_UL=use_UL) fit_model.set_nu_grid(nu_min=nu_fit_start*0.5, nu_max=nu_fit_stop*1.5) self.corr=[] self.covar=[] for i in range(repeat): if skip_minimizer == False: if repeat>1: if silent is False: print('fit run:',i) if i>0: self.pinit = [v for v in self.minimizer.pout] if silent is False: print('- old chisq=%5.5e' % (self.minimizer.chisq)) #old_chisq = self.minimizer.chisq self.minimizer.fit(self,max_ev=max_ev,silent=silent,use_UL=use_UL) self.pout = self.minimizer.pout self.errors = self.minimizer.errors if hasattr(self.minimizer, 'asymm_errors'): self.asymm_errors = self.minimizer.asymm_errors self.covar.append(self.minimizer.covar) self.corr.append(self.minimizer.corr) self.reset_to_best_fit() self.minimizer._fit_stats() if silent is False: print('- best chisq=%5.5e'%(self.minimizer.chisq)) print() new_chisq = self.minimizer.chisq if i==0: old_chisq = new_chisq else: pass return self.get_fit_results(fit_model,nu_fit_start,nu_fit_stop,fitname,loglog=loglog,silent=silent)
[docs] def get_fit_results(self, fit_model, nu_fit_start, nu_fit_stop, fitname, silent=False, loglog=False): """Return fit results. Parameters ---------- fit_model : object Model instance used for fitting. nu_fit_start : float Lower bound of the fit range in Hz. nu_fit_stop : nu_fit_start : floa Upper bound of the fit range in Hz. fitname : str Name assigned to the fit run. silent : bool, optional If ``True``, suppress informational output. loglog : bool, optional If ``True``, operate in log10 space. Returns ------- object Requested value. """ self.reset_to_best_fit() self.minimizer._fit_stats() best_fit = FitResults(fitname, self, fit_model.parameters, self.minimizer.calls, self.minimizer.mesg, self.minimizer.success, self.minimizer.chisq, self.minimizer.dof, self.minimizer.chisq_red, self.minimizer.null_hyp_sig, self.out_dir, chisq_no_UL=self.minimizer.chisq_no_UL, dof_no_UL=self.minimizer.dof_no_UL, chisq_red_no_UL=self.minimizer.chisq_red_no_UL, null_hyp_sig_no_UL=self.minimizer.null_hyp_sig_no_UL) if silent == False: best_fit.show_report() fit_model.set_nu_grid(nu_min=nu_fit_start , nu_max=nu_fit_stop) fit_model.eval(fill_SED=True, loglog=loglog, phys_output=True) res_bestfit = self.minimizer.residuals_Fit(self.minimizer.pout, self.fit_par_free, self.data, self.fit_model, self.loglog, use_UL=self.use_UL) if loglog == True: fit_model.SED.fill(nu_residuals=np.power(10, self.data['x']), residuals=res_bestfit) else: fit_model.SED.fill(nu_residuals=self.data['x'], residuals=res_bestfit) if silent == False: print(section_separator) fit_model.eval(fill_SED=True) if self.minimizer._post_fit_warnings!='': print('there are fit warnings messages, use the .show_fit_warnings() or access the member .minimizer._post_fit_warnings') return best_fit
[docs] def show_fit_warnings(self): """Print warnings collected during fitting. Notes ----- Warnings are stored in ``self.minimizer._post_fit_warnings``. """ print(self.minimizer._post_fit_warnings)
[docs] def reset_to_best_fit(self): """Reset model parameters to stored best-fit solution. Notes ----- Also refreshes per-parameter best-fit values and errors. """ for pi in range(len(self.fit_par_free)): self.fit_par_free[pi].set(val=self.pout[pi]) self.fit_par_free[pi].best_fit_val = self.pout[pi] self.fit_par_free[pi].best_fit_err = self.errors[pi] if hasattr(self,'asymm_errors'): self.fit_par_free[pi].err_p=self.asymm_errors[pi][0] self.fit_par_free[pi].err_m=self.asymm_errors[pi][1] self.fit_model.eval()
[docs] def plot_corr_matrix(self): """Plot corr matrix. Returns ------- object Plot object or generated visualization. """ if hasattr(self,'corr'): if len(self.corr)>0: for n_rep,corr in enumerate(self.corr): fig, ax = plt.subplots() corr=self.corr[n_rep] labels=[p.name for p in self.fit_par_free] im, cbar = heatmap(corr, labels, labels, ax=ax, cmap="coolwarm", cbarlabel="corr") texts = annotate_heatmap(im, valfmt="{x:.2f}") ax.set_title(f'fit rep n={n_rep}') fig.tight_layout() return fig
[docs] class Minimizer(object): """Base class for concrete minimizer backends. Notes ----- Provides shared residual and chi-square machinery, progress tracking, and post-fit statistics used by all optimizer implementations. """ def __init__(self,model): """Create a new `Minimizer` instance. Parameters ---------- model : object Model instance. """ self.model=model self._progress_iter = cycle(['|', '/', '-', '\\']) self._post_fit_warnings='' self.pbar=None self.covar=None #self._progess_bar(pbar, _res_sum, _res_sum_UL)
[docs] def fit(self,model, max_ev=None, use_UL=False, use_dx=False, silent=False): """Fit. Parameters ---------- model : object Model instance. max_ev : int, optional Maximum number of optimizer function evaluations. use_UL : bool, optional If ``True``, enable ul. use_dx : bool, optional If ``True``, enable dx. silent : bool, optional If ``True``, suppress informational output. """ if silent is False: self.pbar = tqdm(total=None) self.use_UL = use_UL self.use_dx=use_dx self.calls=0 self.res_check=None self.model=model self.silent=silent self._original_silent_state = silent self._fit(max_ev) self._fit_stats() self._set_fit_errors()
def _force_silent(self): self.silent=True def _restore_silent_state(self): self.silent = self._original_silent_state def _fit_stats(self): self.dof = len(self.model.data['x']) - self.model.free_pars self.chisq = self.get_chisq() self.chisq_red = self.chisq / float(self.dof) self.null_hyp_sig = 1.0 - chi2.cdf(self.chisq, self.dof) if self.use_UL==True: self.dof_no_UL = len(self.model.data['x']) - self.model.free_pars - self.model.data['UL'].sum() self.use_UL=False self.chisq_no_UL = self.get_chisq() self.use_UL = True self.chisq_red_no_UL = self.chisq_no_UL / float(self.dof_no_UL) self.null_hyp_sig_no_UL=1.0 - chi2.cdf(self.chisq_no_UL, self.dof_no_UL) else: self.dof_no_UL = None self.chisq_red_no_UL = None self.chisq_no_UL=None self.null_hyp_sig_no_UL=None self.success = True self.status = 1 def _set_fit_errors(self): if self.covar is None: print("!Warning, no covariance matrix produced") self.errors = np.zeros(len(self.pout)) else: self.errors = [np.sqrt(np.fabs(self.covar[pi, pi]) * self.chisq_red) for pi in range(len(self.model.fit_par_free))] def _progess_bar(self, _res_sum, res_sum_UL): if (np.mod(self.calls, 10) == 0 and self.calls != 0) : m="minim. function calls=%d, chisq=%5.5e UL part=%f" %(self.calls, _res_sum, res_sum_UL) self.pbar.n=self.calls self.pbar.set_description(m) self.pbar.update(1)
[docs] def residuals_Fit(self, p, fit_par, data, best_fit_SEDModel, loglog, chisq=False, use_UL=False, silent=False): #_warn=False """Residuals fit. Parameters ---------- p : target_parameters_array parameter. fit_par : object List of free fit-parameter objects. data : object Input data table or array. best_fit_SEDModel : object Model instance evaluated against data during fitting. loglog : object If ``True``, operate in log10 space. chisq : bool, optional Chi-square value of the fit. use_UL : bool, optional If ``True``, enable ul. silent : bool, optional If ``True``, suppress informational output. Returns ------- object Computed value. """ for pi in range(len(fit_par)): #_old_v=fit_par[pi].val fit_par[pi].set(val=p[pi]) if np.isnan(p[pi]): _warn=True s1 = str('warning nan for par %s' % fit_par[pi].name) s2 = ', old parameter value was %s' % str(fit_par[pi].val) s =s1 + s2 warnings.warn(s) self._post_fit_warnings+=s+'\n' model = best_fit_SEDModel.eval(nu=data['x'], fill_SED=False, get_model=True, loglog=loglog) _res_sum, _res, _res_sum_UL=_eval_res(data['y'], model, data['dy'], data['UL'], use_UL=use_UL) self._res_sum_chekc=_res_sum self._res_chekc = _res self._res_UL_chekc = _res_sum_UL self.calls +=1 if silent==False: #with tqdm(total=None, file=sys.stdout) as pbar: if self.pbar is not None: self._progess_bar(_res_sum, _res_sum_UL) #print("\r", end="") if chisq==True: res=_res_sum else: res=_res return res
[docs] def get_chisq(self): """Return chisq. Returns ------- object Requested value. """ sig2_x_term=None if self.use_dx is True: dx=self.model.data['dx'] #NOTE here we work in logspace to get a smoother derivative lin_nu,log_nu=self.fit_model._prepare_nu_model(log_log=self.model.loglog) log_y = self.model.fit_model.eval(fill_SED=False, get_model=True, loglog=True) log_y_deriv=np.gradient(log_y, log_nu) log_y_deriv_interpolator=interpolate.ininterp1d(log_nu, log_y_deriv, kind='linear',bounds_error=False) log_model_deriv=log_y_deriv_interpolator(self.model.data['x']) if self.model.loglog: log_model_interpolator=interpolate.ininterp1d(log_nu, log_y, kind='linear',bounds_error=False) model = log_model_interpolator(self.model.data['x']) sig2_x_term=(log_model_deriv*dx)**2 else: y_log_interpolator = interpolate.interp1d(np.log10(log_nu), log_y, bounds_error=False, kind='linear') model = np.power(10., y_log_interpolator(np.log10(self.model.data['x']))) #NOTE this is needed to go back to linear derivative y=10**(log_y) lin_y_deriv=(y/lin_nu)*log_y_deriv #NOTE now I log-iterpolate linear derivative, and get 10** model_deriv_log_interpolator=interpolate.ininterp1d(log_nu, np.log10(lin_y_deriv), kind='linear',bounds_error=False) model_deriv=np.power(10., model_deriv_log_interpolator(np.log10(self.model.data['x']))) sig2_x_term=(model_deriv*dx)**2 else: model = self.model.fit_model.eval(nu=self.model.data['x'], fill_SED=False, get_model=True, loglog=self.model.loglog) _res_sum, _res, _res_sum_UL = _eval_res(self.model.data['y'], model, self.model.data['dy'], self.model.data['UL'], sig_x_term=sig2_x_term, use_UL=self.use_UL) return _res_sum
@property def corr(self): """Corr. Returns ------- object Requested value. """ try: if self.covar is not None: v = np.sqrt(np.diag(self.covar)) outer_v = np.outer(v, v) correlation = self.covar / outer_v correlation[self.covar == 0] = 0 else: correlation= None except Exception as e: warnings.warn('correlation matrix computation failed') correlation=None return correlation
def _eval_res(data, model, data_error, UL, use_UL=False,sig_x_term=None): if sig_x_term is not None: res_no_UL = (data[~UL] - model[~UL]) / (data_error[~UL]+np.sqrt(sig_x_term)) else: res_no_UL = (data[~UL] - model[~UL]) / (data_error[~UL]) res = (data - model) / (data_error) res_UL_log = [0] #print('UL.sum() ',UL.sum(),use_UL) if UL.sum() > 0 and use_UL is True: res_UL_log, res_UL = _eval_res_UL(data[UL], model[UL], data_error[UL]) #print('ress_UL', res_UL) res[UL]=res_UL res_sum_UL= -2.0*np.sum(res_UL_log) res_sum=np.sum(res_no_UL * res_no_UL) + res_sum_UL return res_sum,res,res_sum_UL def _eval_res_UL(y_UL,y_model,y_err): y = (y_UL - y_model) / (np.sqrt(2) *y_err) res_ul_chi2 = 0.5 * (1.0 + sp.special.erf(y)) res_ul_chi2[res_ul_chi2 < 1E-300] = 1E-300 res_ul = np.copy(y) res_ul[y > 0] = 0 return np.log(res_ul_chi2), res_ul
[docs] class LSBMinimizerScipy(Minimizer): """SciPy ``least_squares`` backend for JetSeT fits. Notes ----- Uses the trust-region reflective algorithm with parameter bounds and computes an approximate covariance matrix from the returned Jacobian. """ def __init__(self, model): """Create a new `LSBMinimizerScipy` instance. Parameters ---------- model : object Model instance. """ super(LSBMinimizerScipy, self).__init__(model) self.conf_dict=dict( xtol=None, ftol=1E-8, gtol=1E-8, jac='3-point', loss='linear', tr_solver=None, f_scale=1.0) def _fit(self,max_ev): bounds = ([-np.inf if par.fit_range_min is None else par.fit_range_min for par in self.model.fit_par_free], [np.inf if par.fit_range_max is None else par.fit_range_max for par in self.model.fit_par_free]) max_nfev = None if (max_ev == 0 or max_ev is None) else max_ev if self.conf_dict['xtol'] is None and max_nfev is not None: max_nfev=None print('for xtol set to None max_ev condition is disabled') fit = least_squares(self.residuals_Fit, self.model.pinit, args=(self.model.fit_par_free, self.model.data, self.model.fit_model, self.model.loglog, False, True, self.silent), method='trf', bounds=bounds, max_nfev=max_nfev, verbose=0, **self.conf_dict) try: self.covar=np.linalg.inv(2 * np.dot(fit.jac.T, fit.jac)) except Exception as e: warnings.warn('covariance failed') self.covar=None self.chisq=sum(fit.fun * fit.fun) self.mesg = fit.message self.pout = fit.x
[docs] class MinuitMinimizer(Minimizer): """iminuit-based backend for JetSeT fits. Notes ----- Builds a Minuit function from free model parameters, optionally runs a simplex pre-step, then executes ``migrad`` and extracts fit values and covariance information. """ def __init__(self, model, add_simplex=True, conf_dict=dict(tol=0.1)): """Create a new `MinuitMinimizer` instance. Parameters ---------- model : object Model instance. add_simplex : bool, optional If ``True``, run simplex before MIGRAD. conf_dict : object, optional Configuration dictionary for optimizer controls. """ if minuit_installed==True: pass else: raise RuntimeError('iminuit non installed') super(MinuitMinimizer, self).__init__(model) self.conf_dict=conf_dict self.add_simplex=add_simplex def _fit(self,max_ev=None): bounds = [(par.fit_range_min, par.fit_range_max) for par in self.model.fit_par_free] self._set_minuit_func(self.model.pinit, bounds) if max_ev is None or max_ev==0: max_ev =10000 self.minuit_fun.tol=self.conf_dict['tol'] if self.add_simplex is True: #print('====> simplex') _=self.minuit_fun.simplex(ncall=max_ev) #print('====> migrad') self.mesg=self.minuit_fun.simplex(ncall=max_ev).migrad(ncall=max_ev) else: #print('====> migrad') self.mesg=self.minuit_fun.simplex(ncall=max_ev).migrad(ncall=max_ev) if iminuit.__version__ < "2": self.pout=[self.minuit_fun.values[k] for k in self.minuit_fun.values.keys()] self.p=[self.minuit_fun.values[k] for k in self.minuit_fun.values.keys()] else: self.pout=[None]*len(self.minuit_fun.values) self.p = [None] * len(self.minuit_fun.values) for ID,val in enumerate(self.minuit_fun.values): self.pout[ID] = val self.p[ID] = val self.covar=np.array(self.minuit_fun.covariance) if hasattr(self.mesg,'_covariance'): self.mesg._covariance=None def _set_fit_errors(self): if iminuit.__version__ < "2": self.errors = [self.minuit_fun.errors[k] for k in self.minuit_fun.errors] else: self.errors = [None] * len(self.minuit_fun.values) for ID,err in enumerate(self.minuit_fun.errors): self.errors[ID] = err def _set_minuit_func(self, p_init, bounds,p_error=None): if p_error==None: p_error=[0.1]*len(p_init) p_names = ['par_{}'.format(_) for _ in range(len(p_init))] error_names = ['error_par_{}'.format(_) for _ in range(len(p_init))] p_bound_names = ['limit_par_{}'.format(_) for _ in range(len(p_init))] #This dict contains all the kw for #par kwdarg = {} for n, p, bn, b,en,e in zip(p_names, p_init, p_bound_names, bounds,error_names,p_error): kwdarg[n] = p kwdarg[bn] = b kwdarg[en] = e self.minuit_par_name_dict={} self.minuit_bounds_dict={} self.par_dict = {} for ID,par in enumerate(self.model.fit_par_free): self.minuit_par_name_dict[par.name]=p_names[ID] self.minuit_bounds_dict[par.name]=bounds[ID] self.par_dict[par.name]=par if iminuit.__version__ < "2": self.minuit_fun = iminuit.Minuit(fcn=self.chisq_func, name=p_names, pedantic=False, errordef=1, **kwdarg) else: values=tuple(par.val for ID,par in enumerate(self.model.fit_par_free)) self.minuit_fun = iminuit.Minuit(self.chisq_func, values, name=p_names) self.minuit_fun.limits=[ bounds[ID] for ID,par in enumerate(self.model.fit_par_free) ] #print('=> self.minuit_fun.limits', self.minuit_fun.limits) self.minuit_fun.errordef = Minuit.LEAST_SQUARES
[docs] def chisq_func(self, *p): """Chisq func. Parameters ---------- *p : paramters target parameters Returns ------- object Computed value. """ if iminuit.__version__ < "2": self.p = p else: self.p = p[0] return self.residuals_Fit(self.p, self.model.fit_par_free, self.model.data, self.model.fit_model, self.model.loglog, chisq=True, use_UL=self.use_UL, silent=self.silent)
[docs] def minos_errors(self,par=None): """Minos errors. Parameters ---------- par : object, optional Parameter object or parameter name. """ try: if par is not None: par=self.minuit_par_name_dict[par] self.calls = 0 self.minuit_fun.minos(var=par) self.minuit_fun.print_param() self.asymm_errors = [(self.minuit_fun.merrors[(k,1.0)],self.minuit_fun.merrors[(k,-1.0)]) for k in self.minuit_fun.errors.keys()] self.model.reset_to_best_fit() except: raise RuntimeWarning('Fit quality not sufficient to run Minos')
[docs] def profile(self,par,bound=2,subtract_min=True): """Profile. Parameters ---------- par : object Parameter object or parameter name. bound : int, optional Absolute parameter-bound span. subtract_min : bool, optional Minimum value for subtract. Returns ------- object Computed value. """ self._force_silent() try: self.calls = 0 bound=self._set_bounds(par,bound=bound) x, y = self.minuit_fun.profile(self.minuit_par_name_dict[par], bound=bound, subtract_min=subtract_min) self.model.reset_to_best_fit() self._restore_silent_state() return x,y except: self._restore_silent_state() raise RuntimeWarning('Fit quality not sufficient to run profile')
[docs] def mnprofile(self,par,bound=2,subtract_min=True): """ Parameters ---------- par bound subtract_min Returns ------- """ try: self._force_silent() self.calls = 0 bound = self._set_bounds(par, bound=bound) x, y,r = self.minuit_fun.mnprofile(self.minuit_par_name_dict[par], bound=bound, subtract_min=subtract_min) #print('-->r',r) self.model.reset_to_best_fit() self._restore_silent_state() return x,y,r except: self._restore_silent_state() raise RuntimeWarning('Fit quality not sufficient to run mnprofile')
[docs] def draw_mnprofile(self,par,bound=2): """ Parameters ---------- par bound Returns ------- """ try: self._force_silent() self.calls = 0 bound = self._set_bounds(par, bound=bound) x,y,r=self.mnprofile(par,bound,subtract_min=True) fig,ax = self._draw_profile(x, y, par) self.model.reset_to_best_fit() self._restore_silent_state() return x, y, fig,ax except: self._restore_silent_state() raise RuntimeWarning('Fit quality not sufficient to run draw_mnprofile')
[docs] def draw_profile(self,par,bound=2): """Draw profile. Parameters ---------- par : object Parameter object or parameter name. bound : int, optional Absolute parameter-bound span. Returns ------- object Computed value. """ try: self._force_silent() self.calls = 0 bound = self._set_bounds(par, bound=bound) x, y = self.profile(par, subtract_min=True, bound=bound) fig,ax=self._draw_profile(x,y,par) self.model.reset_to_best_fit() self._restore_silent_state() return x,y,fig,ax except: self._restore_silent_state() raise RuntimeWarning('Fit quality not sufficient to run draw_profile')
def _draw_profile(self,x,y,par): self.calls = 0 x = np.array(x) y = np.array(y) fig, ax = plt.subplots() ax.plot(x, y) ax.grid(True) ax.set_xlabel(par) ax.set_ylabel('FCN') minpos = np.argmin(y) ymin = y[minpos] tmpy = y - ymin # now scan for minpos to the right until greater than one up = self.minuit_fun.errordef righty = np.power(tmpy[minpos:] - up, 2) right_min = np.argmin(righty) rightpos = right_min + minpos lefty = np.power((tmpy[:minpos] - up), 2) left_min = np.argmin(lefty) leftpos = left_min le = x[minpos] - x[leftpos] re = x[rightpos] - x[minpos] ax.axvspan(x[leftpos], x[rightpos], facecolor='g', alpha=0.5) plt.figtext(0.5, 0.5, '%s = %7.3e ( -%7.3e , +%7.3e)' % (par, x[minpos], le, re), ha='center') return fig,ax
[docs] def contour(self,par_1,par_2,bound=2,bins=20,subtract_min=True): """Contour. Parameters ---------- par_1 : object First parameter object/name. par_2 : object Second parameter object/name. bound : int, optional Absolute parameter-bound span. bins : int, optional Number of bins used for histograms/grids. subtract_min : bool, optional Minimum value for subtract. Returns ------- object Computed value. """ try: self.calls = 0 if np.shape(bound)==(): bound_1 = self._set_bounds(par_1,bound) bound_2 = self._set_bounds(par_2,bound) elif np.shape(bound)==(2,) or np.shape(bound)==(2,2): bound_1 = self._set_bounds(par_1, bound[0]) bound_2 = self._set_bounds(par_2, bound[1]) else: raise RuntimeError('bound must be scalar or (2,) or (2,2)') bound = [bound_1, bound_2] self._force_silent() x, y, z= self.minuit_fun.contour(self.minuit_par_name_dict[par_1], self.minuit_par_name_dict[par_2], subtract_min=subtract_min, bound=bound, bins=bins) self.model.reset_to_best_fit() self._restore_silent_state() return x,y,z except: self._restore_silent_state() raise RuntimeWarning('Fit quality not sufficient to run contour')
[docs] def draw_contour(self,par_1,par_2,bound=2,levels=np.arange(5)): """Draw contour. Parameters ---------- par_1 : object First parameter object/name. par_2 : object Second parameter object/name. bound : int, optional Absolute parameter-bound span. levels : object, optional Contour levels for corner plots. Returns ------- object Computed value. """ try: self.calls = 0 l=self.minuit_fun.errordef*levels x,y,z=self.contour(par_1,par_2,bound=bound) fig, ax = plt.subplots() CS = ax.contour(x, y, z, levels,) ax.clabel(CS, fontsize=9, inline=1) ax.set_xlabel(par_1) ax.set_ylabel(par_2) self.model.reset_to_best_fit() return x,y,z,fig,ax except: raise RuntimeWarning('Fit quality not sufficient to run draw_contour')
[docs] def mncontour(self,par_1,par_2,bound=2,bins=20,subtract_min=True): """Mncontour. Parameters ---------- par_1 : object First parameter object/name. par_2 : object Second parameter object/name. bound : int, optional Absolute parameter-bound span. bins : int, optional Number of bins used for histograms/grids. subtract_min : bool, optional Minimum value for subtract. Returns ------- object Computed value. """ try: self.calls = 0 if np.shape(bound)==(): bound_1 = self._set_bounds(par_1,bound) bound_2 = self._set_bounds(par_2,bound) elif np.shape(bound)==(2,) or np.shape(bound)==(2,2): bound_1 = self._set_bounds(par_1, bound[0]) bound_2 = self._set_bounds(par_2, bound[1]) else: raise RuntimeError('bound must be scalar or (2,) or (2,2)') bound = [bound_1, bound_2] self._force_silent() x, y, z= self.minuit_fun.mncontour(self.minuit_par_name_dict[par_1], self.minuit_par_name_dict[par_2], subtract_min=subtract_min, bound=bound, bins=bins) self.model.reset_to_best_fit() self._restore_silent_state() return x,y,z except: self._restore_silent_state() raise RuntimeWarning('Fit quality not sufficient to run mcontour')
[docs] def draw_mncontour(self,par_1,par_2,bound=2,levels=np.arange(5)): """Draw mncontour. Parameters ---------- par_1 : object First parameter object/name. par_2 : object Second parameter object/name. bound : int, optional Absolute parameter-bound span. levels : object, optional Contour levels for corner plots. Returns ------- object Computed value. """ try: self.calls = 0 l=self.minuit_fun.errordef*levels x,y,z=self.mncontour(par_1,par_2,bound=bound) fig, ax = plt.subplots() CS = ax.contour(x, y, z, levels,) ax.clabel(CS, fontsize=9, inline=1) ax.set_xlabel(par_1) ax.set_ylabel(par_2) self.model.reset_to_best_fit() return x,y,z,fig,ax except: raise RuntimeWarning('Fit quality not sufficient to run draw_mcontour')
def _set_bounds(self,par,bound): p = self.par_dict[par] if np.shape(bound)==(): bound = [p.best_fit_val - p.best_fit_err * bound, p.best_fit_val + p.best_fit_err * bound] elif np.shape(bound)==(2,): pass else: raise RuntimeError('bound shape',np.shape(bound),'for par',p.name,'is wrong, has to be scalare or (2,)') #print('bound for par', par, ' set to', bound) bound = self._check_bounds(par, bound) #print('bound for par',par,' updated to',bound) return bound def _check_bounds(self,par,bound): if self.minuit_bounds_dict[par][0] is not None: if bound[0]<self.minuit_bounds_dict[par][0]: bound[0]=self.minuit_bounds_dict[par][0] if self.minuit_bounds_dict[par][1] is not None: if bound[1]>self.minuit_bounds_dict[par][1]: bound[1]=self.minuit_bounds_dict[par][1] return bound
[docs] def fit_SED(fit_model, sed_data, nu_fit_start, nu_fit_stop, fitname=None, fit_workplace=None, loglog=False, silent=False, get_conf_int=False, max_ev=0, use_fake_err=False, minimizer='lsb', use_UL=False,repeat=3): """Fit sed. Parameters ---------- fit_model : object Model instance used for fitting. sed_data : object Observational SED data container. nu_fit_start : float Lower bound of the fit range in Hz. nu_fit_stop : float Upper bound of the fit range in Hz. fitname : str, optional Name assigned to the fit run. fit_workplace : path, optional Workplace/output configuration object. loglog : bool, optional If ``True``, operate in log10 space. silent : bool, optional If ``True``, suppress informational output. get_conf_int : bool, optional If ``True``, use confidence-interval initialization strategy. max_ev : int, optional Maximum number of optimizer function evaluations. use_fake_err : bool, optional If ``True``, enable fake err. minimizer : str, optional Minimizer backend name. use_UL : bool, optional If ``True``, enable ul. repeat : int, optional Number of repeated minimization passes. Returns ------- object Computed value. """ mm = ModelMinimizer(minimizer) return mm,mm.fit(fit_model, sed_data, nu_fit_start, nu_fit_stop, fitname=fitname, fit_workplace=fit_workplace, loglog=loglog, silent=silent, get_conf_int=get_conf_int, max_ev=max_ev, use_fake_err=use_fake_err, use_UL=use_UL, repeat=repeat)
def fit_XY(fit_model, data, x_fit_start,x_fit_stop,fitname=None,silent=False,get_conf_int=False,minimizer='minuit', use_UL=False, repeat=1): """Fit xy. Parameters ---------- fit_model : object Model instance used for fitting. data : object Input data table or array. x_fit_start : float Lower boundary of fit range on x-axis. x_fit_stop : float Upper boundary of fit range on x-axis. fitname : object, optional Name assigned to the fit run. silent : bool, optional If ``True``, suppress informational output. get_conf_int : bool, optional If ``True``, use confidence-interval initialization strategy. minimizer : str, optional Minimizer backend name. use_UL : bool, optional If ``True``, enable ul. repeat : int, optional Number of repeated minimization passes. Returns ------- object Computed value. """ mm = ModelMinimizer(minimizer) return mm, mm.fit(fit_model, data, nu_fit_start=x_fit_start, nu_fit_stop=x_fit_stop, fitname=fitname, fit_workplace=None, loglog=False, silent=silent, get_conf_int=get_conf_int, max_ev=0, use_fake_err=False, use_UL=use_UL, repeat=repeat)