Source code for jetset.minimizer


__author__ = "Andrea Tramacere"

from itertools import cycle


import scipy as sp

import numpy as np

import warnings

from scipy.stats import chi2

import dill as pickle

try:
    import iminuit
    from iminuit import Minuit
    minuit_installed=True
except:
    minuit_installed=False


from tqdm.auto import tqdm


from .plot_sedfit import plt,heatmap,annotate_heatmap

from scipy.optimize import least_squares,minimize

from .leastsqbound.leastsqbound import leastsqbound

from .output import section_separator,WorkPlace,makedir
from .utils import JetkerneltException

from .data_loader import ObsData,Data
from .data_loader import lin_to_log





__all__ = ['FitResults','fit_SED','Minimizer','LSBMinimizerScipy', 'MinuitMinimizer', 'ModelMinimizer']



[docs] class FitResults(object): """ Class to store the fit results Parameters Members :ivar fit_par: fit_par :ivar info: info :ivar mesg: mesg :ivar success: success :ivar chisq: chisq :ivar dof: dof :ivar chisq_red: chisq_red :ivar null_hyp_sig: null_hyp_sig :ivar fit_report: ivar get_report() ------- """ def __init__(self, name, mm, parameters, calls, mesg, success, chisq, dof, chisq_red, null_hyp_sig, wd, chisq_no_UL=None, dof_no_UL=None, chisq_red_no_UL=None, null_hyp_sig_no_UL=None): self.name=name self.parameters=parameters self.mm=mm self.calls=calls self.mesg=mesg self.success=success self.chisq=chisq self.dof=dof self.chisq_red=chisq_red self.null_hyp_sig=null_hyp_sig self.chisq_no_UL = chisq_no_UL self.dof_no_UL = dof_no_UL self.chisq_red_no_UL = chisq_red_no_UL self.null_hyp_sig_no_UL = null_hyp_sig_no_UL self.update_report() self.wd=wd
[docs] def update_report(self): # self.model_table=self.parameters._par_table self.parameters._build_best_fit_par_table() self.parameters._build_par_table()
def _show_report(self,): self.update_report() print('-------------------------------------------------------------------------') print("Fit report") print("") print("Model: %s"%self.name) self.parameters.show_pars() print("") print("converged=%s"%self.success) print("calls=%d"%self.calls) print("mesg=") if hasattr(self,'mesg'): try: from IPython.display import display display(self.mesg) except: print(self.mesg) print("dof=%d"%self.dof) print("chisq=%f, chisq/red=%f null hypothesis sig=%f"%(self.chisq,self.chisq_red,self.null_hyp_sig)) if self.dof_no_UL is not None: print("") print("stats without the UL") print("dof UL=%d" % self.dof_no_UL) print( "chisq=%f, chisq/red=%f null hypothesis sig=%f" % (self.chisq_no_UL, self.chisq_red_no_UL, self.null_hyp_sig_no_UL)) print("") print("") print("best fit pars") self.parameters.show_best_fit_pars() print('-------------------------------------------------------------------------') print("") @property def bestfit_table(self): return self.parameters.best_fit_par_table def _update_asymm_errors(self): if hasattr(self.mm): for pi in range(len(self.mm.fit_par_free)): if hasattr(self.mm.minimizer,'asymm_errors'): self.mm.fit_par_free[pi].err_p=self.mm.minimizer.asymm_errors[pi][0] self.mm.fit_par_free[pi].err_m=self.mm.minimizer.asymm_errors[pi][1]
[docs] def show_report(self): try: self._show_report() except: print('problem in formatting text for report')
#except Exception as e: # raise(RuntimeWarning,'problem in formatting text for report',e)
[docs] @classmethod def load_report(cls,file_name): c = pickle.load(open(file_name, "rb")) return c
[docs] def save_report(self,name=None): if name is None: name = 'best_fit_report.pkl' mm =self.mm self.mm=None setattr(self, 'mm', mm) try: pickle.dump(self, open(name, 'wb'), protocol=pickle.HIGHEST_PROTOCOL) except: o = FitResults(name=self.name, mm=None, parameters=self.parameters, calls=self.calls, mesg=None, success=self.success, chisq=self.chisq, dof=self.dof, chisq_red=self.chisq_red, null_hyp_sig=self.null_hyp_sig, wd=self.wd, chisq_no_UL=self.chisq_no_UL, dof_no_UL=self.dof_no_UL, chisq_red_no_UL=self.chisq_red_no_UL, null_hyp_sig_no_UL=self.null_hyp_sig_no_UL) pickle.dump(o, open(name, 'wb'), protocol=pickle.HIGHEST_PROTOCOL)
[docs] class ModelMinimizer(object): def __init__(self,minimizer_type): __accepted__ = ['lsb', 'minuit', 'sherpa'] #if minimizer_type == 'lsb-old': # self.minimizer=LSBMinimizer(self) if minimizer_type=='lsb': self.minimizer=LSBMinimizerScipy(self) elif minimizer_type=='minuit': self.minimizer=MinuitMinimizer(self) elif minimizer_type == 'sherpa': from .sherpa_plugin import SherpaMinimizer self.minimizer = SherpaMinimizer(self) elif minimizer_type not in __accepted__: raise RuntimeError('minimizer ', minimizer_type, 'not accepted, please choose among', __accepted__) else: raise RuntimeError('minimizer factory failed') self.minimizer_type=minimizer_type #print('minimizer',minimizer_type)
[docs] def save_model(self, file_name): _m= self.minimizer self.minimizer=self.minimizer_type pickle.dump(self, open(file_name, 'wb'), protocol=pickle.HIGHEST_PROTOCOL) self.minimizer =_m
[docs] @classmethod def load_model(cls, file_name): try: c = pickle.load(open(file_name, "rb")) if isinstance(c, ModelMinimizer): c.__init__(c.minimizer) return c else: raise RuntimeError('The model you loaded is not valid please check the file name') except Exception as e: raise RuntimeError(e)
def _handle_sed_data(self,sed_data,loglog=False,use_fake_err=False): if sed_data.data['dnuFnu_data'] is None: sed_data.data['dnuFnu_data'] = np.ones(sed_data.data['nu_data'].size) if loglog is False: nu_fit = sed_data.data['nu_data'] nuFnu_fit = sed_data.data['nuFnu_data'] if use_fake_err == False: err_nuFnu_fit = sed_data.data['dnuFnu_data'] else: err_nuFnu_fit = sed_data.data['dnuFnu_fake'] else: nu_fit = sed_data.data['nu_data_log'] nuFnu_fit = sed_data.data['nuFnu_data_log'] err_nuFnu_fit = sed_data.data['dnuFnu_data_log'] if use_fake_err == False: err_nuFnu_fit = sed_data.data['dnuFnu_data_log'] else: err_nuFnu_fit = sed_data.data['dnuFnu_fake_log'] UL = sed_data.data['UL'] data = np.zeros(nu_fit.size, dtype=[('x', nu_fit.dtype), ('y', nuFnu_fit.dtype), ('dy', err_nuFnu_fit.dtype), ('UL', UL.dtype)]) data['x'] = nu_fit data['y'] = nuFnu_fit data['dy'] = err_nuFnu_fit data['UL'] = UL return data def _handle_data_xy(self, data, loglog=False, use_fake_err=False): #if data['dy'] is None: # data['dy'] = np.ones(data['x'].size) if loglog is False: x = data.table['x'] y = data.table['y'] if use_fake_err == False: dy = data.table['dy'] else: dy = data.table['dy_fake'] else: x = np.log10(data.table['x']) y, dy = lin_to_log(val=data.table['y'], err=data.table['dy']) if use_fake_err == False: pass else: y, dy = lin_to_log(val=data.table['y'], err=data.table['dy_fake']) UL = data.table['UL'] _data = np.zeros(x.size, dtype=[('x', x.dtype), ('y', y.dtype), ('dy', dy.dtype), ('UL', UL.dtype)]) _data['x'] = x _data['y'] = y _data['dy'] = dy _data['UL'] = UL return _data def _prepare_fit(self, fit_model, data, nu_fit_start, nu_fit_stop, fitname=None, fit_workplace=None, loglog=False, silent=False, get_conf_int=False, use_fake_err=False, use_UL=False): #print('--> DEBUG WorkPlace.flag',WorkPlace.flag,fit_workplace) if fitname is None: fitname = fit_model.name #print('--> DEBUG ') if fit_workplace is None: fit_workplace = WorkPlace() out_dir = fit_workplace.out_dir + '/' + fitname + '/' else: out_dir = fit_workplace.out_dir + '/' + fitname + '/' makedir(out_dir) for model in fit_model.components.components_list: if model.model_type == 'jet': model.set_path(out_dir) if isinstance(data,ObsData): self.data=self._handle_sed_data(data,use_fake_err=use_fake_err,loglog=loglog) elif isinstance(data,Data): self.data= self._handle_data_xy(data,use_fake_err=use_fake_err,loglog=loglog) else: raise RuntimeError('data type must be Obs Data or Data') #if sed_data.data['dnuFnu_data'] is None: # sed_data.data['dnuFnu_data'] = np.ones(sed_data.data['nu_data'].size) # filter data points if loglog is True: nu_fit_start = np.log10(nu_fit_start) nu_fit_stop = np.log10(nu_fit_stop) msk1 = self.data['x'] > nu_fit_start msk2 = self.data['x'] < nu_fit_stop msk_zero_error = self.data['dy'] > 0.0 self.use_UL=use_UL if use_UL == True: msk = msk1 * msk2 * msk_zero_error else: msk = msk1 * msk2 * msk_zero_error*~self.data['UL'] self.data = self.data[msk] if silent == False: print("filtering data in fit range = [%e,%e]" % (nu_fit_start, nu_fit_stop)) print("data length", self.data.size) for par in fit_model.parameters.par_array: if get_conf_int == True: par.set_fit_initial_value(par.best_fit_val) else: par.set_fit_initial_value(par.val) fit_par_free = [par for par in fit_model.parameters.par_array if par.frozen is False and (par.linked is False ) and par._is_dependent is False] #remove duplicates and keeps the order fit_par_free = sorted(set(fit_par_free), key=fit_par_free.index) pinit = [par.get_fit_initial_value() for par in fit_par_free] # bounds free_pars = 0 for pi in range(len(fit_model.parameters.par_array)): if fit_model.parameters.par_array[pi].frozen == False: free_pars += 1 if silent == False: print(section_separator) print("*** start fit process ***") #print("initial pars: ") #fit_model.parameters.show_pars() print("----- ") self.out_dir=out_dir self.pinit=pinit self.pout=None self.free_pars=free_pars self.fit_par_free=fit_par_free self.fit_model=fit_model self.loglog=loglog if hasattr(self.fit_model,'nu_min_fit'): self.fit_model.nu_min_fit = nu_fit_start self.fit_model.nu_max_fit = nu_fit_stop if len(fit_par_free)>self.data['x'].size and isinstance(self.minimizer,LSBMinimizerScipy): m='number of data points: %d is lower than number of free pars: %d'%(self.data['x'], len(fit_par_free)) raise JetkerneltException(message=m)
[docs] def fit(self, fit_model, sed_data, nu_fit_start, nu_fit_stop, fitname=None, fit_workplace=None, loglog=False, silent=False, get_conf_int=False, max_ev=0, use_fake_err=False, use_UL=False, skip_minimizer=False, repeat=1): self.silent=silent self._prepare_fit( fit_model, sed_data, nu_fit_start, nu_fit_stop, fitname=fitname, fit_workplace=fit_workplace, loglog=loglog, silent=silent, get_conf_int=get_conf_int, use_fake_err=use_fake_err,use_UL=use_UL) fit_model.set_nu_grid(nu_min=nu_fit_start*0.5, nu_max=nu_fit_stop*1.5) for i in range(repeat): if skip_minimizer == False: if repeat>1: if silent is False: print('fit run:',i) if i>0: self.pinit = [v for v in self.minimizer.pout] if silent is False: print('- old chisq=%5.5e' % (self.minimizer.chisq)) old_chisq = self.minimizer.chisq self.minimizer.fit(self,max_ev=max_ev,silent=silent,use_UL=use_UL) self.pout = self.minimizer.pout self.errors = self.minimizer.errors if hasattr(self.minimizer, 'asymm_errors'): self.asymm_errors = self.minimizer.asymm_errors self.covar=self.minimizer.covar self.corr=self.minimizer.corr self.reset_to_best_fit() self.minimizer._fit_stats() if silent is False: print('- best chisq=%5.5e'%(self.minimizer.chisq)) print() new_chisq = self.minimizer.chisq if i==0: old_chisq = new_chisq else: pass return self.get_fit_results(fit_model,nu_fit_start,nu_fit_stop,fitname,loglog=loglog,silent=silent)
[docs] def get_fit_results(self, fit_model, nu_fit_start, nu_fit_stop, fitname, silent=False, loglog=False): self.reset_to_best_fit() self.minimizer._fit_stats() best_fit = FitResults(fitname, self, fit_model.parameters, self.minimizer.calls, self.minimizer.mesg, self.minimizer.success, self.minimizer.chisq, self.minimizer.dof, self.minimizer.chisq_red, self.minimizer.null_hyp_sig, self.out_dir, chisq_no_UL=self.minimizer.chisq_no_UL, dof_no_UL=self.minimizer.dof_no_UL, chisq_red_no_UL=self.minimizer.chisq_red_no_UL, null_hyp_sig_no_UL=self.minimizer.null_hyp_sig_no_UL) if silent == False: best_fit.show_report() fit_model.set_nu_grid(nu_min=nu_fit_start , nu_max=nu_fit_stop) fit_model.eval(fill_SED=True, loglog=loglog, phys_output=True) res_bestfit = self.minimizer.residuals_Fit(self.minimizer.pout, self.fit_par_free, self.data, self.fit_model, self.loglog, use_UL=self.use_UL) if loglog == True: fit_model.SED.fill(nu_residuals=np.power(10, self.data['x']), residuals=res_bestfit) else: fit_model.SED.fill(nu_residuals=self.data['x'], residuals=res_bestfit) if silent == False: print(section_separator) fit_model.eval(fill_SED=True) if self.minimizer._post_fit_warnings!='': print('there are fit warnings messages, use the .show_fit_warnings() or access the member .minimizer._post_fit_warnings') return best_fit
[docs] def show_fit_warnings(self): print(self.minimizer._post_fit_warnings)
[docs] def reset_to_best_fit(self): for pi in range(len(self.fit_par_free)): self.fit_par_free[pi].set(val=self.pout[pi]) self.fit_par_free[pi].best_fit_val = self.pout[pi] self.fit_par_free[pi].best_fit_err = self.errors[pi] if hasattr(self,'asymm_errors'): self.fit_par_free[pi].err_p=self.asymm_errors[pi][0] self.fit_par_free[pi].err_m=self.asymm_errors[pi][1] self.fit_model.eval()
[docs] def plot_corr_matrix(self): fig, ax = plt.subplots() labels=[p.name for p in self.fit_par_free] im, cbar = heatmap(self.corr, labels, labels, ax=ax, cmap="coolwarm", cbarlabel="corr") texts = annotate_heatmap(im, valfmt="{x:.2f}") fig.tight_layout() return fig
[docs] class Minimizer(object): def __init__(self,model): self.model=model self._progress_iter = cycle(['|', '/', '-', '\\']) self._post_fit_warnings='' self.pbar=None self.covar=None #self._progess_bar(pbar, _res_sum, _res_sum_UL)
[docs] def fit(self,model, max_ev=None, use_UL=False, silent=False): if silent is False: self.pbar = tqdm(total=None) self.use_UL = use_UL self.calls=0 self.res_check=None self.model=model self.silent=silent self._original_silent_state = silent self._fit(max_ev) self._fit_stats() self._set_fit_errors()
def _force_silent(self): self.silent=True def _restore_silent_state(self): self.silent = self._original_silent_state def _fit_stats(self): self.dof = len(self.model.data['x']) - self.model.free_pars self.chisq = self.get_chisq() self.chisq_red = self.chisq / float(self.dof) self.null_hyp_sig = 1.0 - chi2.cdf(self.chisq, self.dof) if self.use_UL==True: self.dof_no_UL = len(self.model.data['x']) - self.model.free_pars - self.model.data['UL'].sum() self.use_UL=False self.chisq_no_UL = self.get_chisq() self.use_UL = True self.chisq_red_no_UL = self.chisq_no_UL / float(self.dof_no_UL) self.null_hyp_sig_no_UL=1.0 - chi2.cdf(self.chisq_no_UL, self.dof_no_UL) else: self.dof_no_UL = None self.chisq_red_no_UL = None self.chisq_no_UL=None self.null_hyp_sig_no_UL=None self.success = True self.status = 1 def _set_fit_errors(self): if self.covar is None: print("!Warning, no covariance matrix produced") self.errors = np.zeros(len(self.pout)) else: self.errors = [np.sqrt(np.fabs(self.covar[pi, pi]) * self.chisq_red) for pi in range(len(self.model.fit_par_free))] def _progess_bar(self, _res_sum, res_sum_UL): if (np.mod(self.calls, 10) == 0 and self.calls != 0) : m="minim. function calls=%d, chisq=%5.5e UL part=%f" %(self.calls, _res_sum, res_sum_UL) self.pbar.n=self.calls self.pbar.set_description(m) self.pbar.update(1)
[docs] def residuals_Fit(self, p, fit_par, data, best_fit_SEDModel, loglog, chisq=False, use_UL=False, silent=False): #_warn=False for pi in range(len(fit_par)): #_old_v=fit_par[pi].val fit_par[pi].set(val=p[pi]) if np.isnan(p[pi]): _warn=True s1 = str('warning nan for par %s' % fit_par[pi].name) s2 = ', old parameter value was %s' % str(fit_par[pi].val) s =s1 + s2 warnings.warn(s) self._post_fit_warnings+=s+'\n' model = best_fit_SEDModel.eval(nu=data['x'], fill_SED=False, get_model=True, loglog=loglog) _res_sum, _res, _res_sum_UL=_eval_res(data['y'], model, data['dy'], data['UL'], use_UL=use_UL) self._res_sum_chekc=_res_sum self._res_chekc = _res self._res_UL_chekc = _res_sum_UL self.calls +=1 if silent==False: #with tqdm(total=None, file=sys.stdout) as pbar: if self.pbar is not None: self._progess_bar(_res_sum, _res_sum_UL) #print("\r", end="") if chisq==True: res=_res_sum else: res=_res return res
[docs] def get_chisq(self): model = self.model.fit_model.eval(nu=self.model.data['x'], fill_SED=False, get_model=True, loglog=self.model.loglog) _res_sum, _res, _res_sum_UL = _eval_res(self.model.data['y'], model, self.model.data['dy'], self.model.data['UL'], use_UL=self.use_UL) return _res_sum
@property def corr(self): if self.covar is not None: v = np.sqrt(np.diag(self.covar)) outer_v = np.outer(v, v) correlation = self.covar / outer_v correlation[self.covar == 0] = 0 return correlation else: return None
def _eval_res(data, model, data_error, UL, use_UL=False): res_no_UL = (data[~UL] - model[~UL]) / (data_error[~UL]) res = (data - model) / (data_error) res_UL_log = [0] #print('UL.sum() ',UL.sum(),use_UL) if UL.sum() > 0 and use_UL is True: res_UL_log, res_UL = _eval_res_UL(data[UL], model[UL], data_error[UL]) #print('ress_UL', res_UL) res[UL]=res_UL res_sum_UL= -2.0*np.sum(res_UL_log) res_sum=np.sum(res_no_UL * res_no_UL) + res_sum_UL return res_sum,res,res_sum_UL def _eval_res_UL(y_UL,y_model,y_err): y = (y_UL - y_model) / (np.sqrt(2) *y_err) res_ul_chi2 = 0.5 * (1.0 + sp.special.erf(y)) res_ul_chi2[res_ul_chi2 < 1E-300] = 1E-300 res_ul = np.copy(y) res_ul[y > 0] = 0 return np.log(res_ul_chi2), res_ul
[docs] class LSBMinimizerScipy(Minimizer): def __init__(self, model): super(LSBMinimizerScipy, self).__init__(model) self.conf_dict=dict( xtol=None, ftol=1E-8, gtol=1E-8, jac='3-point', loss='linear', tr_solver=None, f_scale=1.0) def _fit(self,max_ev): bounds = ([-np.inf if par.fit_range_min is None else par.fit_range_min for par in self.model.fit_par_free], [np.inf if par.fit_range_max is None else par.fit_range_max for par in self.model.fit_par_free]) max_nfev = None if (max_ev == 0 or max_ev is None) else max_ev if self.conf_dict['xtol'] is None and max_nfev is not None: max_nfev=None print('for xtol set to None max_ev condition is disabled') fit = least_squares(self.residuals_Fit, self.model.pinit, args=(self.model.fit_par_free, self.model.data, self.model.fit_model, self.model.loglog, False, True, self.silent), method='trf', bounds=bounds, max_nfev=max_nfev, verbose=0, **self.conf_dict) try: self.covar=np.linalg.inv(2 * np.dot(fit.jac.T, fit.jac)) except Exception as e: warnings.warn('covariance failed') self.covar=None self.chisq=sum(fit.fun * fit.fun) self.mesg = fit.message self.pout = fit.x
[docs] class MinuitMinimizer(Minimizer): def __init__(self, model, add_simplex=True, conf_dict=dict(tol=0.1)): if minuit_installed==True: pass else: raise RuntimeError('iminuit non installed') super(MinuitMinimizer, self).__init__(model) self.conf_dict=conf_dict self.add_simplex=add_simplex def _fit(self,max_ev=None): bounds = [(par.fit_range_min, par.fit_range_max) for par in self.model.fit_par_free] self._set_minuit_func(self.model.pinit, bounds) if max_ev is None or max_ev==0: max_ev =10000 self.minuit_fun.tol=self.conf_dict['tol'] if self.add_simplex is True: print('====> simplex') _=self.minuit_fun.simplex(ncall=max_ev) print('====> migrad') self.mesg=self.minuit_fun.simplex(ncall=max_ev).migrad(ncall=max_ev) else: print('====> migrad') self.mesg=self.minuit_fun.simplex(ncall=max_ev).migrad(ncall=max_ev) if iminuit.__version__ < "2": self.pout=[self.minuit_fun.values[k] for k in self.minuit_fun.values.keys()] self.p=[self.minuit_fun.values[k] for k in self.minuit_fun.values.keys()] else: self.pout=[None]*len(self.minuit_fun.values) self.p = [None] * len(self.minuit_fun.values) for ID,val in enumerate(self.minuit_fun.values): self.pout[ID] = val self.p[ID] = val self.covar=np.array(self.minuit_fun.covariance) if hasattr(self.mesg,'_covariance'): self.mesg._covariance=None def _set_fit_errors(self): if iminuit.__version__ < "2": self.errors = [self.minuit_fun.errors[k] for k in self.minuit_fun.errors] else: self.errors = [None] * len(self.minuit_fun.values) for ID,err in enumerate(self.minuit_fun.errors): self.errors[ID] = err def _set_minuit_func(self, p_init, bounds,p_error=None): if p_error==None: p_error=[0.1]*len(p_init) p_names = ['par_{}'.format(_) for _ in range(len(p_init))] error_names = ['error_par_{}'.format(_) for _ in range(len(p_init))] p_bound_names = ['limit_par_{}'.format(_) for _ in range(len(p_init))] #This dict contains all the kw for #par kwdarg = {} for n, p, bn, b,en,e in zip(p_names, p_init, p_bound_names, bounds,error_names,p_error): kwdarg[n] = p kwdarg[bn] = b kwdarg[en] = e self.minuit_par_name_dict={} self.minuit_bounds_dict={} self.par_dict = {} for ID,par in enumerate(self.model.fit_par_free): self.minuit_par_name_dict[par.name]=p_names[ID] self.minuit_bounds_dict[par.name]=bounds[ID] self.par_dict[par.name]=par if iminuit.__version__ < "2": self.minuit_fun = iminuit.Minuit(fcn=self.chisq_func, name=p_names, pedantic=False, errordef=1, **kwdarg) else: values=tuple(par.val for ID,par in enumerate(self.model.fit_par_free)) self.minuit_fun = iminuit.Minuit(self.chisq_func, values, name=p_names) self.minuit_fun.limits=[ bounds[ID] for ID,par in enumerate(self.model.fit_par_free) ] #print('=> self.minuit_fun.limits', self.minuit_fun.limits) self.minuit_fun.errordef = Minuit.LEAST_SQUARES
[docs] def chisq_func(self, *p): if iminuit.__version__ < "2": self.p = p else: self.p = p[0] return self.residuals_Fit(self.p, self.model.fit_par_free, self.model.data, self.model.fit_model, self.model.loglog, chisq=True, use_UL=self.use_UL, silent=self.silent)
[docs] def minos_errors(self,par=None): try: if par is not None: par=self.minuit_par_name_dict[par] self.calls = 0 self.minuit_fun.minos(var=par) self.minuit_fun.print_param() self.asymm_errors = [(self.minuit_fun.merrors[(k,1.0)],self.minuit_fun.merrors[(k,-1.0)]) for k in self.minuit_fun.errors.keys()] self.model.reset_to_best_fit() except: raise RuntimeWarning('Fit quality not sufficient to run Minos')
[docs] def profile(self,par,bound=2,subtract_min=True): self._force_silent() try: self.calls = 0 bound=self._set_bounds(par,bound=bound) x, y = self.minuit_fun.profile(self.minuit_par_name_dict[par], bound=bound, subtract_min=subtract_min) self.model.reset_to_best_fit() self._restore_silent_state() return x,y except: self._restore_silent_state() raise RuntimeWarning('Fit quality not sufficient to run profile')
[docs] def mnprofile(self,par,bound=2,subtract_min=True): """ Parameters ---------- par bound subtract_min Returns ------- """ try: self._force_silent() self.calls = 0 bound = self._set_bounds(par, bound=bound) x, y,r = self.minuit_fun.mnprofile(self.minuit_par_name_dict[par], bound=bound, subtract_min=subtract_min) #print('-->r',r) self.model.reset_to_best_fit() self._restore_silent_state() return x,y,r except: self._restore_silent_state() raise RuntimeWarning('Fit quality not sufficient to run mnprofile')
[docs] def draw_mnprofile(self,par,bound=2): """ Parameters ---------- par bound Returns ------- """ try: self._force_silent() self.calls = 0 bound = self._set_bounds(par, bound=bound) x,y,r=self.mnprofile(par,bound,subtract_min=True) fig,ax = self._draw_profile(x, y, par) self.model.reset_to_best_fit() self._restore_silent_state() return x, y, fig,ax except: self._restore_silent_state() raise RuntimeWarning('Fit quality not sufficient to run draw_mnprofile')
[docs] def draw_profile(self,par,bound=2): try: self._force_silent() self.calls = 0 bound = self._set_bounds(par, bound=bound) x, y = self.profile(par, subtract_min=True, bound=bound) fig,ax=self._draw_profile(x,y,par) self.model.reset_to_best_fit() self._restore_silent_state() return x,y,fig,ax except: self._restore_silent_state() raise RuntimeWarning('Fit quality not sufficient to run draw_profile')
def _draw_profile(self,x,y,par): self.calls = 0 x = np.array(x) y = np.array(y) fig, ax = plt.subplots() ax.plot(x, y) ax.grid(True) ax.set_xlabel(par) ax.set_ylabel('FCN') minpos = np.argmin(y) ymin = y[minpos] tmpy = y - ymin # now scan for minpos to the right until greater than one up = self.minuit_fun.errordef righty = np.power(tmpy[minpos:] - up, 2) right_min = np.argmin(righty) rightpos = right_min + minpos lefty = np.power((tmpy[:minpos] - up), 2) left_min = np.argmin(lefty) leftpos = left_min le = x[minpos] - x[leftpos] re = x[rightpos] - x[minpos] ax.axvspan(x[leftpos], x[rightpos], facecolor='g', alpha=0.5) plt.figtext(0.5, 0.5, '%s = %7.3e ( -%7.3e , +%7.3e)' % (par, x[minpos], le, re), ha='center') return fig,ax
[docs] def contour(self,par_1,par_2,bound=2,bins=20,subtract_min=True): try: self.calls = 0 if np.shape(bound)==(): bound_1 = self._set_bounds(par_1,bound) bound_2 = self._set_bounds(par_2,bound) elif np.shape(bound)==(2,) or np.shape(bound)==(2,2): bound_1 = self._set_bounds(par_1, bound[0]) bound_2 = self._set_bounds(par_2, bound[1]) else: raise RuntimeError('bound must be scalar or (2,) or (2,2)') bound = [bound_1, bound_2] self._force_silent() x, y, z= self.minuit_fun.contour(self.minuit_par_name_dict[par_1], self.minuit_par_name_dict[par_2], subtract_min=subtract_min, bound=bound, bins=bins) self.model.reset_to_best_fit() self._restore_silent_state() return x,y,z except: self._restore_silent_state() raise RuntimeWarning('Fit quality not sufficient to run contour')
[docs] def draw_contour(self,par_1,par_2,bound=2,levels=np.arange(5)): try: self.calls = 0 l=self.minuit_fun.errordef*levels x,y,z=self.contour(par_1,par_2,bound=bound) fig, ax = plt.subplots() CS = ax.contour(x, y, z, levels,) ax.clabel(CS, fontsize=9, inline=1) ax.set_xlabel(par_1) ax.set_ylabel(par_2) self.model.reset_to_best_fit() return x,y,z,fig,ax except: raise RuntimeWarning('Fit quality not sufficient to run draw_contour')
[docs] def mncontour(self,par_1,par_2,bound=2,bins=20,subtract_min=True): try: self.calls = 0 if np.shape(bound)==(): bound_1 = self._set_bounds(par_1,bound) bound_2 = self._set_bounds(par_2,bound) elif np.shape(bound)==(2,) or np.shape(bound)==(2,2): bound_1 = self._set_bounds(par_1, bound[0]) bound_2 = self._set_bounds(par_2, bound[1]) else: raise RuntimeError('bound must be scalar or (2,) or (2,2)') bound = [bound_1, bound_2] self._force_silent() x, y, z= self.minuit_fun.mncontour(self.minuit_par_name_dict[par_1], self.minuit_par_name_dict[par_2], subtract_min=subtract_min, bound=bound, bins=bins) self.model.reset_to_best_fit() self._restore_silent_state() return x,y,z except: self._restore_silent_state() raise RuntimeWarning('Fit quality not sufficient to run mcontour')
[docs] def draw_mncontour(self,par_1,par_2,bound=2,levels=np.arange(5)): try: self.calls = 0 l=self.minuit_fun.errordef*levels x,y,z=self.mncontour(par_1,par_2,bound=bound) fig, ax = plt.subplots() CS = ax.contour(x, y, z, levels,) ax.clabel(CS, fontsize=9, inline=1) ax.set_xlabel(par_1) ax.set_ylabel(par_2) self.model.reset_to_best_fit() return x,y,z,fig,ax except: raise RuntimeWarning('Fit quality not sufficient to run draw_mcontour')
def _set_bounds(self,par,bound): p = self.par_dict[par] if np.shape(bound)==(): bound = [p.best_fit_val - p.best_fit_err * bound, p.best_fit_val + p.best_fit_err * bound] elif np.shape(bound)==(2,): pass else: raise RuntimeError('bound shape',np.shape(bound),'for par',p.name,'is wrong, has to be scalare or (2,)') #print('bound for par', par, ' set to', bound) bound = self._check_bounds(par, bound) #print('bound for par',par,' updated to',bound) return bound def _check_bounds(self,par,bound): if self.minuit_bounds_dict[par][0] is not None: if bound[0]<self.minuit_bounds_dict[par][0]: bound[0]=self.minuit_bounds_dict[par][0] if self.minuit_bounds_dict[par][1] is not None: if bound[1]>self.minuit_bounds_dict[par][1]: bound[1]=self.minuit_bounds_dict[par][1] return bound
[docs] def fit_SED(fit_model, sed_data, nu_fit_start, nu_fit_stop, fitname=None, fit_workplace=None, loglog=False, silent=False, get_conf_int=False, max_ev=0, use_fake_err=False, minimizer='lsb', use_UL=False,repeat=3): mm = ModelMinimizer(minimizer) return mm,mm.fit(fit_model, sed_data, nu_fit_start, nu_fit_stop, fitname=fitname, fit_workplace=fit_workplace, loglog=loglog, silent=silent, get_conf_int=get_conf_int, max_ev=max_ev, use_fake_err=use_fake_err, use_UL=use_UL, repeat=repeat)
def fit_XY(fit_model, data, x_fit_start,x_fit_stop,fitname=None,silent=False,get_conf_int=False,minimizer='minuit', use_UL=False, repeat=1): mm = ModelMinimizer(minimizer) return mm, mm.fit(fit_model, data, nu_fit_start=x_fit_start, nu_fit_stop=x_fit_stop, fitname=fitname, fit_workplace=None, loglog=False, silent=silent, get_conf_int=get_conf_int, max_ev=0, use_fake_err=False, use_UL=use_UL, repeat=repeat)