__author__ = "Andrea Tramacere"
import json
import dill as pickle
import six
import numpy as np
import copy
import warnings
import os
from astropy import units as u
from astropy import constants
from contextlib import redirect_stdout
import io
import multiprocessing
from .jet_spectral_components import JetSpecComponent, SpecCompList
from .model_parameters import ModelParameterArray, ModelParameter, _show_table
from .base_model import Model
from .output import makedir,WorkPlace
from .plot_sedfit import PlotSED,plt
from .cosmo_tools import Cosmo
from .utils import safe_run,set_str_attr, old_model_warning, get_info, clean_var_name
from .jet_paramters import *
from .jet_emitters import *
from .jet_emitters_factory import EmittersFactory
from .jet_tools import *
from .mathkernel_helper import bessel_table_file_path
on_rtd = os.environ.get('READTHEDOCS', None) == 'True'
if on_rtd is True:
try:
from .jetkernel import jetkernel as BlazarSED
except ImportError:
from .mock import jetkernel as BlazarSED
else:
from .jetkernel import jetkernel as BlazarSED
__all__=['Jet','JetBase','GalacticBeamed','GalacticUnbeamed']
[docs]
class JetBase(Model):
""" JetBase class.
This is the base class for the jet models providing the interface to the C code, giving full access to the physical parameters and
providing the methods to run the code.
The object will store the the physical parameters in the ::py:attr:`Jet.parameters` which is :class:`.ModelParameterArray` object,
i.e. a collection of :class:`JetParameter` objects. All the physical parameters are also accessible as attributes of
the ::py:attr:`Jet.parameters`
"""
def __repr__(self):
return str(self.show_model())
def __init__(self,
cosmo=None,
name='test',
emitters_type='electrons',
emitters_distribution='pl',
emitters_distribution_log_values=False,
beaming_expr='delta',
jet_workplace=None,
verbose=None,
nu_size=500,
clean_work_dir=True,
geometry='spherical',
**keywords):
"""
Parameters
----------
cosmo : _type_, optional
instance of the`Cosmo` class, by default None
name : str, optional
name of the model, by default 'test'
emitters_type : str, optional
, by default 'electrons'
emitters_distribution : str, optional
, by default 'pl'
emitters_distribution_log_values : bool, optional
, by default False
beaming_expr : str, optional
expression for the beaming , by default 'delta'
jet_workplace : _type_, optional
, by default None
verbose : _type_, optional
, by default None
nu_size : int, optional
size of the nu grid, by default 500
clean_work_dir : bool, optional
, by default True
geometry : str, optional
, by default 'spherical'
"""
super(JetBase,self).__init__( cosmo=cosmo, **keywords)
self.name = clean_var_name(name)
self.model_type='jet'
#self._emitters_type=emitters_type
self._scale='lin-lin'
self._blob = self._build_blob(verbose=verbose)
self._static_spec_arr_grid_size = BlazarSED.static_spec_arr_grid_size
self._nu_static_size = BlazarSED.static_spec_arr_size
self.nu_size = nu_size
self.nu_grid_size=self._get_nu_grid_size_blob()
N=multiprocessing.cpu_count()
if N>2:
N=min(N,20)
self.set_num_c_threads(N=N)
if jet_workplace is None:
jet_workplace=WorkPlace()
out_dir= jet_workplace.out_dir + '/' + self.name + '_jet_prod/'
else:
out_dir=jet_workplace.out_dir+'/'+self.name+'_jet_prod/'
self._allowed_geometry=['spherical','spherical_shell']
self.geometry=geometry
self.set_path(out_dir,clean_work_dir=clean_work_dir)
self.set_flag(self.name)
self._allowed_EC_components_list=['EC_BLR',
'DT',
'EC_DT',
'Star',
'EC_Star',
#CMB',
'EC_CMB',
'Disk',
'Disk_MultiBB',
'Disk_Mono',
'EC_Disk',
'All']
self._allwed_disk_type =['BB', 'Mono', 'MultiBB']
self.EC_components_list =[]
self._spectral_components_list=[]
self._hidden_spectral_components_list = []
self.spectral_components= SpecCompList(self._spectral_components_list)
self.add_basic_components()
self.SED=self.get_spectral_component_by_name('Sum').SED
self.parameters = JetModelParameterArray(model=self)
self._emitting_region_dict = None
self._electron_distribution_dic= None
self._external_photon_fields_dic= None
self._original_emitters_distr = None
self._energetic = None
self._setup(emitters_distribution,emitters_distribution_log_values,beaming_expr,emitters_type)
def _setup(self, emitters_distribution, emitters_distribution_log_values, beaming_expr, emitters_type):
"""_summary_
Parameters
----------
emitters_distribution : _type_
_description_
emitters_distribution_log_values : _type_
_description_
beaming_expr : _type_
_description_
emitters_type : _type_
_description_
"""
self.EC_components_list = []
self._spectral_components_list = []
self.spectral_components = SpecCompList(self._spectral_components_list)
self.add_basic_components()
self.SED = self.get_spectral_component_by_name('Sum').SED
self.parameters = JetModelParameterArray(model=self)
self._emitting_region_dict = None
self._emitters_distribution_dic = None
self._external_photon_fields_dic = None
self._blob.IC_adaptive_e_binning = 0
self._blob.do_IC_down_scattering = 0
if hasattr(emitters_distribution,'emitters_type'):
emitters_type=emitters_distribution.emitters_type
self.set_emitting_region(beaming_expr,emitters_type)
self.flux_plot_lim=1E-30
self.set_emiss_lim(1E-120)
self._IC_states = {}
self._IC_states['on'] = 1
self._IC_states['off'] = 0
self._external_field_transf = {}
self._external_field_transf['blob'] = 0
self._external_field_transf['disk'] = 1
self._jetkernel_interp='linear'
self.set_emitters_distribution(emitters_distribution, emitters_distribution_log_values, emitters_type,
init=False)
self.set_blob()
def __getstate__(self):
j= self._serialize_model()
return j
def __setstate__(self,state):
self.__init__()
self._decode_model(state)
self._fix_par_dep_on_load()
def _serialize_model(self):
_model = {}
_model['version']=get_info()['version']
_model['name'] = self.name
_model['emitters_type'] = self.emitters_distribution.emitters_type
if isinstance(self.emitters_distribution,JetkernelEmittersDistribution):
_model['emitters_distribution'] = self._emitters_distribution_name
_model['emitters_distribution_log_values'] = self._emitters_distribution_log_values
_model['emitters_distribution_class']='JetkernelEmittersDistribution'
elif isinstance(self.emitters_distribution,EmittersDistribution):
self._original_emitters_distr._copy_from_jet(self)
_model['custom_emitters_distribution']=self._original_emitters_distr
clean_numba(_model['custom_emitters_distribution'])
_model['emitters_distribution_class'] = 'EmittersDistribution'
else:
raise RuntimeError('emitters distribution type not valid',type(self._emitters_distribution))
if hasattr(self,'T_esc_e_second'):
_model['T_esc_e_second']=self.T_esc_e_second
if hasattr(self,'geometry'):
_model['geometry']=self.geometry
_model['beaming_expr'] = self._beaming_expr
_model['spectral_components_name'] = self.get_spectral_component_names_list()
_model['spectral_components_state'] = [c.state for c in self._spectral_components_list]
_model['EC_components_name'] = self.EC_components_list
_model['basic_components_name'] = self.basic_components_list
_model['cosmo'] = self.cosmo._serialize_model()
_model['pars'] = {}
_model['pars']=self.parameters._serialize_pars()
_model['external_field_transf']=self.get_external_field_transf()
_model['internal_pars'] = {}
_model['internal_pars']['nu_size'] = self.nu_size
_model['internal_pars']['nu_seed_size'] = self.nu_seed_size
_model['internal_pars']['gamma_grid_size'] = self.gamma_grid_size
_model['internal_pars']['IC_nu_size']=self.IC_nu_size
_model['internal_pars']['nu_min']=self.nu_min
_model['internal_pars']['nu_max']=self.nu_max
_model['internal_pars']['Norm_distr'] = self.Norm_distr
return _model
[docs]
def save_model(self,file_name):
pickle.dump(self, open(file_name, 'wb'), protocol=pickle.HIGHEST_PROTOCOL)
@classmethod
@safe_run
def load_model(cls, file_name):
"""Load a save model
Parameters
----------
file_name : _type_
_description_
verbose : bool, optional
_description_, by default True
Returns
-------
_type_
_description_
Raises
------
RuntimeError
_description_
"""
try:
jet=pickle.load(open(file_name, "rb"))
jet.set_blob()
jet._update_spectral_components()
return jet
except Exception as e:
raise RuntimeError('The model you loaded is not valid please check the file name', e)
def _decode_model(self,_model):
if 'version' in _model.keys():
self._set_version(_model['version'])
else:
self._set_version(v='unknown(<1.1.2)')
self.cosmo = Cosmo.from_model(_model['cosmo'])
self.model_type = 'jet'
self.name = _model['name']
if 'geometry' in _model.keys():
self.geometry=_model['geometry']
else:
warnings.warn('the model you are loading has not geometry specification. Assuming it is spherical!')
self.geometry='spherical'
self.set_blob()
self.parameters = JetModelParameterArray(model=self)
if 'emitters_type' in _model.keys():
emitters_type=str(_model['emitters_type'])
else:
emitters_type = 'electrons'
if 'electron_distribution' in _model.keys():
_v=_model['electron_distribution']
del(_model['electron_distribution'])
_model['emitters_distribution']=_v
if 'electron_distribution_log_values' in _model.keys():
_v=_model['electron_distribution_log_values']
del(_model['electron_distribution_log_values'])
_model['emitters_distribution_log_values']=_v
if _model['emitters_distribution_class'] == 'JetkernelEmittersDistribution':
self.set_emitters_distribution(distr=_model['emitters_distribution'],
log_values=_model['emitters_distribution_log_values'],
emitters_type=emitters_type,
init=False)
elif _model['emitters_distribution_class'] == 'EmittersDistribution':
self.set_emitters_distribution(distr=_model['custom_emitters_distribution'], init=False)
else:
raise RuntimeError('emitters distribution type not valid', type(self._emitters_distribution))
for c in self.basic_components_list:
if c not in _model['basic_components_name']:
self.del_spectral_component(c)
if self.emitters_distribution.emitters_type == 'protons':
self.add_pp_gamma_component()
self.add_pp_neutrino_component()
self.add_bremss_ep_component()
self.T_esc_e_second=_model['T_esc_e_second']
if 'disk_type' in _model['pars'].keys():
disk_type=_model['pars']['disk_type']['val']
else:
disk_type=None
self.add_EC_component(_model['EC_components_name'],disk_type=disk_type)
self.set_external_field_transf(_model['external_field_transf'])
for ID, c in enumerate(_model['spectral_components_name']):
comp = getattr(self.spectral_components, c)
if comp._state_dict != {}:
comp.state = _model['spectral_components_state'][ID]
self.SED = self.get_spectral_component_by_name('Sum').SED
self.set_emitting_region(str(_model['beaming_expr']),self.emitters_distribution.emitters_type)
if isinstance(self,GalacticBeamed):
self._handle_z(d=self.cosmo._DL_cm)
_par_dict = _model['pars']
_non_user_dict={}
_user_dict={}
for k, v in _model['pars'].items():
if v['par_type'] == 'user_defined':
_user_dict[k]=v
else:
_non_user_dict[k]=v
self.parameters._decode_pars(_non_user_dict)
for k, v in _user_dict.items():
v['name']=k
self.parameters.add_par(ModelParameter(**v))
_par_dict = _model['internal_pars']
for k in _par_dict.keys():
setattr(self,k,_par_dict[str(k)])
def _build_blob(self, verbose=None):
blob = BlazarSED.MakeBlob()
blob.x_Bessel_min = 1E-17
blob.x_Bessel_max = 7.2E2
blob.x_ave_Bessel_min = 1E-16
blob.x_ave_Bessel_max = 3.5E2
blob.log_x_Bessel_min = np.log10( blob.x_Bessel_min)
blob.log_x_Bessel_max = np.log10( blob.x_Bessel_max)
blob.log_x_ave_Bessel_min = np.log10( blob.x_ave_Bessel_min)
blob.log_x_ave_Bessel_max = np.log10( blob.x_ave_Bessel_max)
F_Sync_x_ptr = getattr(blob, 'F_Sync_x')
F_Sync_y_ptr = getattr(blob, 'F_Sync_y')
G_Sync_x_ptr = getattr(blob, 'G_Sync_x')
G_Sync_y_ptr = getattr(blob, 'G_Sync_y')
F_ave_Sync_x_ptr = getattr(blob, 'F_ave_Sync_x')
F_ave_Sync_y_ptr = getattr(blob, 'F_ave_Sync_y')
log_F_Sync_x_ptr = getattr(blob, 'log_F_Sync_x')
log_F_Sync_y_ptr = getattr(blob, 'log_F_Sync_y')
log_F_ave_Sync_x_ptr = getattr(blob, 'log_F_ave_Sync_x')
log_F_ave_Sync_y_ptr = getattr(blob, 'log_F_ave_Sync_y')
log_G_Sync_x_ptr = getattr(blob, 'log_G_Sync_x')
log_G_Sync_y_ptr = getattr(blob, 'log_G_Sync_y')
d = np.genfromtxt(bessel_table_file_path,comments='#')
log_F_Sync_x=np.log10(d[:,0])
log_F_Sync_y=np.log10(d[:,1])
log_F_ave_Sync_x=np.log10(d[:,2])
log_F_ave_Sync_y=np.log10(d[:,3])
log_G_Sync_x=np.log10(d[:,4])
log_G_Sync_y=np.log10(d[:,5])
for ID, l in enumerate(d):
BlazarSED.set_bessel_table(F_Sync_x_ptr,blob, l[0], ID)
BlazarSED.set_bessel_table(F_Sync_y_ptr, blob, l[1], ID)
BlazarSED.set_bessel_table(F_ave_Sync_x_ptr, blob, l[2], ID)
BlazarSED.set_bessel_table(F_ave_Sync_y_ptr, blob, l[3], ID)
BlazarSED.set_bessel_table(G_Sync_x_ptr,blob, l[4], ID)
BlazarSED.set_bessel_table(G_Sync_y_ptr, blob, l[5], ID)
BlazarSED.set_bessel_table(log_F_Sync_x_ptr, blob, log_F_Sync_x[ID], ID)
BlazarSED.set_bessel_table(log_F_Sync_y_ptr, blob, log_F_Sync_y[ID], ID)
BlazarSED.set_bessel_table(log_F_ave_Sync_x_ptr, blob, log_F_ave_Sync_x[ID], ID)
BlazarSED.set_bessel_table(log_F_ave_Sync_y_ptr, blob, log_F_ave_Sync_y[ID], ID)
BlazarSED.set_bessel_table(log_G_Sync_x_ptr, blob, log_G_Sync_x[ID], ID)
BlazarSED.set_bessel_table(log_G_Sync_y_ptr, blob, log_G_Sync_y[ID], ID)
blob.BESSEL_TABLE_DONE=1
if verbose is None:
blob.verbose = 0
else:
blob.verbose = verbose
set_str_attr(blob, 'path', './')
set_str_attr(blob, 'MODE', 'custom')
blob.gamma_grid_size = 200
blob.nu_IC_size = 100
blob.nu_seed_size = 100
blob.nu_grid_size= 1000
blob.do_Sync = 2
blob.do_SSC = 1
blob.R = 5.0e15
blob.h_sh=0.1
blob.B = 0.1
blob.z_cosm = 0.1
blob.BulkFactor = 10
blob.theta = 0.1
blob.N = 100
blob.NH_pp = 1
blob.NH_cold_to_rel_e = 1.0
blob.L_Disk = 1E45
blob.L_DT = 1E45
blob.gmin = 2
blob.gmax = 1e6
blob.nu_start_Sync = 1e6
blob.nu_stop_Sync = 1e20
blob.nu_start_SSC = 1e14
blob.nu_stop_SSC = 1e30
blob.nu_start_grid = 1e6
blob.nu_stop_grid = 1e30
return blob
[docs]
def set_emitting_region(self,beaming_expr,emitters_type):
"""sets the emitting region
Parameters
----------
beaming_expr : str
emitters_type : str
"""
if self._emitting_region_dict is not None:
self.del_par_from_dic(self._emitting_region_dict)
self._beaming_expr=beaming_expr
set_str_attr(self._blob,'BEAMING_EXPR',beaming_expr)
self._emitting_region_dict=build_emitting_region_dict(self.cosmo,beaming_expr=beaming_expr,emitters_type=emitters_type)
self._set_geometry()
self.parameters.add_par_from_dict(self._emitting_region_dict,self,'_blob',JetParameter)
def _set_geometry(self):
self._blob.GEOMETRY=self.geometry
if self.geometry == 'spherical':
pass
elif self.geometry == 'spherical_shell':
self._emitting_region_dict.pop('R')
self._emitting_region_dict['R_sh'] = JetModelDictionaryPar(ptype='region_size', vmin=1E3, vmax=1E30, punit='cm', froz=False, log=False)
self._emitting_region_dict['h_sh'] = JetModelDictionaryPar(ptype='scaling_factor',val=0.1, vmin=0, punit='', vmax=1, froz=False, log=False)
@property
def spectral_components_list(self):
"""provides a list of the spectral components
Returns
-------
list of spectral components
"""
return [s for s in self._spectral_components_list if s.hidden is False]
@property
def geometry(self,):
return self._geometry
@geometry.setter
def geometry(self,geometry):
if geometry in self._allowed_geometry:
self._geometry=geometry
else:
raise RuntimeError("geometry %s not allowed" % geometry, "please choose among ",
self._allowed_geometry)
[docs]
def make_conical_jet(self, R=None,theta_open=5.,theta_open_min=1,theta_open_max=10):
"""Convenience method to set functional dependency of parameters to have conical jet
Parameters
----------
R : double, optional
, by default None
theta_open : double, optional
semi opening angle of the jet in deg, by default 5
theta_open_min : int, optional
min value for theta_open, by default 1
theta_open_max : int, optional
max value for theta_open, by default 10
Raises
------
RuntimeError
"""
if R is None:
R=self.parameters.R.val
try:
self.add_user_par(name='theta_open',val=theta_open,units='deg',val_min=theta_open_min,val_max=theta_open_max)
self.make_dependent_par(par='R', depends_on=['R_H','theta_open'],
par_expr='np.tan(np.radians(theta_open))*R_H')
self.parameters.R_H.free()
R_H=R/(np.tan(np.radians(self.parameters.theta_open.val)))
print('setting R_H to',R_H)
self.parameters.R_H.val=R_H
except RuntimeError as e:
print('functional dependencies already invoked',str(e))
except Exception as e:
raise RuntimeError('unexpected exception',str(e))
[docs]
def set_EC_dependencies(self):
"""Convenience method to set the functional dependency of BLR and DT radius according to the disk luminosity
"""
try:
if self.parameters.get_par_by_name('L_Disk') is not None and self.parameters.get_par_by_name('R_BLR_in') is not None:
try:
self.make_dependent_par(par='R_BLR_in', depends_on=['L_Disk'], par_expr='3E17*(L_Disk/1E46)**0.5')
self.make_dependent_par(par='R_BLR_out', depends_on=['R_BLR_in'], par_expr='R_BLR_in*1.1')
except RuntimeError as e:
print('functional dependencies already invoked for BLR',str(e))
if self.parameters.get_par_by_name('L_Disk') is not None and self.parameters.get_par_by_name('R_DT') is not None:
try:
self.make_dependent_par(par='R_DT', depends_on=['L_Disk'], par_expr='2E19*(L_Disk/1E46)**0.5')
except RuntimeError as e:
print('functional dependencies already invoked for DT',str(e))
except Exception as e:
print('unexpected exception',str(e))
@property
def IC_adaptive_e_binning(self,):
return np.intc(self._blob.IC_adaptive_e_binning)
@IC_adaptive_e_binning.setter
def IC_adaptive_e_binning(self,state):
if type(state) == bool:
pass
else:
raise RuntimeError('state has to be boolean')
self._blob.IC_adaptive_e_binning=np.intc(state)
[docs]
@staticmethod
def available_emitters_distributions():
EmittersFactory.available_distributions()
[docs]
def set_emitters_distribution(self, distr=None, log_values=False, emitters_type='electrons', init=True):
if init is True:
self.set_blob()
self._emitters_distribution_log_values = log_values
if self._emitters_distribution_dic is not None:
self.del_par_from_dic(self._emitters_distribution_dic)
if isinstance(distr, ArrayDistribution):
self._emitters_distribution_name = 'from_array'
self.emitters_distribution = JetkernelEmittersDistribution.from_array(self, distr, emitters_type=emitters_type)
elif isinstance(distr, JetkernelEmittersDistribution):
self.emitters_distribution = distr
self._emitters_distribution_name = self.emitters_distribution.name
self._emitters_distribution_dic = self.emitters_distribution._parameters_dict
self.parameters.add_par_from_dict(self._emitters_distribution_dic,self,'_blob',JetParameter)
elif isinstance(distr, EmittersDistribution):
if hasattr(distr,'_activate_numba'):
distr._activate_numba()
self._original_emitters_distr = distr
self.emitters_distribution = copy.deepcopy(distr)
self._update_emitters_pars_dependence()
self.emitters_distribution.set_jet(self)
self.emitters_distribution._update_parameters_dict()
self._emitters_distribution_name = self.emitters_distribution.name
self._emitters_distribution_dic = self.emitters_distribution._parameters_dict
self.parameters.add_par_from_dict(self._emitters_distribution_dic, self, '_blob', JetParameter)
self._attach_pars_to_jet(preserve_value_emitters=True)
self.emitters_distribution.update()
elif isinstance(distr, str):
nf=EmittersFactory()
self.emitters_distribution = nf.create_emitters(distr, log_values=log_values, emitters_type=emitters_type)
if hasattr( self.emitters_distribution,'_activate_numba'):
self.emitters_distribution._activate_numba()
self._original_emitters_distr = copy.deepcopy(self.emitters_distribution)
self.emitters_distribution.set_jet(self)
self.emitters_distribution._update_parameters_dict()
self._emitters_distribution_name = self.emitters_distribution.name
self._emitters_distribution_dic = self.emitters_distribution._parameters_dict
self.parameters.add_par_from_dict(self._emitters_distribution_dic, self, '_blob', JetParameter)
self._attach_pars_to_jet(preserve_value_emitters=True)
self._update_emitters_pars_dependence()
self.emitters_distribution.update()
else:
raise RuntimeError('distr',type(distr),'not valid should be a string or an',type(EmittersDistribution),'instance')
def _attach_pars_to_jet(self, preserve_value_emitters=False):
v_dict={}
for par in self.emitters_distribution.parameters.par_array[::]:
self.emitters_distribution.parameters.del_par(par)
v_dict[par.name]=par.val
for k in self._emitters_distribution_dic.keys():
par = self.parameters.get_par_by_name(k)
if preserve_value_emitters is True:
par.set(val=v_dict[k],skip_dep_par_warning=True)
self.emitters_distribution.parameters.add_par(par)
def _update_emitters_pars_dependence(self):
for par in self.emitters_distribution.parameters.par_array:
if par.immutable is True:
warnings.warn('parameter dependence has to be reassigned after emitters distribution is assigned to a jet, now will be reset no dep')
self.emitters_distribution.parameters.reset_dependencies()
break
[docs]
def get_emitters_distribution_name(self):
return self.emitters_distribution.name
[docs]
def show_spectral_components(self):
print ("Spectral components for Jet model:%s"%(self.name))
for comp in self._spectral_components_list:
print ("comp: %s "%(comp.name))
print()
[docs]
def sed_table(self, restframe='obs'):
"""Provide the astropy table with SED spectral components
Parameters
----------
restframe : str, optional
, by default 'obs'
Returns
-------
astropy table
"""
try:
self.spectral_components.build_table(restframe=restframe)
except:
self.eval()
self._SED_table = self.spectral_components.table
return self._SED_table
[docs]
def get_spectral_component_by_name(self,name,verbose=True):
for i in range(len(self._spectral_components_list)):
if self._spectral_components_list[i].name==name:
return self._spectral_components_list[i]
else:
if verbose==True:
print ("no spectral components with name %s found"%name)
print ("names in array are:")
self.show_spectral_components()
return None
[docs]
def list_spectral_components(self):
for i in range(len(self._spectral_components_list)):
print (self._spectral_components_list[i].name)
[docs]
def get_spectral_component_names_list(self):
_l=[]
for i in range(len(self._spectral_components_list)):
_l.append(self._spectral_components_list[i].name)
return _l
[docs]
def del_spectral_component(self,name):
print('deleting spectral component', name)
if name in self.EC_components_list:
self.del_EC_component(name)
else:
self._del_spectral_component(name)
if name in self.basic_components_list:
self.basic_components_list.remove(name)
def _del_spectral_component(self, name, verbose=True):
comp=self.get_spectral_component_by_name(name,verbose=verbose)
if comp._state_dict!={}:
comp.state='off'
if comp is not None:
self._spectral_components_list.remove(comp)
self.spectral_components.__delattr__(name)
def _add_spectral_component(self, name, var_name=None, state_dict=None,state=None):
self._spectral_components_list.append(
JetSpecComponent(self, name, self._blob, var_name=var_name, state_dict=state_dict, state=state))
setattr(self.spectral_components,name,self._spectral_components_list[-1])
def _update_spectral_components(self):
for ID,s in enumerate(self._spectral_components_list):
self._spectral_components_list[ID]= JetSpecComponent(self, s.name, self._blob, var_name=s._var_name, state_dict=s._state_dict, state=s.state)
self._spectral_components_list[ID].hidden = s.hidden
setattr(self.spectral_components, self._spectral_components_list[ID].name, self._spectral_components_list[ID])
[docs]
def add_basic_components(self):
self.basic_components_list=['Sum','Sync','SSC']
self._add_spectral_component('Sum')
self._add_spectral_component('Sync', var_name='do_Sync', state_dict=dict((('on', 1), ('off', 0), ('self-abs', 2))),state='self-abs')
self._add_spectral_component('SSC', var_name='do_SSC', state_dict=dict((('on', 1), ('off', 0))))
[docs]
def add_sync_component(self,state='self-abs'):
self._add_spectral_component('Sync', var_name='do_Sync',
state_dict=dict((('on', 1), ('off', 0), ('self-abs', 2))), state=state)
[docs]
def add_SSC_component(self,state='on'):
self._add_spectral_component('SSC', var_name='do_SSC', state_dict=dict((('on', 1), ('off', 0))),state=state)
[docs]
def del_EC_component(self,EC_components_list, disk_type='BB'):
"""Remove EC components
Parameters
----------
EC_components_list : list
list of the components to remove
disk_type : str, optional
by default 'BB'
Raises
------
RuntimeError
_description_
"""
if isinstance(EC_components_list, six.string_types):
EC_components_list = [EC_components_list]
if 'All' in EC_components_list:
EC_components_list=self._allowed_EC_components_list[::]
for EC_component in EC_components_list:
if EC_component not in self._allowed_EC_components_list:
raise RuntimeError("EC_component %s not allowed" % EC_component, "please choose among ",
self._allowed_EC_components_list)
if EC_component=='Disk':
if self.get_spectral_component_by_name('Disk', verbose=False) is not None:
self._del_spectral_component('Disk', verbose=False)
self.EC_components_list.remove('Disk')
if self.get_spectral_component_by_name('EC_Disk',verbose=False) is not None:
self._del_spectral_component('EC_Disk')
self._blob.do_EC_Disk = 0
self.EC_components_list.remove('EC_Disk')
if self.get_spectral_component_by_name('EC_BLR', verbose=False) is not None:
self._blob.do_EC_BLR=0
self._del_spectral_component('EC_BLR', verbose=False)
self.EC_components_list.remove('EC_BLR')
if EC_component=='EC_Disk':
if self.get_spectral_component_by_name('EC_Disk', verbose=False) is not None:
self._blob.do_EC_Disk=0
self._del_spectral_component('EC_Disk', verbose=False)
self.EC_components_list.remove('EC_Disk')
if EC_component=='EC_BLR':
if self.get_spectral_component_by_name('EC_BLR', verbose=False) is not None:
self._blob.do_EC_BLR=0
self._del_spectral_component('EC_BLR', verbose=False)
self.EC_components_list.remove('EC_BLR')
if EC_component=='DT':
if self.get_spectral_component_by_name('DT', verbose=False) is not None:
self._del_spectral_component('DT', verbose=False)
self.EC_components_list.remove('DT')
if self.get_spectral_component_by_name('EC_DT', verbose=False) is not None:
self._blob.do_EC_DT = 0
self._del_spectral_component('EC_DT', verbose=False)
self.EC_components_list.remove('EC_DT')
if EC_component=='EC_DT':
if self.get_spectral_component_by_name('EC_DT', verbose=False) is not None:
self._blob.do_EC_DT=0
self._del_spectral_component('EC_DT', verbose=False)
self.EC_components_list.remove('EC_DT')
if EC_component=='EC_CMB':
if self.get_spectral_component_by_name('EC_CMB', verbose=False) is not None:
self._blob.do_EC_CMB=0
self._del_spectral_component('EC_CMB', verbose=False)
self.EC_components_list.remove('EC_CMB')
if EC_component=='Star':
if self.get_spectral_component_by_name('Star', verbose=False) is not None:
self._blob.do_star=0
self._del_spectral_component('Star', verbose=False)
self.EC_components_list.remove('Star')
if EC_component=='EC_Star':
if self.get_spectral_component_by_name('EC_Star', verbose=False) is not None:
self._blob.do_EC_Satr=0
self._del_spectral_component('EC_Star', verbose=False)
self.EC_components_list.remove('EC_Star')
self.del_par_from_dic(build_ExtFields_dic(EC_components_list,disk_type))
[docs]
def add_EC_component(self,EC_components_list=[],disk_type=None):
"""Method to add external Compton components
Parameters
----------
EC_components_list : list, optional
list of components to add, by default []
disk_type : str, optional
the type of the disk, by default 'BB'
Raises
------
RuntimeError
_description_
RuntimeError
_description_
"""
if disk_type is not None:
if disk_type not in self._allwed_disk_type:
raise RuntimeError('disk type',disk_type,'not in allwowed', self._allwed_disk_type)
else:
if self.get_par_by_name('disk_type') is not None:
disk_type=self.get_par_by_name('disk_type').val
else:
disk_type='BB'
if isinstance(EC_components_list, six.string_types):
EC_components_list=[EC_components_list]
if 'All' in EC_components_list:
EC_components_list=self._allowed_EC_components_list[::]
for EC_component in EC_components_list:
if EC_component not in self._allowed_EC_components_list:
raise RuntimeError("EC_component %s not allowed" % EC_component, "please choose among ",
self._allowed_EC_components_list)
if EC_component == 'Disk':
if self.get_spectral_component_by_name('Disk',verbose=False) is None:
self._add_spectral_component('Disk',var_name='do_Disk', state_dict=dict((('on', 1), ('off', 0))))
self.EC_components_list.append('Disk')
if EC_component=='EC_Disk':
#self._blob.do_EC_Disk=1
if self.get_spectral_component_by_name('EC_Disk',verbose=False) is None:
self._add_spectral_component('EC_Disk', var_name='do_EC_Disk', state_dict=dict((('on', 1), ('off', 0))))
self.EC_components_list.append('EC_Disk')
if self.get_spectral_component_by_name('Disk',verbose=False) is None:
self._add_spectral_component('Disk',var_name='do_Disk', state_dict=dict((('on', 1), ('off', 0))))
self.EC_components_list.append('Disk')
if EC_component=='EC_BLR':
#self._blob.do_EC_BLR=1
if self.get_spectral_component_by_name('EC_BLR',verbose=False) is None:
self._add_spectral_component('EC_BLR', var_name='do_EC_BLR', state_dict=dict((('on', 1), ('off', 0))))
self.EC_components_list.append('EC_BLR')
if self.get_spectral_component_by_name('Disk',verbose=False) is None:
self._add_spectral_component('Disk',var_name='do_Disk', state_dict=dict((('on', 1), ('off', 0))))
self.EC_components_list.append('Disk')
if EC_component == 'DT':
if self.get_spectral_component_by_name('DT',verbose=False) is None:
self._add_spectral_component('DT',var_name='do_DT', state_dict=dict((('on', 1), ('off', 0))))
self.EC_components_list.append('DT')
if self.get_spectral_component_by_name('Disk',verbose=False) is None:
self._add_spectral_component('Disk',var_name='do_Disk', state_dict=dict((('on', 1), ('off', 0))))
self.EC_components_list.append('Disk')
if EC_component == 'Star':
if self.get_spectral_component_by_name('Star',verbose=False) is None:
self._add_spectral_component('Star',var_name='do_Star', state_dict=dict((('on', 1), ('off', 0))))
self.EC_components_list.append('Star')
if EC_component == 'EC_Star':
if self.get_spectral_component_by_name('Star',verbose=False) is None:
self._add_spectral_component('Star',var_name='do_Star', state_dict=dict((('on', 1), ('off', 0))))
self.EC_components_list.append('Star')
if self.get_spectral_component_by_name('EC_Star',verbose=False) is None:
self._add_spectral_component('EC_Star', var_name='do_EC_Star', state_dict=dict((('on', 1), ('off', 0))))
self.EC_components_list.append('EC_Star')
if EC_component=='EC_DT':
#self._blob.do_EC_DT=1
if self.get_spectral_component_by_name('EC_DT',verbose=False) is None:
self._add_spectral_component('EC_DT', var_name='do_EC_DT', state_dict=dict((('on', 1), ('off', 0))))
self.EC_components_list.append('EC_DT')
if self.get_spectral_component_by_name('DT',verbose=False) is None:
self._add_spectral_component('DT',var_name='do_DT', state_dict=dict((('on', 1), ('off', 0))))
self.EC_components_list.append('DT')
if self.get_spectral_component_by_name('Disk',verbose=False) is None:
self._add_spectral_component('Disk',var_name='do_Disk', state_dict=dict((('on', 1), ('off', 0))))
self.EC_components_list.append('Disk')
if EC_component=='EC_CMB':
#self._blob.do_EC_CMB=1
if self.get_spectral_component_by_name('EC_CMB',verbose=False) is None:
self._add_spectral_component('EC_CMB', var_name='do_EC_CMB', state_dict=dict((('on', 1), ('off', 0))))
self.EC_components_list.append('EC_CMB')
#IF disk_type is already a parameter it has to be updated here to make it effective
self.parameters.add_par_from_dict(build_ExtFields_dic(self.EC_components_list, disk_type),self,'_blob',JetParameter)
#print('--> disk_type',disk_type)
if disk_type is not None and 'Disk' in self.EC_components_list:
#remove the old disk type parameters
if self.parameters.get_par_by_name('disk_type') is not None:
self.del_par_from_dic(build_ExtFields_dic(['Disk'],self.parameters.get_par_by_name('disk_type').val))
else:
self.EC_components_list.append('Disk')
self.parameters.add_par_from_dict(build_ExtFields_dic(self.EC_components_list, disk_type),self,'_blob',JetParameter)
self.parameters.disk_type.val = disk_type
[docs]
def del_par_from_dic(self,model_dic):
"""
"""
for key in model_dic.keys():
par=self.parameters.get_par_by_name(key)
if par is not None:
self.parameters.del_par(par)
[docs]
def get_DL_cm(self,eval_model=False):
if eval_model is True:
self.set_blob()
if self.cosmo._c is not None:
return self.cosmo.get_DL_cm(self.parameters.z_cosm.val)
else:
return self.cosmo.get_DL_cm()
@safe_run
def get_beaming(self,):
BlazarSED.SetBeaming(self._blob)
return self._blob.beam_obj
[docs]
def set_flag(self,flag):
self._blob.STEM=flag
[docs]
def get_flag(self):
return self._blob.STEM
[docs]
def get_path(self):
return self._blob.path
[docs]
def set_path(self,path,clean_work_dir=True):
if path.endswith('/'):
pass
else:
path+='/'
set_str_attr(self._blob,'path',path)
makedir(path,clean_work_dir=clean_work_dir)
[docs]
def get_IC_mode(self):
return dict(map(reversed, self._IC_states.items()))[self._blob.do_IC]
[docs]
def set_external_field_transf(self,val):
if val not in self._external_field_transf.keys():
raise RuntimeError('val',val,'not in allowed values',self._external_field_transf.keys())
self._blob.EC_stat=self._external_field_transf[val]
self._blob.EC_stat_orig=self._external_field_transf[val]
[docs]
def get_external_field_transf(self):
return dict(map(reversed, self._external_field_transf.items()))[self._blob.EC_stat]
[docs]
def set_emiss_lim(self,val):
self._blob.emiss_lim=val
[docs]
def get_emiss_lim(self):
return self._blob.emiss_lim
@property
def IC_nu_size(self):
return self._blob.nu_IC_size
@IC_nu_size.setter
def IC_nu_size(self, val):
self.set_IC_nu_size(val)
[docs]
def get_IC_nu_size(self):
return self._blob.nu_IC_size
[docs]
def set_IC_nu_size(self, val):
if val > self._nu_static_size:
raise RuntimeError('value can not exceed',self._nu_static_size)
self._blob.nu_IC_size = val
@property
def nu_seed_size(self):
return self._blob.nu_seed_size
[docs]
def get_seed_nu_size(self):
return self._blob.nu_seed_size
@nu_seed_size.setter
def nu_seed_size(self,val):
self.set_seed_nu_size(val)
[docs]
def set_seed_nu_size(self,val):
if val>self._nu_static_size:
raise RuntimeError('value can not exceed',self._nu_static_size)
self._blob.nu_seed_size=val
[docs]
def set_gamma_grid_size(self,val):
self.emitters_distribution.set_grid_size(gamma_grid_size=val)
@property
def gamma_grid_size(self):
return self._blob.gamma_grid_size
@gamma_grid_size.setter
def gamma_grid_size(self,val):
self.emitters_distribution.set_grid_size(gamma_grid_size=val)
@property
def nu_min(self):
return self._get_nu_min_grid()
@nu_min.setter
def nu_min(self, val):
if hasattr(self, '_blob'):
self._set_nu_min_grid(val)
def _get_nu_min_grid(self):
return self._blob.nu_start_grid
def _set_nu_min_grid(self, val):
self._blob.nu_start_grid=val
@property
def Norm_distr(self):
if hasattr(self,'emitters_distribution'):
if self.emitters_distribution._user_defined is False:
return self._blob.Norm_distr
else:
return self.emitters_distribution.normalize
else:
return None
@Norm_distr.setter
def Norm_distr(self, val):
"""_summary_
Parameters
----------
val : 1/0 or False/True
Raises
------
RuntimeError
_description_
"""
if hasattr(self, 'emitters_distribution'):
if val == 1 or val is True:
if self.emitters_distribution._user_defined is False:
self._blob.Norm_distr = 1
else:
self.emitters_distribution.normalize = val
self.parameters.N.par_type='emitters_density'
self.parameters.N.units ='1/cm3'
elif val == 0 or val is False:
if self.emitters_distribution._user_defined is False:
self._blob.Norm_distr = 0
else:
self.emitters_distribution.normalize = val
self.parameters.N.par_type = 'scaling_factor'
else:
raise RuntimeError('value', val, 'not allowed, allowed 0/1 or False/True')
[docs]
def switch_Norm_distr_ON(self):
self.Norm_distr=True
[docs]
def switch_Norm_distr_OFF(self):
self.Norm_distr = False
@property
def nu_max(self):
return self._get_nu_max_grid()
@nu_max.setter
def nu_max(self, val):
if hasattr(self, '_blob'):
self._set_nu_max_grid(val)
def _set_nu_max_grid(self, val):
self._blob.nu_stop_grid=val
def _get_nu_max_grid(self):
return self._blob.nu_stop_grid
@property
def nu_size(self):
return self._nu_size
@nu_size.setter
def nu_size(self, size):
self._nu_size = size
[docs]
def set_nu_grid_size(self, val):
self._set_nu_grid_size_blob(val)
@property
def nu_grid_size(self):
return self._get_nu_grid_size_blob()
@nu_grid_size.setter
def nu_grid_size(self, val):
self._set_nu_grid_size_blob(val)
def _set_nu_grid_size_blob(self, val):
if val < 100:
val = 100
if val > self._static_spec_arr_grid_size:
raise RuntimeError('value can not exceed', self._nu_static_size)
self._blob.nu_grid_size=val
def _get_nu_grid_size_blob(self):
return self._blob.nu_grid_size
[docs]
def set_verbosity(self,val):
self._blob.verbose=val
[docs]
def get_verbosity(self):
return self._blob.verbose
[docs]
def debug_synch(self):
print ("nu stop synch", self._blob.nu_stop_Sync)
print ("nu stop synch ssc", self._blob.nu_stop_Sync_ssc)
print ("ID MAX SYNCH", self._blob.NU_INT_STOP_Sync_SSC)
[docs]
def debug_SSC(self):
print ("nu start SSC", self._blob.nu_start_SSC)
print ("nu stop SSC", self._blob.nu_stop_SSC)
print ("ID MAX SSC", self._blob.NU_INT_STOP_COMPTON_SSC)
[docs]
def show_emitters_distribution(self):
print('-'*80)
print('%s distribution:'%self.emitters_distribution.emitters_type)
print(" type: %s " % (self._emitters_distribution_name))
print(" gamma energy grid size: ", self.gamma_grid_size)
print(" gmin grid : %e" % self._blob.
gmin_griglia)
print(" gmax grid : %e" % self._blob.gmax_griglia)
print(" normalization ", self.Norm_distr)
print(" log-values ", self._emitters_distribution_log_values)
print('')
self.parameters.par_array.sort(key=lambda x: x.name, reverse=False)
self.parameters.show_pars(names_list=self._emitters_distribution_dic.keys())
[docs]
def show_model(self):
"""
shortcut to :class:`ModelParametersArray.show_pars` method
shows all the paramters in the model
"""
print("")
print('-'*80)
print("model description: ")
print('-'*80)
print("type:",self.__class__.__name__)
print("name: %s " % (self.name))
print("geometry: %s " % (self.geometry))
print('')
print('%s distribution:'%self.emitters_distribution.emitters_type)
print(" type: %s " % (self._emitters_distribution_name))
print (" gamma energy grid size: ",self.gamma_grid_size)
print (" gmin grid : %e"%self._blob.gmin_griglia)
print (" gmax grid : %e"%self._blob.gmax_griglia)
print(" normalization: ", self.Norm_distr)
print(" log-values: ", self._emitters_distribution_log_values)
_p = self.parameters.get_par_by_name('NH_cold_to_rel_e')
if _p is not None:
print(' ratio of cold protons to relativistic electrons: %e'%_p.val)
print('')
if 'Disk' in self.EC_components_list:
print('accretion disk:')
BlazarSED.set_Disk(self._blob)
print(' disk Type: %s'%self.parameters.get_par_by_name('disk_type').val)
print(' L disk: %e (erg/s)'%self.parameters.get_par_by_name('L_Disk').val)
print(' T disk: %e (K)'%self._blob.T_Disk)
print(' nu peak disk: %e (Hz)'%BlazarSED.eval_nu_peak_Disk(self._blob.T_Disk))
if self.parameters.get_par_by_name('disk_type').val == 'MultiBB':
print(' Sw radius %e (cm)'%self._blob.R_Sw)
print(' L Edd. %e (erg/s)'%self._blob.L_Edd)
yr=86400*365
print(' accr_rate: %e (M_sun/yr)'%(yr*self._blob.accr_rate/BlazarSED.m_sun))
print(' accr_rate Edd.: %e (M_sun/yr)'%(yr*self._blob.accr_Edd/BlazarSED.m_sun))
if 'EC_Star' in self.EC_components_list:
print('Star:')
print(' Star radius %e (cm)'%self._blob.R_Star, ',%e (R_Sun)'%(self._blob.R_Star/constants.R_sun.to('cm').value))
print('')
print('radiative fields:')
print (" seed photons grid size: ", self.nu_seed_size)
print (" IC emission grid size: ", self.get_IC_nu_size())
print (' source emissivity lower bound : %e' % self._blob.emiss_lim)
print (' spectral components:')
for _s in self._spectral_components_list:
print(" name:%s,"%_s.name, 'state:', _s.state)
print(" name:%s,"%_s.name, 'hidden:', _s.hidden)
print('external fields transformation method:', self.get_external_field_transf())
print ('')
print ('SED info:')
print (' nu grid size jetkernel: %d' % self.nu_grid_size)
print (' nu size: %d' % self.nu_size)
print (' nu mix (Hz): %e' % self._get_nu_min_grid())
print (' nu max (Hz): %e' % self._get_nu_max_grid())
print('')
print('flux plot lower bound : %e' % self.flux_plot_lim)
print('')
print('-'*80)
self.show_pars()
print('-' * 80)
[docs]
def plot_model(self,plot_obj=None,clean=False,label=None,comp=None,sed_data=None,color=None,auto_label=True,line_style='-',frame='obs', density=False):
"""plot the model
Parameters
----------
plot_obj : _type_, optional
_description_, by default None
clean : bool, optional
_description_, by default False
label : _type_, optional
_description_, by default None
comp : _type_, optional
_description_, by default None
sed_data : _type_, optional
_description_, by default None
color : _type_, optional
_description_, by default None
auto_label : bool, optional
_description_, by default True
line_style : str, optional
_description_, by default '-'
frame : str, optional
_description_, by default 'obs'
density : bool, optional
_description_, by default False
Returns
-------
_type_
_description_
"""
plot_obj=self._set_up_plot(plot_obj,sed_data,frame,density)
if clean is True:
plot_obj.clean_model_lines()
if comp is not None:
c = self.get_spectral_component_by_name(comp)
if label is not None:
comp_label = label
elif label is None and auto_label is True:
comp_label = c.name
else:
comp_label=None
if c.state!='off':
plot_obj.add_model_plot(c.SED, line_style=line_style, label=comp_label,flim=self.flux_plot_lim,color=color,auto_label=auto_label, density=density,update=False, frame=frame)
else:
for c in self.spectral_components_list:
comp_label = c.name
if auto_label is not True:
comp_label=label
#print('comp label',comp_label)
if c.state != 'off' and c.name!='Sum':
plot_obj.add_model_plot(c.SED, line_style=line_style, label=comp_label,flim=self.flux_plot_lim,auto_label=auto_label,color=color, density=density,update=False, frame=frame)
c=self.get_spectral_component_by_name('Sum')
if label is not None:
comp_label = label
else:
comp_label='Sum'
plot_obj.add_model_plot(c.SED, line_style='--', label=comp_label, flim=self.flux_plot_lim,color=color, density=density,update=False)
plot_obj.update_plot()
return plot_obj
@safe_run
def set_blob(self):
if hasattr(self, 'T_esc_e_second'):
if self.T_esc_e_second is None:
if self.geometry == 'spherical':
self._blob.T_esc_e_second = self.parameters.R.val / BlazarSED.vluce_cm
else:
self._blob.T_esc_e_second = self.parameters.R_sh.val*self.parameters.h_sh.val / BlazarSED.vluce_cm
else:
self._blob.T_esc_e_second = self.T_esc_e_second
if self.emitters_distribution._user_defined is True:
self.emitters_distribution._fill()
BlazarSED.Init(self._blob, self.get_DL_cm())
if self.emitters_distribution._user_defined is True:
self.emitters_distribution._set_blob()
@safe_run
def set_external_fields(self):
self.set_blob()
BlazarSED.spectra_External_Fields(1,self._blob,1)
[docs]
def lin_func(self, lin_nu, init, phys_output=False, update_emitters=True):
if self.emitters_distribution is None:
raise RuntimeError('emitters distribution not defined')
if init is True:
self.set_blob()
self._update_spectral_components()
BlazarSED.Run_SED(self._blob)
if phys_output==True:
BlazarSED.EnergeticOutput(self._blob)
if self.emitters_distribution._user_defined is False:
if update_emitters is True:
self.emitters_distribution.update()
else:
self.emitters_distribution._fill()
nu_sed_sum, nuFnu_sed_sum = self.spectral_components.Sum.get_SED_points(lin_nu=lin_nu,log_log=False,interp=self._jetkernel_interp)
nuFnu_sed_hidden=0.
for _sc in self._spectral_components_list:
if _sc.hidden is True:
_, _nuFnu_sed= _sc.get_SED_points(lin_nu=lin_nu,log_log=False,interp=self._jetkernel_interp)
nuFnu_sed_hidden += _nuFnu_sed
nuFnu_sed_sum = nuFnu_sed_sum - nuFnu_sed_hidden
return nu_sed_sum, nuFnu_sed_sum
def _eval_model(self, lin_nu, log_nu, init, loglog, phys_output=False, update_emitters=True):
log_model = None
lin_nu, lin_model = self.lin_func(lin_nu, init, phys_output, update_emitters)
if loglog is True:
log_model = np.log10(lin_model)
return lin_model, log_model
def _prepare_nu_model(self, nu, loglog):
if nu is None:
lin_nu = np.logspace(np.log10(self.nu_min), np.log10(self.nu_max), self.nu_size)
log_nu = np.log10(lin_nu)
else:
if np.shape(nu) == ():
nu = np.array([nu])
if loglog is True:
lin_nu = np.power(10., nu)
log_nu = nu
else:
log_nu = np.log10(nu)
lin_nu = nu
return lin_nu, log_nu
@safe_run
def eval(self,
init=True,
fill_SED=True,
nu=None,
get_model=False,
loglog=False,
plot=None,
label=None,
phys_output=False,
update_emitters=True):
out_model = None
lin_nu, log_nu = self._prepare_nu_model(nu, loglog)
lin_model, log_model= self._eval_model(lin_nu, log_nu ,init, loglog, phys_output=phys_output,
update_emitters=update_emitters)
if fill_SED is True:
self._fill(lin_nu,lin_model)
self.spectral_components.Sum.SED.nuFnu=lin_model
if get_model is True:
if loglog is True:
out_model = log_model
else:
out_model = lin_model
return out_model
[docs]
def energetic_report(self,verbose=True,):
self._build_energetic_report()
if verbose is True:
_show_table(self.energetic_report_table)
def _build_energetic_dict(self):
self.energetic_dict={}
BlazarSED.SetBeaming(self._blob)
#NOTE: workaround for the users be sure they evaluate correctly the jet energetics for 'disk' frame
_change_frame=False
if self.get_external_field_transf()=='disk':
self.set_external_field_transf('blob')
self.eval()
_change_frame=True
self._energetic = BlazarSED.EnergeticOutput(self._blob)
if _change_frame is True:
self.set_external_field_transf('disk')
self.eval()
_par_array=ModelParameterArray()
_name = [i for i in self._energetic.__class__.__dict__.keys() if i[:1] != '_']
_par_array.add_par(ModelParameter(name='BulkLorentzFactor', val=self._blob.BulkFactor, units='',par_type='jet-bulk-factor'))
self.energetic_dict['BulkLorentzFactor']= self._blob.BulkFactor
try:
for _n in _name:
units = 'skip_this'
if _n[0]=='L':
par_type='Lum. blob rest. frame.'
units='erg/s'
elif _n[0] == 'U' and 'DRF' not in _n:
par_type = 'Energy dens. blob rest. frame'
units = 'erg/cm^3'
elif _n[0] == 'U' and 'DRF' in _n:
par_type = 'Energy dens. disk rest. frame'
units = 'erg/cm^3'
elif _n[0] == 'j':
par_type = 'jet Lum.'
units = 'erg/s'
else:
if _n !='thisown':
warnings.warn('energetic name %s not understood'%_n)
if units == 'skip_this':
pass
else:
self.energetic_dict[_n]=getattr(self._energetic, _n)
_par_array.add_par(ModelParameter(name=_n, val=getattr(self._energetic, _n), units=units,par_type=par_type))
except Exception as e:
print('_energetic',self._energetic)
raise RuntimeError('energetic_report failed',e)
return _par_array
def _set_energetic_report(self,_par_array):
if self.emitters_distribution.emitters_type=='electrons':
_par_array.add_par(ModelParameter(name='NH_cold_to_rel_e', val=self.parameters.NH_cold_to_rel_e.val, units='',par_type='cold_p_to_rel_e_ratio'))
self.energetic_dict['NH_cold_to_rel_e'] = self.parameters.NH_cold_to_rel_e.val
self.energetic_report_table = _par_array.par_table
self.energetic_report_table.remove_columns(['log','frozen','phys. bound. min','phys. bound. max'])
self.energetic_report_table.rename_column('par type','type')
if self.emitters_distribution.emitters_type=='electrons':
_i=self.energetic_report_table['name']=='U_p_target'
_i=np.argwhere(_i==True).flatten()[0]
self.energetic_report_table.remove_row(_i)
_=self.energetic_dict.pop('U_p_target')
_i=self.energetic_report_table['name']=='U_p'
_i=np.argwhere(_i==True).flatten()[0]
self.energetic_report_table.remove_row(_i)
_=self.energetic_dict.pop('U_p')
_i=self.energetic_report_table['name']=='L_pp_gamma_rf'
_i=np.argwhere(_i==True).flatten()[0]
self.energetic_report_table.remove_row(_i)
_=self.energetic_dict.pop('L_pp_gamma_rf')
_i=self.energetic_report_table['name']=='jet_L_p'
_i=np.argwhere(_i==True).flatten()[0]
self.energetic_report_table.remove_row(_i)
_=self.energetic_dict.pop('jet_L_p')
if self.emitters_distribution.emitters_type=='protons':
_i=self.energetic_report_table['name']=='U_p_cold'
_i=np.argwhere(_i==True).flatten()[0]
self.energetic_report_table.remove_row(_i)
_=self.energetic_dict.pop('U_p_cold')
_i=self.energetic_report_table['name']=='jet_L_p_cold'
_i=np.argwhere(_i==True).flatten()[0]
self.energetic_report_table.remove_row(_i)
_=self.energetic_dict.pop('jet_L_p_cold')
if isinstance(self,GalacticUnbeamed):
i=[]
for n in self.energetic_report_table['name']:
if 'jet' in n:
i.append(True)
_=self.energetic_dict.pop(n)
else:
i.append(False)
i=np.array(i)
i=np.argwhere(i==True)
self.energetic_report_table.remove_rows(i)
_d_l=['U_Disk','U_BLR','U_DT','U_CMB','U_Synch_DRF','U_Star']
i=[]
for n in self.energetic_report_table['name']:
if n in _d_l:
i.append(True)
_=self.energetic_dict.pop(n)
else:
i.append(False)
i=np.array(i)
i=np.argwhere(i==True)
self.energetic_report_table.remove_rows(i)
for n in self.energetic_report_table['name']:
if n.endswith('_DRF'):
_i=self.energetic_report_table['name']==n
_i=np.argwhere(_i==True).flatten()[0]
nn=self.energetic_report_table[_i]['name'].replace('_DRF','')
self.energetic_report_table[_i]['name']=nn
self.energetic_dict[nn]=self.energetic_dict.pop(n)
if n.endswith('_rf'):
_i=self.energetic_report_table['name']==n
_i=np.argwhere(_i==True).flatten()[0]
nn=self.energetic_report_table[_i]['name'].replace('_rf','')
self.energetic_report_table[_i]['name']=nn
self.energetic_dict[nn]=self.energetic_dict.pop(n)
for r in self.energetic_report_table:
if 'blob' in r['type']:
nn=r['type'].replace('blob','')
r['type']=nn
if 'disk' in r['type']:
nn=r['type'].replace('disk','')
r['type']=nn
def _build_energetic_report(self,):
try:
_par_array=self._build_energetic_dict()
self._set_energetic_report(_par_array)
except Exception as e:
raise RuntimeError('energetic_report failed',e)
[docs]
def get_SED_peak(self,peak_name=None,freq_range=None,log_log=False):
if peak_name is not None and freq_range is not None:
print ("either you provide peak_name or freq_range")
raise ValueError
elif peak_name is not None:
try:
if log_log==False:
return getattr(self._blob,peak_name)
else:
return np.log10(getattr(self._blob,peak_name) )
except:
print ("peak name %s, not found, check name"%peak_name)
raise ValueError
else:
x,y=self.spectral_components.Sum.get_SED_points(log_log=log_log)
msk1=x>freq_range[0]
msk2=x<freq_range[1]
y_m= y[msk1*msk2].max()
x_id= np.argmax(y[msk1*msk2])
return x[msk1*msk2][x_id],y_m
[docs]
def get_component_peak(self,comp_name=None,log_log=False):
comp = self.get_spectral_component_by_name(comp_name)
ID = np.argmax(comp.SED.nuFnu.value)
x_p, y_p = comp.SED.nu[ID].value, comp.SED.nuFnu[ID].value
if log_log is True:
x_p, y_p=np.log10([x_p,y_p])
return x_p, y_p
[docs]
def set_num_c_threads(self,N):
print("===> setting C threads to",N)
if isinstance(N,int):
self._blob.N_THREADS=N
else:
raise RuntimeError('N must be integer')
[docs]
class Jet(JetBase):
""" Jet class
"""
def __init__(self,
cosmo=None,
name=None,
emitters_type='electrons',
emitters_distribution='plc',
emitters_distribution_log_values=False,
beaming_expr='delta',
T_esc_e_second=None,
jet_workplace=None,
verbose=None,
clean_work_dir=True,
electron_distribution=None,
proton_distribution=None,
electron_distribution_log_values=None,
proton_distribution_log_values=None,
geometry='spherical'):
"""
Parameters
----------
cosmo
name
emitters_type
emitters_distribution
emitters_distribution_log_values
beaming_expr
T_esc_e_second
jet_workplace
verbose
clean_work_dir
electron_distribution
proton_distribution
electron_distribution_log_values
proton_distribution_log_values
"""
if electron_distribution is not None:
emitters_type = 'electrons'
emitters_distribution= electron_distribution
if electron_distribution_log_values is not None:
emitters_distribution_log_values = electron_distribution_log_values
if proton_distribution is not None:
emitters_type = 'protons'
emitters_distribution= proton_distribution
if proton_distribution_log_values is not None:
emitters_distribution_log_values = proton_distribution_log_values
if electron_distribution is not None and emitters_type == 'protons':
raise RuntimeError("if you provide an electron_distribution, you can't set emitters_type=",emitters_type, 'use emitters_distribution instead')
if proton_distribution is not None and emitters_type == 'electrons':
raise RuntimeError("if you provide a proton_distribution, you can't set emitters_type=",emitters_type, 'use emitters_distribution instead')
if name is None:
_name = 'jet'
else:
_name = name = clean_var_name(name)
super(Jet,self).__init__(cosmo=cosmo,
name=_name,
emitters_type=emitters_type,
emitters_distribution=emitters_distribution,
emitters_distribution_log_values=emitters_distribution_log_values,
beaming_expr=beaming_expr,
jet_workplace=jet_workplace,
verbose=verbose,
clean_work_dir=clean_work_dir,
geometry=geometry)
if name is None or name == '':
if self.emitters_distribution.emitters_type == 'electrons':
name = 'jet_leptonic'
elif self.emitters_distribution.emitters_type == 'protons':
name = 'jet_hadronic_pp'
else:
name = 'jet'
self.name = clean_var_name(name)
if self.emitters_distribution.emitters_type == 'protons':
self.T_esc_e_second=T_esc_e_second
self.add_pp_gamma_component()
self.add_pp_neutrino_component()
self.add_bremss_ep_component()
if self.emitters_distribution.emitters_type == 'electrons':
self.electron_distribution=self.emitters_distribution
self.parameters.NH_cold_to_rel_e.hidden=False
if self.emitters_distribution.emitters_type == 'protons':
self.protons_distribution=self.emitters_distribution
[docs]
@staticmethod
def available_electron_distributions():
JetBase.available_emitters_distributions()
[docs]
@staticmethod
def available_proton_distributions():
JetBase.available_emitters_distributions()
[docs]
def get_proton_distribution_name(self):
return self.get_emitters_distribution_name()
[docs]
def get_electron_distribution_name(self):
return self.get_emitters_distribution_name()
[docs]
def show_proton_distribution(self):
self.show_emitters_distribution()
[docs]
def show_electron_distribution(self):
self.show_emitters_distribution()
[docs]
def add_bremss_ep_component(self):
self._add_spectral_component('Bremss_ep', var_name='do_bremss_ep', state_dict=dict((('on', 1), ('off', 0))))
[docs]
def add_pp_gamma_component(self):
self._add_spectral_component('PP_gamma', var_name='do_pp_gamma', state_dict=dict((('on', 1), ('off', 0))))
[docs]
def add_pp_neutrino_component(self):
self._add_spectral_component('PP_neutrino_tot', var_name='do_pp_neutrino',
state_dict=dict((('on', 1), ('off', 0))))
self._add_spectral_component('PP_neutrino_mu', var_name='do_pp_neutrino',
state_dict=dict((('on', 1), ('off', 0))))
self._add_spectral_component('PP_neutrino_e', var_name='do_pp_neutrino',
state_dict=dict((('on', 1), ('off', 0))))
[docs]
def set_N_from_U_emitters(self,U, gmin=None, gmax=None):
""" Sets the normalization of N to match the energy density of the primary emitters
Parameters
----------
U: float, (erg/cm3)
gmin: float, optional,
minimum value to evaluate the integral
gmax: float, optional,
maximum value to evaluate the integral
Returns
-------
"""
N = self.parameters.N.val
ratio = U/self.emitters_distribution.eval_U(gmin=gmin, gmax=gmax)
self.emitters_distribution._fill()
self.set_par('N', val=N *ratio)
self.set_blob()
[docs]
def set_N_from_U_vol_emitters(self, U_vol, gmin=None, gmax=None):
"""Sets the normalization of N to match the volume integrated energy of the primary emitters
Parameters
----------
U_vol: float (erg/s)
gmin: float, optional,
minimum value to evaluate the integral
gmax: float, optional,
maximum value to evaluate the integral
Returns
-------
"""
U=U_vol/ self._blob.Vol_sphere
self.set_N_from_U_emitters(U, gmin=gmin, gmax=gmax)
[docs]
def set_N_from_L_sync(self,L_sync):
"""Sets the normalization of N to match the src integrated Luminosity of the synchrotron emission
Parameters
----------
L_sync : float (erg/s)
Returns
-------
"""
self.set_par('N', val=1.0)
#gamma_grid_size = self._blob.gamma_grid_size
#self.emitters_distribution.set_grid_size(100)
self.set_blob()
delta = self.get_beaming()
ratio = L_sync/(BlazarSED.Power_Sync_Electron(self._blob)* delta ** 4)
self.set_par('N', val=ratio)
[docs]
def set_N_from_F_sync(self, F_sync):
"""Sets the normalization of N to match the observed integrated synchrotron flux
Parameters
----------
F_sync : float, (erg cm-1 s-1 Hz-1)
observed integrated synchrotron flux
Returns
-------
"""
DL = self.get_DL_cm()
L = F_sync * DL * DL * 4.0 * np.pi
self.set_N_from_L_sync(L)
[docs]
def set_N_from_nuLnu(self,nuLnu_src, nu_src):
"""Sets the normalization of N to match the src Luminosity of the synchrotron emission at src frequency nu
Parameters
----------
nuLnu_src : float, (erg/s)
Luminosity of the synchrotron emission at src frequency nu
nu_src: float (Hz)
synchrotron emission at src frequency nu (Hz)
Returns
-------
"""
self.set_par('N',val=1.0)
#gamma_grid_size = self._blob.gamma_grid_size
#self.emitters_distribution.set_grid_size(100)
self.set_blob()
delta = self._blob.beam_obj
nu_blob = nu_src / delta
L_out = BlazarSED.Lum_Sync_at_nu(self._blob, nu_blob) * delta ** 4
N_out = nuLnu_src / L_out
#self.emitters_distribution.set_grid_size(gamma_grid_size)
self.set_par('N', val=N_out)
[docs]
def set_N_from_nuFnu(self, nuFnu_obs, nu_obs):
"""Sets the normalization of N to match the observed flux nuFnu_obs at a given frequency nu_obs
Parameters
----------
nuFnu_obs: float, (erg cm-1 s-1 Hz-1)
observed differential synchrotron flux
nu_obs: float, (Hz)
synchrotron emission at src frequency nu (Hz)
Returns
-------
"""
self.set_blob()
DL = self.get_DL_cm()
L = nuFnu_obs * DL * DL * 4.0 * np.pi
nu_rest = nu_obs * (1 + self.parameters.z_cosm.val)
self.set_N_from_nuLnu( L, nu_rest)
[docs]
def set_B_eq(self, nuFnu_obs, nu_obs, B_min=1E-9,B_max=1.0,N_pts=20,plot=False):
"""Sets the magnetic field (B) equipartition from numerical minimization over a logarithmic grid of B values,
for a given observed flux of the synchrotron emission (nuFnu_obs) at a given observed frequency (nu_obs)
Parameters
----------
nuFnu_obs: float, (erg cm-1 s-1 Hz-1)
observed differential synchrotron flux
nu_obs: float, (Hz)
synchrotron emission at src frequency nu (Hz)
B_min: float, (Gauss), optional
lower bound for B-grid
B_max: float, (Gauss), optional
upper bound for B-grid
N_pts: int, optional
Other number of points to build the B-grid
plot: book, optional, default=False
if True plots the numerical grid
Returns
-------
"""
b_grid = np.logspace(np.log10(B_min), np.log10(B_max), N_pts)
print ('B grid min ',B_min)
print ('B grid max ',B_max)
print ('grid points',N_pts)
U_e = np.zeros(N_pts)
U_B = np.zeros(N_pts)
N = np.zeros(N_pts)
self.set_par('B', b_grid[0])
self.set_blob()
for ID, b in enumerate(b_grid):
self.set_par('B', b)
self.set_par('N', 1.0)
# print 'B_eq',ID
self.set_N_from_nuFnu(nuFnu_obs, nu_obs)
N[ID]=self.get_par_by_name('N').val
self.set_blob()
#
U_e[ID] = self._blob.U_e
U_B[ID] = self._blob.UB
# delta=Jet.get_beaming()
# print "check L_in=%4.4e L_out=%4.4e"%(L_0,(L_0/delta**4)/BlazarSED.Power_Sync_Electron(Jet._Jet__blob))
ID_min = np.argmin(U_B + U_e)
if plot==True:
#import pylab as plt
fig, ax = plt.subplots()
ax.plot(b_grid,U_e,label=r'$u_e$')
ax.plot(b_grid,U_B,label=r'$u_B$')
ax.plot(b_grid,U_B+U_e)
ax.scatter(b_grid[ID_min],U_e[ID_min]+U_B[ID_min])
ax.semilogy()
ax.semilogx()
ax.set_xlabel(r'$B$ (G)')
ax.set_ylabel(r'$u$ erg cm$-^3$')
ax.legend
plt.show()
self.set_par('B', val=b_grid[ID_min])
self.set_par('N', val=N[ID_min])
print ('setting B to ',b_grid[ID_min])
print ('setting N to ',N[ID_min])
return b_grid[ID_min],b_grid,U_B,U_e
def eval_synch_pol(self,nu_range):
"""_summary_
evluates the synchrotron polarization for
Parameters
----------
nu_range : numpy array
array of frequencies for the polarization evaluation
Returns
-------
arrrays of polarization and nuF_nu
"""
nuF_nu=self.eval(get_model=True,nu=nu_range)
pol_nu=np.zeros(nu_range.size)
for ID,nu in enumerate(nu_range):
pol_nu[ID]=BlazarSED.eval_Sync_polarization(self._blob,nu)
m=np.logical_or(nuF_nu<=0,np.isnan(pol_nu))
pol_nu[m]=0
nuF_nu[m]=0
return pol_nu,nuF_nu
[docs]
class GalacticBeamed(Jet):
def __init__(self,
distance=3*u.kpc,
name=None,
emitters_type='electrons',
emitters_distribution='plc',
emitters_distribution_log_values=False,
beaming_expr='delta',
T_esc_e_second=None,
jet_workplace=None,
verbose=None,
clean_work_dir=True,
electron_distribution=None,
proton_distribution=None,
electron_distribution_log_values=None,
proton_distribution_log_values=None,
geometry='spherical'):
if name is None:
_name = 'unbeamed'
else:
_name = name = clean_var_name(name)
if np.isscalar(distance):
_d=distance
else:
try:
_d=distance.to('cm')
except:
raise RuntimeError('distance must be either an astropy unit object convertible to cm, or a scalar in cm')
cosmo=Cosmo(DL_cm=_d,verbose=False)
super(GalacticBeamed,self).__init__(cosmo=cosmo,
T_esc_e_second=T_esc_e_second,
name=_name,
emitters_type=emitters_type,
emitters_distribution=emitters_distribution,
emitters_distribution_log_values=emitters_distribution_log_values,
electron_distribution=electron_distribution,
proton_distribution=proton_distribution,
electron_distribution_log_values=electron_distribution_log_values,
proton_distribution_log_values=proton_distribution_log_values,
beaming_expr=beaming_expr,
jet_workplace=jet_workplace,
verbose=verbose,
clean_work_dir=clean_work_dir,
geometry=geometry)
if name is None or name == '':
if self.emitters_distribution.emitters_type == 'electrons':
name = 'galactic_beamed_leptonic'
elif self.emitters_distribution.emitters_type == 'protons':
name = 'galactic_beamed_hadronic_pp'
else:
name = 'galactic_beamed'
self.name = clean_var_name(name)
self._handle_z(d=_d)
def _dummy_z_par_func(self,DL_cm):
self.cosmo._DL_cm=DL_cm*u.cm
return 0
def _handle_z(self,d=u.kpc*1):
self._blob.z_cosm=0
self.parameters.z_cosm.val=0
self.parameters.z_cosm.hidden=True
self.parameters.z_cosm.frozen=True
_dmax=1000*u.kpc.to('cm')
self.add_user_par(name='DL_cm',units='cm',val=d.value,val_min=0,val_max=_dmax)
p=self.parameters.get_par_by_name('DL_cm')
p.par_type='distance'
[docs]
class GalacticUnbeamed(GalacticBeamed):
def __init__(self,
distance=3*u.kpc,
name=None,
emitters_type='electrons',
emitters_distribution='plc',
emitters_distribution_log_values=False,
T_esc_e_second=None,
jet_workplace=None,
verbose=None,
clean_work_dir=True,
electron_distribution=None,
proton_distribution=None,
electron_distribution_log_values=None,
proton_distribution_log_values=None,
geometry='spherical'):
super(GalacticUnbeamed,self).__init__(distance=distance,
name=name,
emitters_type=emitters_type,
emitters_distribution=emitters_distribution,
emitters_distribution_log_values=emitters_distribution_log_values,
beaming_expr='delta',
T_esc_e_second=T_esc_e_second,
jet_workplace=jet_workplace,
verbose=verbose,
clean_work_dir=clean_work_dir,
electron_distribution=electron_distribution,
proton_distribution=proton_distribution,
electron_distribution_log_values=electron_distribution_log_values,
proton_distribution_log_values=proton_distribution_log_values,
geometry=geometry)
if name is None or name == '':
if self.emitters_distribution.emitters_type == 'electrons':
name = 'galactic_unbeamed_leptonic'
elif self.emitters_distribution.emitters_type == 'protons':
name = 'galactic_unbeamed_hadronic_pp'
else:
name = 'galactic_unbeamed'
self.name=name
self.parameters.beam_obj.val=1
self.parameters.beam_obj.hidden=True
self.parameters.R_H.hidden=True
self.set_external_field_transf('disk')