Source code for jetset.jet_emitters

"""Emitter distribution classes and helpers used by jet models."""

__author__ = "Andrea Tramacere"

import os
import numpy as np
from inspect import signature
from numba import njit
from numba.extending import is_jitted
import copy
import warnings
from astropy.constants import m_e,m_p,c
from scipy import interpolate
from .jetkernel_models_dic import gamma_dic_e, gamma_dic_p, gamma_dic_pp_e_second, gamma_dic_e_equilibrium, available_emitters_type
from .plot_sedfit import PlotPdistr
from .jet_paramters import *
from .utils import set_str_attr, get_nested_attr, set_nested_attr
from .model_parameters import ModelParameterArray, ModelParameter
from .jet_kernel_tools import get_emitters_c_array1d_fast as get_emitters
from .jet_kernel_tools import set_emitters_c_array1d_fast as set_emitters

on_rtd = os.environ.get('READTHEDOCS', None) == 'True'
if on_rtd is True:
    from .mock import jetkernel as BlazarSED
else:
    from .jetkernel import jetkernel as BlazarSED

__all__ = ['EmittersDistribution',
           'BaseEmittersDistribution',
           'ArrayDistribution',
           'EmittersArrayDistribution',
           'InjEmittersDistribution',
           'InjEmittersArrayDistribution']


[docs] class ArrayDistribution(object): """Pair of emitter-energy and density arrays used for tabulated spectra. Notes ----- Validates input array sizes and stores the grid size used when expanding tabulated distributions onto model evaluation grids. """ def __init__(self, e_array, n_array, gamma_grid_size=None): """Create a new `ArrayDistribution` instance. Parameters ---------- e_array : object Energy/gamma array for tabulated input. n_array : object Distribution values associated with ``e_array``. gamma_grid_size : object, optional Number of points in the gamma grid. """ self.e_array = e_array self.n_array = n_array _size = e_array.size if n_array.size != _size: raise RuntimeError('e_array and n_array must have same size') self.size = _size if gamma_grid_size is None: self.gamma_grid_size = _size * 2 + 1
def check_par_name(method): """Check par name. Parameters ---------- method : object Callable/function to decorate or evaluate. Returns ------- object Computed value. """ def inner(ref, *args, **kwargs): #print(ref,args,kwargs) """Inner. Parameters ---------- ref : object Parameter controlling ref. *args : tuple Additional positional arguments. **kwargs : dict Additional keyword arguments. Returns ------- object Computed result. """ if args[0] in ref._skip: raise RuntimeError('par name',args[0],'can not be in proteced names',ref._skip) else: return method(ref,*args, **kwargs) return inner
[docs] class BaseEmittersDistribution(object): """Base implementation for emitter distribution parameterizations. Notes ----- Defines common parameter handling, gamma-grid generation, normalization, numerical moment evaluation, and plotting helpers shared by electron and proton distribution subclasses. """ def __init__(self, name, spectral_type, gamma_grid_size=200, log_values=False, emitters_type='electrons', skip_build=False, normalize=False): """Create a new `BaseEmittersDistribution` instance. Parameters ---------- name : object Name identifier. spectral_type : object Spectral-shape identifier. gamma_grid_size : int, optional Number of points in the gamma grid. log_values : bool, optional If ``True``, parameters are represented in log10 scale. emitters_type : str, optional Emitter population type. skip_build : bool, optional If ``True``, skip build. normalize : bool, optional If ``True``, normalize the distribution before scaling. """ self._spectral_type = None self._allowed_spectral_types = ['bkn', 'plc', 'lp', 'lppl', 'pl', 'lpep', 'array','user_defined'] self._set_emitters_type(emitters_type) self._set_spectral_type(spectral_type) self._Norm = 1.0 self._skip = ['gmin', 'gmax', 'N', 'NH_pp', 'Q'] if skip_build is False: self._build( name, log_values, gamma_grid_size, normalize) def _build(self, name, log_values, gamma_grid_size, normalize): self._user_defined = True self._name = name self._log_values = log_values self._gamma_grid_size = gamma_grid_size self.parameters = ModelParameterArray() self._set_base_pars() self.normalize = normalize self._reset_equilibrium_state() def _reset_equilibrium_state(self): # Shared state used by plotting/readback when cooling-equilibrium overlays are available. self.gamma_cooling_eq = None self.gamma_cooling_eq_second = None self._primaries_done = False self._secondaries_done = False def _set_spectral_type(self,spectral_type): if spectral_type not in self._allowed_spectral_types: raise RuntimeError('spectral_type=',spectral_type,' not in allowed',self._allowed_spectral_types) else: self._spectral_type = spectral_type
[docs] @staticmethod def spectral_types_obs_constrain(): """Spectral types obs constrain. Returns ------- object Computed value. """ return ['bkn', 'plc', 'lp', 'lppl', 'pl', 'lpep', 'array']
@property def spectral_type(self): """Spectral type. Returns ------- object Requested value. """ return self._spectral_type
[docs] @check_par_name def add_par(self, name, par_type, val, vmax, vmin, unit='', log=False, frozen=False): """Add par. Parameters ---------- name : object Name identifier. par_type : object Parameter type/category label. val : object Value to assign. vmax : object Maximum allowed value. vmin : object Minimum allowed value. unit : str, optional Physical unit string. log : bool, optional If ``True``, treat the value in log10 units. frozen : bool, optional If ``True``, keep parameter fixed during fitting. """ self.parameters.add_par(ModelParameter(name=name, par_type=par_type, val=val, val_min=vmin, val_max=vmax, units=unit, frozen=frozen, log=log,))
[docs] def set_distr_func(self,distr_func): """Set distr func. Parameters ---------- distr_func : object Distribution function used to evaluate emitters. """ if distr_func is not None: self._validate_func(distr_func) self.distr_func=distr_func
def _validate_func(self,distr_func): s=signature(distr_func) for param in s.parameters.values(): if self.parameters.get_par_by_name(param.name) is None and param.name!='gamma': raise RuntimeError('distr_func parameter',param.name,'not among parameters names',self.parameters.names) if 'gamma' not in s.parameters.keys(): raise RuntimeError('gamma not present in distr_func signature') def _set_emitters_type(self,emitters_type): self.available_emitters_type = available_emitters_type self._check_emitters_type(emitters_type) self.emitters_type = emitters_type def _set_base_pars(self): #model_dic = {} a_h, b_h = self.set_bounds(1, 1E15, log_val=self._log_values) a_l, b_l = self.set_bounds(1, 1E9, log_val=self._log_values) #a_t, b_t = self.set_bounds(1, 1E9, log_val=self._log_values) self.parameters.add_par( ModelParameter(name='gmin', par_type='low-energy-cut-off',val=2, val_min=a_l, val_max=b_l, units='lorentz-factor', log=self._log_values)) self.parameters.add_par( ModelParameter(name='gmax', par_type='high-energy-cut-off',val=1E5, val_min=a_h, val_max=b_h, units='lorentz-factor', log=self._log_values)) self.parameters.add_par( ModelParameter(name='N', par_type='emitters_density', val=1, val_min=0, val_max=None, units='cm^-3'))
[docs] def set_bounds(self,a,b,log_val=False): """Set bounds. Parameters ---------- a : object First shape/control parameter. b : object Second shape/control parameter. log_val : bool, optional If ``True``, interpret bounds/values in log10 scale. Returns ------- object Computed value. """ if log_val == False: return [a,b] else: return np.log10([a,b])
def _set_log_val(self,a,log_val=False): if log_val == False: return a else: return np.log10(a) def _eval_func(self,gamma): p_dict = {} for par in self.parameters.par_array: if par.name not in self._skip: p_dict[par.name] = par.val_lin p_dict['gamma']=gamma return self.distr_func(**p_dict) @property def name(self): """Name. Returns ------- object Requested value. """ return self._name def _check_emitters_type(self, emitters_type): if emitters_type not in available_emitters_type: raise RuntimeError("type of emitters :%s, not allowed" % emitters_type, "please choose among: ", self.available_emitters_type) else: pass
[docs] def update(self): """Refresh distribution arrays from current parameter values. Notes ----- Recomputes internal grids and distribution values via ``_fill``. """ self._fill()
[docs] def set_grid(self): """Build logarithmic gamma grid from ``gmin`` and ``gmax``. Notes ----- Grid size is controlled by ``self._gamma_grid_size``. """ gmin = self.parameters.get_par_by_name('gmin').val_lin gmax = self.parameters.get_par_by_name('gmax').val_lin self._gamma_grid = np.logspace(np.log10(gmin), np.log10(gmax), self._gamma_grid_size)
[docs] def eval_N(self): #NOTE: commented until is fixed the setting to zero of Ne_jetset for protons #self.update() """Evaluate n. Returns ------- object Computed value. """ if self.emitters_type=='electrons': return np.trapezoid(self.n_gamma_e,self.gamma_e) elif self.emitters_type=='protons': return np.trapezoid(self.n_gamma_p, self.gamma_p) #NOTE: to be added for leptonic-equilibrium #elif self.emitters_type=='electrons-equilibrium': # return np.trapezoid(self.n_gamma_e,self.gamma_e) else: raise RuntimeError('emitters type',self.emitters_type, 'not valid')
[docs] def eval_U(self, gmin=None, gmax=None): #NOTE: commented until is fixed the setting to zero of Ne_jetset for protons #self.update() """Evaluate u. Parameters ---------- gmin : object, optional Lower gamma bound. gmax : object, optional Upper gamma bound. Returns ------- object Computed value. """ if self.emitters_type == 'electrons':# or self.emitters_type == 'electrons-equilibrium': x=self.gamma_e y=self.n_gamma_e * self.gamma_e cost=m_e.cgs.value * (c.cgs ** 2).value elif self.emitters_type == 'protons': x = self.gamma_p y = self.n_gamma_p * self.gamma_p cost = m_p.cgs.value * (c.cgs ** 2).value else: raise RuntimeError('emitters type',self.emitters_type, 'not valid') msk = np.ones(x.shape, dtype=bool) if gmin is not None: msk = x>=gmin if gmax is not None: msk = np.logical_and(msk, x<=gmax) return np.trapezoid(y[msk],x[msk]) * cost
def _fill(self,): self.set_grid() self.f=self._eval_func(gamma=self._gamma_grid) self._Norm=1.0 if self.normalize is True: self._Norm=1.0/np.trapezoid(self.f,self._gamma_grid) self.f = self.f*self._Norm*self.parameters.get_par_by_name('N').val #NOTE: to be added for leptonic-equilibrium # if self.emitters_type == 'electrons-equilibrium': # #TODO implement here # if self._primaries_done is False: # self.gamma_e_inj = np.zeros(self._gamma_grid_size) # self.n_gamma_e_inj = np.zeros(self._gamma_grid_size) if self.emitters_type == 'electrons': self.gamma_e = np.zeros(self._gamma_grid_size) self.n_gamma_e = np.zeros(self._gamma_grid_size) self.gamma_e = self._gamma_grid self.n_gamma_e = self.f self.n_gamma_e[np.isnan(self.n_gamma_e)] = 0 self.n_gamma_e[np.isinf(self.n_gamma_e)] = 0 if self.emitters_type == 'protons': self.gamma_p = np.zeros(self._gamma_grid_size) self.n_gamma_p = np.zeros(self._gamma_grid_size) if self._secondaries_done is False: self.gamma_e_second_inj = np.zeros(self._gamma_grid_size) self.n_gamma_e_second_inj = np.zeros(self._gamma_grid_size) self.gamma_p = self._gamma_grid self.n_gamma_p = self.f self.n_gamma_p[np.isnan(self.n_gamma_p)] = 0 self.n_gamma_p[np.isinf(self.n_gamma_p)] = 0 self.n_gamma_e_second_inj[np.isnan(self.n_gamma_e_second_inj)] = 0 self.n_gamma_e_second_inj[np.isinf(self.n_gamma_e_second_inj)] = 0 def _plot(self,m, p, y_min=None, y_max=None, x_min=None, x_max=None, energy_unit='gamma',label=None, loglog=False): if hasattr(self,'_jet'): if self._jet is not None: self._set_blob() self._jet.eval() #NOTE: commented until is fixed the setting to zero of Ne_jetset for protons #self.update() if self.emitters_type == 'electrons': if label is None: label = 'electrons' m(self.gamma_e, self.n_gamma_e, y_min=y_min, y_max=y_max, x_min=x_min, x_max=x_max, particle='electrons', energy_unit=energy_unit, label=label) if getattr(self, '_primaries_done', False) is True: if self.gamma_cooling_eq is not None: if energy_unit != 'gamma': eq = self.gamma_cooling_eq * (m_e * c * c).to(energy_unit).value else: eq = self.gamma_cooling_eq if loglog is True: eq = np.log10(eq) p.ax.axvline(eq, ls='--', label='cooling eq. primary', lw=0.5, c='r') if hasattr(self, 'gamma_e_inj') and hasattr(self, 'n_gamma_e_inj'): m(self.gamma_e_inj, self.n_gamma_e_inj, y_min=y_min, y_max=y_max, x_min=x_min, x_max=x_max, particle='electrons', energy_unit=energy_unit, label='electrons (inj)') if self.emitters_type == 'protons': if label is None: label = 'protons' m(self.gamma_p, self.n_gamma_p, y_min=y_min, y_max=y_max, x_min=x_min, x_max=x_max, particle='protons', energy_unit=energy_unit, label=label) if getattr(self, '_secondaries_done', False) is True: if hasattr(self,'n_gamma_e'): m(self.gamma_e, self.n_gamma_e, y_min=y_min, y_max=y_max, x_min=x_min, x_max=x_max, particle='electrons', energy_unit=energy_unit, label='electrons sec.') if energy_unit != 'gamma': eq= self.gamma_cooling_eq_second* (m_e * c * c).to(energy_unit).value else: eq = self.gamma_cooling_eq_second if loglog is True: eq = np.log10(eq) p.ax.axvline(eq, ls='--', label='cooling. eq. second.', lw=0.5, c='r') if hasattr(self, 'gamma_e_second_inj'): m(self.gamma_e_second_inj, self.n_gamma_e_second_inj, y_min=y_min, y_max=y_max, x_min=x_min, x_max=x_max, particle='electrons', energy_unit=energy_unit, label='electrons sec. (inj)') return p
[docs] def plot(self, p=None, y_min=None, y_max=None, x_min=None, x_max=None, energy_unit='gamma',label=None,loglog=False): """Plot. Parameters ---------- p : object, optional Spectral slope/shape parameter or input vector. y_min : object, optional Minimum value for y. y_max : object, optional Maximum value for y. x_min : object, optional Minimum value for x. x_max : object, optional Maximum value for x. energy_unit : str, optional Energy unit used for plotting/output. label : object, optional Label used in output or plots. loglog : bool, optional If ``True``, operate in log10 space. Returns ------- object Computed value. """ if p is None: p = PlotPdistr(loglog=loglog) m=getattr(p,'plot_distr') self._plot(m,p,y_min=y_min,y_max=y_max,x_min=x_min,x_max=x_max,energy_unit=energy_unit,label=label,loglog=loglog) return p
[docs] def plot2p(self, p=None, y_min=None, y_max=None, x_min=None, x_max=None, energy_unit='gamma',label=None,loglog=False): """Plot2p. Parameters ---------- p : object, optional Spectral slope/shape parameter or input vector. y_min : object, optional Minimum value for y. y_max : object, optional Maximum value for y. x_min : object, optional Minimum value for x. x_max : object, optional Maximum value for x. energy_unit : str, optional Energy unit used for plotting/output. label : object, optional Label used in output or plots. loglog : bool, optional If ``True``, operate in log10 space. Returns ------- object Computed value. """ if p is None: p = PlotPdistr(loglog=loglog) m=getattr(p,'plot_distr2p') self._plot(m,p,y_min=y_min,y_max=y_max,x_min=x_min,x_max=x_max,energy_unit=energy_unit,label=label,loglog=loglog) return p
[docs] def plot3p(self, p=None, y_min=None, y_max=None, x_min=None, x_max=None, energy_unit='gamma',label=None,loglog=False): """Plot3p. Parameters ---------- p : object, optional Spectral slope/shape parameter or input vector. y_min : object, optional Minimum value for y. y_max : object, optional Maximum value for y. x_min : object, optional Minimum value for x. x_max : object, optional Maximum value for x. energy_unit : str, optional Energy unit used for plotting/output. label : object, optional Label used in output or plots. loglog : bool, optional If ``True``, operate in log10 space. Returns ------- object Computed value. """ if p is None: p = PlotPdistr(loglog=loglog) m=getattr(p,'plot_distr3p') self._plot(m,p,y_min=y_min,y_max=y_max,x_min=x_min,x_max=x_max,energy_unit=energy_unit,label=label,loglog=loglog) return p
[docs] class EmittersDistribution(BaseEmittersDistribution): """Jet-coupled emitter distribution for electrons or protons. Notes ----- Extends :class:`BaseEmittersDistribution` with synchronization to the jet backend blob, including equilibrium readback paths and jet-kernel parameter dictionary integration. """ def __init__(self, name, spectral_type, jet=None, gamma_grid_size=200, log_values=False, emitters_type='electrons', normalize=False, skip_build=False): """Create a new `EmittersDistribution` instance. Parameters ---------- name : object Name identifier. spectral_type : object Spectral-shape identifier. jet : object, optional Jet model instance. gamma_grid_size : int, optional Number of points in the gamma grid. log_values : bool, optional If ``True``, parameters are represented in log10 scale. emitters_type : str, optional Emitter population type. normalize : bool, optional If ``True``, normalize the distribution before scaling. skip_build : bool, optional If ``True``, skip build. """ super(EmittersDistribution, self).__init__(name, spectral_type=spectral_type, emitters_type=emitters_type, gamma_grid_size=gamma_grid_size, skip_build=True, normalize=normalize, log_values=log_values) if isinstance(self,EmittersArrayDistribution): self._array_gamma = None self._array_n_gamma = None else: if skip_build is False: self._build(jet,name,log_values, gamma_grid_size,normalize) def _copy_from_jet(self, jet): self._name = jet.emitters_distribution._name self._log_values = jet.emitters_distribution._log_values self._gamma_grid_size = jet.emitters_distribution._log_values self._gamma_grid_size = jet.emitters_distribution._gamma_grid_size self.normalize = jet.emitters_distribution.normalize for par in self.parameters.par_array: p = jet.emitters_distribution.parameters.get_par_by_name(par.name) par.set(val=p.val, skip_dep_par_warning=True)
[docs] def update(self): """Refresh distribution values and sync with jet backend state. Notes ----- In equilibrium mode this triggers backend readback instead of direct analytical filling. """ if self.emitters_type == 'electrons' and hasattr(self, '_jet') and self._jet is not None: if getattr(self._jet._blob.emitters, 'do_equilibrium', 0) == 1: # In equilibrium mode the physical electron distribution comes from C readback. self._set_blob() return self._fill()
def _build(self, jet, name, log_values, gamma_grid_size, normalize): self._user_defined=True self._name = name self._log_values = log_values self._gamma_grid_size=gamma_grid_size self.parameters = ModelParameterArray() self.set_parameters_dict() self.normalize = normalize self.gamma_cooling_eq = None self.gamma_cooling_eq_second=None self._primaries_done = False self._secondaries_done=False self.set_jet(jet) def _update_parameters_dict(self): for par in self.parameters.par_array: self._parameters_dict[par.name].val = par.val self._parameters_dict[par.name].vmin = par.val_min self._parameters_dict[par.name].vmax = par.val_max self._parameters_dict[par.name].log = par.islog self._parameters_dict[par.name].punit = par.units self._parameters_dict[par.name]._is_dependent = par._is_dependent self._parameters_dict[par.name]._depending_pars = par._depending_pars self._parameters_dict[par.name]._func = par._func self._parameters_dict[par.name]._master_pars = par._master_pars
[docs] def add_par(self, name, par_type, val, vmax, vmin, unit='', log=False, frozen=False): #if log is True: # val = np.log10(val) """Add par. Parameters ---------- name : object Name identifier. par_type : object Parameter type/category label. val : object Value to assign. vmax : object Maximum allowed value. vmin : object Minimum allowed value. unit : str, optional Physical unit string. log : bool, optional If ``True``, treat the value in log10 units. frozen : bool, optional If ``True``, keep parameter fixed during fitting. """ if name not in self._parameters_dict.keys(): self._parameters_dict[name]=JetModelDictionaryPar(ptype=par_type, vmin=vmin, vmax=vmax, log=log, val=val, punit=unit, froz=frozen, is_in_jetkernel=False) else: raise ValueError('par',name,'already assigned') #print('==> add par',name,val,log) self.parameters.add_par(ModelParameter(name=name, par_type=par_type, val=val, val_min=vmin, val_max=vmax, units=unit, frozen=frozen, log=log,))
[docs] def set_parameters_dict(self,): """Build default parameter dictionary and mirrored parameter array. Notes ----- Values are initialized according to emitter type and distribution class. """ model_dict, a_h, b_h, a_l, b_l, a_t, b_t = self._get_base_dict_and_bounds() model_dict['gmin'].val = 2.0 model_dict['gmin'].is_in_jetkernel = True model_dict['gmax'].val = 1E6 model_dict['gmax'].is_in_jetkernel = True if isinstance(self,EmittersArrayDistribution): model_dict['N'].val = 1.0 else: model_dict['N'].val = 100. model_dict['N'].is_in_jetkernel = True if 'NH_pp' in model_dict.keys(): model_dict['NH_pp'].val = 1. model_dict['NH_pp'].is_in_jetkernel = True self._parameters_dict=model_dict for k,par in model_dict.items(): if par.log is True: val = np.log10(par.val) else: val = par.val self.parameters.add_par(ModelParameter(name=k, par_type=par.ptype, val=val, val_min=par.vmin, val_max=par.vmax, units=par.punit, frozen=par.froz, log=par.log))
def _set_base_pars(self): raise RuntimeError('allowed only in parent class') def _get_base_dict_and_bounds(self): model_dic = {} a_h, b_h = self.set_bounds(1, 1E15, log_val=self._log_values) a_l, b_l = self.set_bounds(1, 1E9, log_val=self._log_values) a_t, b_t = self.set_bounds(1, 1E9, log_val=self._log_values) model_dic['gmin'] = JetModelDictionaryPar(ptype='low-energy-cut-off', vmin=a_l, vmax=b_l, punit='lorentz-factor', log=self._log_values, jetkernel_par_name='emitters.gmin') model_dic['gmax'] = JetModelDictionaryPar(ptype='high-energy-cut-off', vmin=a_h, vmax=b_h, punit='lorentz-factor', log=self._log_values, jetkernel_par_name='emitters.gmax') model_dic['N'] = JetModelDictionaryPar(ptype='emitters_density', vmin=0, vmax=None, punit='cm^-3', jetkernel_par_name='emitters.N') if self.emitters_type =='protons': model_dic['NH_pp'] = JetModelDictionaryPar(ptype='target_density', vmin=0, vmax=None, punit='cm^-3', froz=False, jetkernel_par_name='PP_gamma.NH_pp') if isinstance(self, EmittersArrayDistribution): model_dic['N'] = JetModelDictionaryPar(ptype='scaling_factor', vmin=0, vmax=None, punit='', jetkernel_par_name='emitters.N') return model_dic, a_h, b_h, a_l, b_l, a_t, b_t
[docs] def set_jet(self, jet): """Set jet. Parameters ---------- jet : object Jet model instance. """ if jet is not None: #name passed to the C code #do not change name = 'jetset' self._jet = jet set_str_attr(jet._blob, 'core.DISTR', name) set_str_attr(jet._blob, 'core.PARTICLE', self.emitters_type) p = self._jet.get_par_by_name('gmin') if p is not None: p.set(val=self.parameters.get_par_by_name('gmin').val_lin) else: p=self.parameters.get_par_by_name('gmin') set_nested_attr(jet._blob, 'emitters.gmin', p.val_lin) p = self._jet.get_par_by_name('gmax') if p is not None: p.set(val=self.parameters.get_par_by_name('gmax').val_lin) else: p = self.parameters.get_par_by_name('gmax') set_nested_attr(jet._blob, 'emitters.gmax', p.val_lin) self._jet._blob.emitters.gamma_grid_size = self._gamma_grid_size else: self._jet=jet
[docs] def set_grid_size(self,gamma_grid_size): """Set grid size. Parameters ---------- gamma_grid_size : object Number of points in the gamma grid. """ if gamma_grid_size is not None: if self._jet is not None: set_nested_attr(self._jet._blob, 'emitters.gamma_grid_size', gamma_grid_size) self._fill()
[docs] def set_grid(self): """Build gamma grid either analytically or from jet backend arrays. Notes ----- When attached to a jet, the grid is read from jetkernel memory. """ if self._jet is None: gmin = self.parameters.get_par_by_name('gmin').val_lin gmax = self.parameters.get_par_by_name('gmax').val_lin self._gamma_grid = np.logspace(np.log10(gmin), np.log10(gmax), self._gamma_grid_size) else: BlazarSED.setNgrid(self._jet._blob) if self.emitters_type == 'electrons': #print('==> Build Ne start') BlazarSED.build_Ne_jetset(self._jet._blob) gamma_ptr = get_nested_attr(self._jet._blob, 'emitters.griglia_gamma_jetset_Ne_log') #print('==> Build Ne done') elif self.emitters_type == 'protons': BlazarSED.build_Np_jetset(self._jet._blob) gamma_ptr = get_nested_attr(self._jet._blob, 'emitters.griglia_gamma_jetset_Np_log') #NOTE: to be added for leptonic-equilibrium # elif self.emitters_type == 'electrons-equilibrium': # BlazarSED.build_Ne_jetset(self._jet._blob) # gamma_ptr = get_nested_attr(self._jet._blob, 'emitters.griglia_gamma_jetset_Ne_log') # #print('==> Build Ne done') else: raise RuntimeError('emitters type', self.emitters_type, 'not valid') size = self._jet._blob.emitters.gamma_grid_size self._gamma_grid = np.zeros(size) for ID in range(size): self._gamma_grid[ID] = BlazarSED.get_elec_array(gamma_ptr, self._jet._blob, ID) self._gamma_grid_size=self._jet.gamma_grid_size
def _fill(self,): super(EmittersDistribution, self)._fill() if self._jet is not None: if self.emitters_type == 'electrons': size = self._gamma_grid_size Ne_ptr = get_nested_attr(self._jet._blob, 'emitters.Ne_jetset') set_emitters(Ne_ptr,self._jet._blob,size,self.f) if self.emitters_type == 'protons': size = self._gamma_grid_size Np_ptr = get_nested_attr(self._jet._blob, 'emitters.Np_jetset') set_emitters(Np_ptr,self._jet._blob,size,self.f) def _set_blob(self): size = self._jet._blob.emitters.gamma_grid_size if self.emitters_type == 'electrons': self._Ne_name, self._gammae_name = gamma_dic_e['electron_distr'] self.Ne_ptr = get_nested_attr(self._jet._blob, self._Ne_name) self.e_gamma_ptr = get_nested_attr(self._jet._blob, self._gammae_name) self.gamma_e,self.n_gamma_e=get_emitters(self.e_gamma_ptr,self.Ne_ptr, self._jet._blob,size) self._primaries_done = False self.gamma_cooling_eq = None if getattr(self._jet._blob.emitters, 'do_equilibrium', 0) == 1: self._Q_inj_e_name, self._gammae_inj_name = gamma_dic_e_equilibrium['e_inj'] self._Q_inj_e_ptr = get_nested_attr(self._jet._blob, self._Q_inj_e_name) self.e_inj_gamma_ptr = get_nested_attr(self._jet._blob, self._gammae_inj_name) self.gamma_e_inj, self.n_gamma_e_inj = get_emitters(self.e_inj_gamma_ptr, self._Q_inj_e_ptr, self._jet._blob, size) self.gamma_cooling_eq = self._jet._blob.emitters.gamma_cooling_eq self._primaries_done = True elif self.emitters_type == 'protons': self._Ne_name, self._gammae_name = gamma_dic_e['electron_distr'] self._Np_name, self._gammap_name = gamma_dic_p['proton_distr'] self._Q_inj_e_second_name, self._gammae_inj_sec_name = gamma_dic_pp_e_second['e_second_inj'] self.Np_ptr = get_nested_attr(self._jet._blob, self._Np_name) self.p_gamma_ptr = get_nested_attr(self._jet._blob, self._gammap_name) self.Ne_ptr = get_nested_attr(self._jet._blob, self._Ne_name) self.e_gamma_ptr = get_nested_attr(self._jet._blob, self._gammae_name) self._Q_inj_e_second_ptr = get_nested_attr(self._jet._blob, self._Q_inj_e_second_name) self.e_inj_second_gamma_ptr = get_nested_attr(self._jet._blob, self._gammae_inj_sec_name) else: raise RuntimeError(f"emitters type {self.emitters_type} not valid ", available_emitters_type) #NOTE: this is needed to get the pointers to Ne also in the case of protons self.gamma_e,self.n_gamma_e=get_emitters(self.e_gamma_ptr,self.Ne_ptr, self._jet._blob,size) if self.emitters_type == 'protons': self.gamma_p,self.n_gamma_p=get_emitters(self.p_gamma_ptr,self.Np_ptr, self._jet._blob,size) self.gamma_e_second_inj,self.n_gamma_e_second_inj=get_emitters(self.e_inj_second_gamma_ptr,self._Q_inj_e_second_ptr, self._jet._blob,size) self.gamma_cooling_eq_second= self._jet._blob.emitters.gamma_cooling_eq self._secondaries_done = True def _activate_numba(self): self._py_distr_func=copy.deepcopy(self.distr_func) try: if is_jitted(self.distr_func) is False: self.distr_func = njit(self.distr_func,fastmath=True, cache=False) except Exception as e: print("failed to activate numba for",str(e))
[docs] class EmittersArrayDistribution(EmittersDistribution): """Emitter distribution defined by user-provided tabulated arrays. Notes ----- Interpolates ``n(gamma)`` from provided arrays and exposes it through the standard emitter-distribution interface used by jet models. """ def __init__(self, name, jet=None, emitters_type='electrons', normalize=False, skip_build=False, gamma_array=None, n_gamma_array=None, gamma_grid_size=None): """Create a new `EmittersArrayDistribution` instance. Parameters ---------- name : object Name identifier. jet : object, optional Jet model instance. emitters_type : str, optional Emitter population type. normalize : bool, optional If ``True``, normalize the distribution before scaling. skip_build : bool, optional If ``True``, skip build. gamma_array : object, optional Gamma grid values for tabulated distributions. n_gamma_array : object, optional Distribution values corresponding to ``gamma_array``. gamma_grid_size : object, optional Number of points in the gamma grid. """ super(EmittersArrayDistribution, self).__init__(name, spectral_type='array', emitters_type=emitters_type) if gamma_array is not None and n_gamma_array is not None: if self._spectral_type != 'array': raise RuntimeError('you can pass gamma_array and n_gamma_array only for array distribution') self._set_arrays(gamma_array,n_gamma_array) elif (gamma_array is not None) and (n_gamma_array is not None) is False: raise RuntimeError('you have to pass both gamma_array and n_gamma_array for array distribution') if gamma_grid_size is None: gamma_grid_size=gamma_array.size if skip_build is False: self._build(jet, name, False, gamma_grid_size, normalize) self.parameters.gmin.val=self._array_gamma[0] self.parameters.gmax.val = self._array_gamma[-1] self.parameters.N.val=1 self.set_distr_func(self._array_func) def _set_arrays(self,gamma_array,n_gamma_array): _ids = np.argsort(gamma_array) self._array_gamma = gamma_array[_ids] self._array_n_gamma = n_gamma_array[_ids] def _array_func(self,gamma): msk_nan=self._array_n_gamma>0 if msk_nan.sum()>1: f_interp = interpolate.interp1d(np.log10(self._array_gamma[msk_nan]), np.log10(self._array_n_gamma[msk_nan]), bounds_error=False, kind='linear') y = np.power(10., f_interp(np.log10(gamma))) msk_nan = np.isnan(y) y[msk_nan] = 0 else: y = np.zeros(gamma.size) return y def _activate_numba(self): pass
[docs] class InjEmittersDistribution(BaseEmittersDistribution): """Injection emitter distribution used in time/equilibrium calculations. Notes ----- Manages ``Q(gamma)``-style injection parameterizations, normalization, and optional synchronization of injected particles with equilibrium-enabled backend buffers. """ def __init__(self, name, spectral_type, gamma_grid_size=200, log_values=False, skip_build=False, normalize=True, emitters_type='electrons'): """Create a new `InjEmittersDistribution` instance. Parameters ---------- name : object Name identifier. spectral_type : object Spectral-shape identifier. gamma_grid_size : int, optional Number of points in the gamma grid. log_values : bool, optional If ``True``, parameters are represented in log10 scale. skip_build : bool, optional If ``True``, skip build. normalize : bool, optional If ``True``, normalize the distribution before scaling. emitters_type : str, optional Emitter population type. """ super(InjEmittersDistribution, self).__init__(name=name, spectral_type=spectral_type, emitters_type='electrons', gamma_grid_size=gamma_grid_size, skip_build=True, normalize=True, log_values=log_values) if skip_build is False: self._build(name, log_values, gamma_grid_size, normalize=True) def _build(self, name, log_values, gamma_grid_size, normalize): self._user_defined = True self._name = name self._log_values = log_values self.parameters = ModelParameterArray() self.set_parameters_dict() self.parameters.add_par( ModelParameter(name='Q', par_type='emitters_density', val=1E-3, val_min=0, val_max=None, units='cm-3 s-1')) self._parameters_dict['Q'] = JetModelDictionaryPar(ptype='emitters_density', vmin=0, vmax=None, punit='cm-3 s-1', val=1E-3, log=False, froz=False, is_in_jetkernel=False) self.normalize = normalize self.gamma_cooling_eq = None self.gamma_cooling_eq_second=None self._primaries_done = False self._secondaries_done=False self._gamma_grid_size = gamma_grid_size self._reset_equilibrium_state() #def _set_base_pars(self): # # # self.parameters.add_par( JetModelDictionaryPar(name='Q', par_type='emitters_density', val=1E-3, val_min=0, val_max=None, units='cm-3 s-1'))
[docs] def set_parameters_dict(self,): """Build default parameter dictionary for injection distributions. Notes ----- Populates both ``_parameters_dict`` and ``parameters`` container. """ model_dict, a_h, b_h, a_l, b_l, a_t, b_t = self._get_base_dict_and_bounds() model_dict['gmin'].val = 2.0 model_dict['gmin'].is_in_jetkernel = True model_dict['gmax'].val = 1E6 model_dict['gmax'].is_in_jetkernel = True self._parameters_dict=model_dict for k,par in model_dict.items(): if par.log is True: val = np.log10(par.val) else: val = par.val self.parameters.add_par(ModelParameter(name=k, par_type=par.ptype, val=val, val_min=par.vmin, val_max=par.vmax, units=par.punit, frozen=par.froz, log=par.log))
def _get_base_dict_and_bounds(self): model_dic = {} a_h, b_h = self.set_bounds(1, 1E15, log_val=self._log_values) a_l, b_l = self.set_bounds(1, 1E9, log_val=self._log_values) a_t, b_t = self.set_bounds(1, 1E9, log_val=self._log_values) model_dic['gmin'] = JetModelDictionaryPar(ptype='low-energy-cut-off', vmin=a_l, vmax=b_l, punit='lorentz-factor', log=self._log_values, jetkernel_par_name='emitters.gmin') model_dic['gmax'] = JetModelDictionaryPar(ptype='high-energy-cut-off', vmin=a_h, vmax=b_h, punit='lorentz-factor', log=self._log_values, jetkernel_par_name='emitters.gmax') return model_dic, a_h, b_h, a_l, b_l, a_t, b_t def _fill(self,): self.set_grid() self.f=self._eval_func(gamma=self._gamma_grid) self._Norm=1.0 if self.normalize is True: _integ = np.trapezoid(self.f,self._gamma_grid) if _integ > 0: self._Norm=1.0/_integ self.f = self.f*self._Norm*self.parameters.get_par_by_name('Q').val self.gamma_e = self._gamma_grid.copy() self.n_gamma_e = np.asarray(self.f, dtype=np.float64) self.n_gamma_e[np.isnan(self.n_gamma_e)] = 0 self.n_gamma_e[np.isinf(self.n_gamma_e)] = 0 if hasattr(self, '_jet') and self._jet is not None: if getattr(self._jet._blob.emitters, 'do_equilibrium', 0) == 1: size = self._gamma_grid_size q_ptr = get_nested_attr(self._jet._blob, 'emitters.Q_inj_e_primaries') set_emitters(q_ptr,self._jet._blob,size,self.n_gamma_e)
[docs] def eval_U_q(self): """Evaluate u q. Returns ------- object Computed value. """ self._fill() x=self.gamma_e y=self.f*x cost=m_e.cgs.value * (c.cgs ** 2).value return np.trapezoid(y,x) * cost
def _set_L_inj(self, L_inj_target_erg, volume): if L_inj_target_erg >0: self.parameters.Q.val *=L_inj_target_erg/(self.eval_U_q() * volume ) #NOTE: commented until is fixed the setting to zero of Ne_jetset for protons #self.update() #NOTE: not used, to be removed
[docs] def set_temp_ev(self): """Bind temporary-evolution buffers used by legacy workflows. Notes ----- Marked as legacy in code comments and kept for compatibility. """ self.e_gamma_ptr = getattr(self._temp_ev, self._gammae_name) self._Q_inj_e_second_ptr = get_nested_attr(self._temp_ev._blob, self._Q_inj_e_second_name)
[docs] def add_par(self, name, par_type, val, vmax, vmin, unit='', log=False, frozen=False): """Add par. Parameters ---------- name : object Name identifier. par_type : object Parameter type/category label. val : object Value to assign. vmax : object Maximum allowed value. vmin : object Minimum allowed value. unit : str, optional Physical unit string. log : bool, optional If ``True``, treat the value in log10 units. frozen : bool, optional If ``True``, keep parameter fixed during fitting. """ if name not in self._parameters_dict.keys(): self._parameters_dict[name]=JetModelDictionaryPar(ptype=par_type, vmin=vmin, vmax=vmax, log=log, val=val, punit=unit, froz=frozen, is_in_jetkernel=False) else: raise ValueError('par',name,'already assigned') self.parameters.add_par(ModelParameter(name=name, par_type=par_type, val=val, val_min=vmin, val_max=vmax, units=unit, frozen=frozen, log=log,))
[docs] class InjEmittersArrayDistribution(InjEmittersDistribution): """Array-based injection distribution ``Q(gamma)`` implementation. Notes ----- Uses tabulated gamma and injection-density arrays, interpolates them onto internal grids, and keeps compatibility with injection-distribution APIs. """ def __init__(self, name, emitters_type='electrons', normalize=False, skip_build=False, gamma_array=None, n_gamma_array=None, gamma_grid_size=None): """Create a new `InjEmittersArrayDistribution` instance. Parameters ---------- name : object Name identifier. emitters_type : str, optional Emitter population type. normalize : bool, optional If ``True``, normalize the distribution before scaling. skip_build : bool, optional If ``True``, skip build. gamma_array : object, optional Gamma grid values for tabulated distributions. n_gamma_array : object, optional Distribution values corresponding to ``gamma_array``. gamma_grid_size : object, optional Number of points in the gamma grid. """ super(InjEmittersArrayDistribution, self).__init__(name, spectral_type='array', emitters_type=emitters_type) if gamma_array is not None and n_gamma_array is not None: if self._spectral_type != 'array': raise RuntimeError('you can pass gamma_array and n_gamma_array only for array distribution') self._set_arrays(gamma_array,n_gamma_array) elif (gamma_array is not None) and (n_gamma_array is not None) is False: raise RuntimeError('you have to pass both gamma_array and n_gamma_array for array distribution') if gamma_grid_size is None: gamma_grid_size=gamma_array.size if skip_build is False: self._build( name, False, gamma_grid_size, normalize) self.parameters.gmin.val=self._array_gamma[0] self.parameters.gmax.val = self._array_gamma[-1] self.parameters.Q.val =1 self.set_distr_func(self._array_func) def _set_arrays(self,gamma_array,n_gamma_array): _ids = np.argsort(gamma_array) self._array_gamma = gamma_array[_ids] self._array_n_gamma = n_gamma_array[_ids] def _array_func(self,gamma): msk_nan=self._array_n_gamma>0 f_interp = interpolate.interp1d(np.log10(self._array_gamma[msk_nan]), np.log10(self._array_n_gamma[msk_nan]), bounds_error=False, kind='linear') y = np.power(10., f_interp(np.log10(gamma))) msk_nan = np.isnan(y) y[msk_nan] = 0 return y def _activate_numba(self): pass