Source code for jetset.jet_model

"""Jet model classes and utilities for spectral energy distribution calculations."""

__author__ = "Andrea Tramacere"


#import json
#import dill as pickle
import six
import numpy as np
import copy
import warnings
import os
from astropy import units as u
from astropy import constants
#from contextlib import redirect_stdout
#import io
import multiprocessing

from .jet_spectral_components import JetSpecComponent, SpecCompList

from .model_parameters import ModelParameterArray, ModelParameter, _show_table
from .base_model import Model
from .output import makedir,WorkPlace
from  .plot_sedfit import plt
from .cosmo_tools import Cosmo
from .utils import set_str_attr, old_model_warning, get_info, clean_var_name, get_nested_attr
from .jet_paramters import *
from .jet_emitters import *
from .jet_emitters_factory import EmittersFactory, InjEmittersFactory
from .jet_kernel_tools import set_emitters_c_array1d_fast as set_emitters
from .jet_kernel_tools import get_emitters_c_array1d_fast as get_emitters
from .jet_tools import *
from .mathkernel_helper import bessel_table_file_path
from .internal_absorption import InternalAbsorption

on_rtd = os.environ.get('READTHEDOCS', None) == 'True'

if on_rtd is True:
    from .mock import jetkernel as BlazarSED
else:
    from .jetkernel import jetkernel as BlazarSED



__all__=['Jet','JetBase','GalacticBeamed','GalacticUnbeamed']


[docs] class JetBase(Model): """Base jet model wrapping the jetkernel backend. The class exposes a Python interface to configure physical parameters, emitter distributions, spectral components, and evaluation options. """ def __repr__(self): return str(self.show_model()) def __init__(self, cosmo=None, name='test', emitters_type='electrons', emitters_distribution='pl', emitters_distribution_log_values=False, beaming_expr='delta', jet_workplace=None, verbose=False, nu_size=500, clean_work_dir=True, geometry='spherical', **keywords): """Build a jet model and initialize the backend state. Parameters ---------- cosmo : Cosmo, optional Cosmology helper. If omitted, a default :class:`Cosmo` instance is used. name : str, optional Model name. emitters_type : str, optional Primary emitter type, typically ``'electrons'`` or ``'protons'``. emitters_distribution : str or BaseEmittersDistribution, optional Emitter distribution identifier or distribution object. emitters_distribution_log_values : bool, optional If ``True``, distribution parameters are interpreted in log space. beaming_expr : str, optional Beaming parametrization (for example ``'delta'`` or ``'bulk_theta'``). jet_workplace : WorkPlace, optional Output workspace used to store model products. verbose : bool, optional Verbosity flag forwarded to the backend. nu_size : int, optional Number of frequency points used by model evaluation. clean_work_dir : bool, optional If ``True``, clean existing output directory before use. geometry : str, optional Emission-region geometry. Allowed values are ``'spherical'`` and ``'spherical_shell'``. """ super(JetBase,self).__init__( cosmo=cosmo, **keywords) self.name = clean_var_name(name) self.model_type='jet' #self._emitters_type=emitters_type self._scale='lin-lin' self.verbose=verbose self._blob = self._build_blob(verbose=verbose) self._static_spec_arr_grid_size = BlazarSED.static_spec_arr_grid_size self._nu_static_size = BlazarSED.static_spec_arr_size self.nu_size = nu_size self.nu_grid_size=self._get_nu_grid_size_blob() N=multiprocessing.cpu_count() if N>2: N=min(N,20) self.set_num_c_threads(N=N) if jet_workplace is None: jet_workplace=WorkPlace() out_dir= jet_workplace.out_dir + '/' + self.name + '_jet_prod/' else: out_dir=jet_workplace.out_dir+'/'+self.name+'_jet_prod/' self._allowed_geometry=['spherical','spherical_shell'] self.geometry=geometry self.set_path(out_dir,clean_work_dir=clean_work_dir) self.set_flag(self.name) self._allowed_EC_components_list=['EC_BLR', 'DT', 'EC_DT', 'Star', 'EC_Star', #CMB', 'EC_CMB', 'Disk', 'Disk_MultiBB', 'Disk_Mono', 'EC_Disk', 'All'] self._allwed_disk_type =['BB', 'Mono', 'MultiBB'] self.EC_components_list =[] self._spectral_components_list=[] self._hidden_spectral_components_list = [] self._internal_absorption_comp={} self.spectral_components= SpecCompList(self._spectral_components_list) self.add_basic_components() self.SED=self.get_spectral_component_by_name('Sum').SED self.parameters = JetModelParameterArray(model=self) self._emitting_region_dict = None self._electron_distribution_dic= None self._external_photon_fields_dic= None self._original_emitters_distr = None self._original_inj_emitters_distr = None self.inj_emitters_distribution = None self._leptonic_equilibrium = False self._energetic = None self.skip_internal_absorption_serial=False self._setup(emitters_distribution,emitters_distribution_log_values,beaming_expr,emitters_type) def _setup(self, emitters_distribution, emitters_distribution_log_values, beaming_expr, emitters_type): """Initialize internal model state and bind the selected emitter distribution. Parameters ---------- emitters_distribution : str or BaseEmittersDistribution Emitter distribution definition used to configure the model. emitters_distribution_log_values : bool If ``True``, interpret distribution parameters provided in logarithmic space. beaming_expr : str Beaming parameterization used by the emitting region (for example ``'delta'``). emitters_type : str Primary particle type handled by the model (for example ``'electrons'``). """ self.EC_components_list = [] self._spectral_components_list = [] self.spectral_components = SpecCompList(self._spectral_components_list) self.add_basic_components() self.SED = self.get_spectral_component_by_name('Sum').SED self.parameters = JetModelParameterArray(model=self) self._emitting_region_dict = None self._emitters_distribution_dic = None self._external_photon_fields_dic = None self._blob.core.IC_adaptive_e_binning = 0 self._blob.core.do_IC_down_scattering = 0 if hasattr(emitters_distribution,'emitters_type'): emitters_type=emitters_distribution.emitters_type self.set_emitting_region(beaming_expr,emitters_type) self.flux_plot_lim=1E-30 self.set_emiss_lim(1E-120) self._IC_states = {} self._IC_states['on'] = 1 self._IC_states['off'] = 0 self._external_field_transf = {} self._external_field_transf['blob'] = 0 self._external_field_transf['disk'] = 1 self._jetkernel_interp='linear' self.set_emitters_distribution(emitters_distribution, emitters_distribution_log_values, emitters_type, init=False) self.set_blob() def __getstate__(self): j= self._serialize_model() return j def __setstate__(self,state): self.__init__() self._decode_model(state) self._fix_par_dep_on_load(verbose=False) if '_internal_absorption_comp' in state: self._internal_absorption_comp=state['_internal_absorption_comp'] # for c in state['_internal_absorption_comp'].keys(): # p=state['_internal_absorption_comp'][c]['pars'] # self.enable_internal_absorption(**p) def _serialize_model(self): _model = {} _model['version']=get_info()['version'] _model['name'] = self.name _model['emitters_type'] = self.emitters_distribution.emitters_type if self.inj_emitters_distribution is None: if isinstance(self.emitters_distribution,EmittersDistribution): self._original_emitters_distr._copy_from_jet(self) _model['custom_emitters_distribution']=self._original_emitters_distr clean_numba(_model['custom_emitters_distribution']) _model['emitters_distribution_class'] = 'EmittersDistribution' else: raise RuntimeError('emitters distribution type not valid',type(self._emitters_distribution)) else: if isinstance(self.inj_emitters_distribution ,InjEmittersDistribution): #self._original_emitters_distr._copy_from_jet(self) _model['custom_emitters_distribution']=self.inj_emitters_distribution clean_numba(_model['custom_emitters_distribution']) _model['emitters_distribution_class'] = 'InjEmittersDistribution' else: raise RuntimeError('inj_emitters_distribution distribution type not valid',type(self.inj_emitters_distribution)) if hasattr(self,'geometry'): _model['geometry']=self.geometry _model['beaming_expr'] = self._beaming_expr _model['spectral_components_name'] = self.get_spectral_component_names_list() _model['spectral_components_state'] = [c.state for c in self._spectral_components_list] _model['EC_components_name'] = self.EC_components_list _model['basic_components_name'] = self.basic_components_list _model['cosmo'] = self.cosmo._serialize_model() _model['pars'] = {} _model['pars']=self.parameters._serialize_pars() _model['external_field_transf']=self.get_external_field_transf() #print('self.skip_internal_absorption_serial',self.self.skip_internal_absorption_serial) if self.skip_internal_absorption_serial is False: _model['_internal_absorption_comp']=self._internal_absorption_comp _model['internal_pars'] = {} _model['internal_pars']['nu_size'] = self.nu_size _model['internal_pars']['nu_seed_size'] = self.nu_seed_size _model['internal_pars']['gamma_grid_size'] = self.gamma_grid_size _model['internal_pars']['IC_nu_size']=self.IC_nu_size _model['internal_pars']['nu_min']=self.nu_min _model['internal_pars']['nu_max']=self.nu_max _model['internal_pars']['Norm_distr'] = self.Norm_distr return _model #def save_model(self,file_name,to_string=False): # pickle.dump(self, open(file_name, 'wb'), protocol=pickle.HIGHEST_PROTOCOL)
[docs] @classmethod #@safe_run def load_model(cls, file_name_or_obj, from_string=False): """Load a serialized jet model. Parameters ---------- file_name_or_obj : str or bytes or file-like Serialized model source accepted by the internal pickle loader. from_string : bool, optional If ``True``, interpret ``file_name_or_obj`` as in-memory content instead of a filesystem path. Returns ------- JetBase Reconstructed model with backend state refreshed via ``set_blob``. """ try: jet=cls._load_pickle(file_name_or_obj,from_string=from_string) jet.set_blob() jet._update_spectral_components() if hasattr(jet,'_internal_absorption_comp'): for c in jet._internal_absorption_comp: p=jet._internal_absorption_comp[c]['pars'] jet.enable_internal_absorption(**p) return jet except Exception as e: raise RuntimeError('The model you loaded is not valid please check the file name', e)
def _decode_model(self,_model): if 'version' in _model.keys(): self._set_version(_model['version']) else: self._set_version(v='unknown(<1.1.2)') self.cosmo = Cosmo.from_model(_model['cosmo']) self.model_type = 'jet' self.name = _model['name'] if 'geometry' in _model.keys(): self.geometry=_model['geometry'] else: warnings.warn('the model you are loading has not geometry specification. Assuming it is spherical!') self.geometry='spherical' self.set_blob() self.parameters = JetModelParameterArray(model=self) if 'emitters_type' in _model.keys(): emitters_type=str(_model['emitters_type']) else: emitters_type = 'electrons' if 'electron_distribution' in _model.keys(): _v=_model['electron_distribution'] del(_model['electron_distribution']) _model['emitters_distribution']=_v if 'electron_distribution_log_values' in _model.keys(): _v=_model['electron_distribution_log_values'] del(_model['electron_distribution_log_values']) _model['emitters_distribution_log_values']=_v if _model['emitters_distribution_class'] == 'EmittersDistribution' or _model['emitters_distribution_class'] == 'InjEmittersDistribution': self.set_emitters_distribution(distr=_model['custom_emitters_distribution'], init=False) else: raise RuntimeError('emitters distribution type not valid', type(self._emitters_distribution)) for c in self.basic_components_list: if c not in _model['basic_components_name']: self.del_spectral_component(c) if self.emitters_distribution.emitters_type == 'protons': self.add_pp_gamma_component() self.add_pp_neutrino_component() self.add_bremss_ep_component() if 'disk_type' in _model['pars'].keys(): disk_type=_model['pars']['disk_type']['val'] else: disk_type=None self.add_EC_component(_model['EC_components_name'],disk_type=disk_type) self.set_external_field_transf(_model['external_field_transf']) for ID, c in enumerate(_model['spectral_components_name']): comp = getattr(self.spectral_components, c) if comp._state_dict != {}: comp.state = _model['spectral_components_state'][ID] self.SED = self.get_spectral_component_by_name('Sum').SED self.set_emitting_region(str(_model['beaming_expr']),self.emitters_distribution.emitters_type) if isinstance(self,GalacticBeamed): self._handle_z(d=self.cosmo._DL_cm) _par_dict = _model['pars'] _non_user_dict={} _user_dict={} for k, v in _model['pars'].items(): if v['par_type'] == 'user_defined': _user_dict[k]=v else: _non_user_dict[k]=v self.parameters._decode_pars(_non_user_dict) for k, v in _user_dict.items(): v['name']=k self.parameters.add_par(ModelParameter(**v)) _par_dict = _model['internal_pars'] for k in _par_dict.keys(): setattr(self,k,_par_dict[str(k)]) def _build_blob(self, verbose=False): blob = BlazarSED.MakeBlob() blob.Sync.x_Bessel_min = 1E-17 blob.Sync.x_Bessel_max = 7.2E2 blob.Sync.x_ave_Bessel_min = 1E-16 blob.Sync.x_ave_Bessel_max = 3.5E2 blob.Sync.log_x_Bessel_min = np.log10( blob.Sync.x_Bessel_min) blob.Sync.log_x_Bessel_max = np.log10( blob.Sync.x_Bessel_max) blob.Sync.log_x_ave_Bessel_min = np.log10( blob.Sync.x_ave_Bessel_min) blob.Sync.log_x_ave_Bessel_max = np.log10( blob.Sync.x_ave_Bessel_max) F_Sync_x_ptr = get_nested_attr(blob, 'Sync.F_Sync_x') F_Sync_y_ptr = get_nested_attr(blob, 'Sync.F_Sync_y') G_Sync_x_ptr = get_nested_attr(blob, 'Sync.G_Sync_x') G_Sync_y_ptr = get_nested_attr(blob, 'Sync.G_Sync_y') F_ave_Sync_x_ptr = get_nested_attr(blob, 'Sync.F_ave_Sync_x') F_ave_Sync_y_ptr = get_nested_attr(blob, 'Sync.F_ave_Sync_y') log_F_Sync_x_ptr = get_nested_attr(blob, 'Sync.log_F_Sync_x') log_F_Sync_y_ptr = get_nested_attr(blob, 'Sync.log_F_Sync_y') log_F_ave_Sync_x_ptr = get_nested_attr(blob, 'Sync.log_F_ave_Sync_x') log_F_ave_Sync_y_ptr = get_nested_attr(blob, 'Sync.log_F_ave_Sync_y') log_G_Sync_x_ptr = get_nested_attr(blob, 'Sync.log_G_Sync_x') log_G_Sync_y_ptr = get_nested_attr(blob, 'Sync.log_G_Sync_y') d = np.genfromtxt(bessel_table_file_path,comments='#') log_F_Sync_x=np.log10(d[:,0]) log_F_Sync_y=np.log10(d[:,1]) log_F_ave_Sync_x=np.log10(d[:,2]) log_F_ave_Sync_y=np.log10(d[:,3]) log_G_Sync_x=np.log10(d[:,4]) log_G_Sync_y=np.log10(d[:,5]) for ID, l in enumerate(d): BlazarSED.set_bessel_table(F_Sync_x_ptr,blob, l[0], ID) BlazarSED.set_bessel_table(F_Sync_y_ptr, blob, l[1], ID) BlazarSED.set_bessel_table(F_ave_Sync_x_ptr, blob, l[2], ID) BlazarSED.set_bessel_table(F_ave_Sync_y_ptr, blob, l[3], ID) BlazarSED.set_bessel_table(G_Sync_x_ptr,blob, l[4], ID) BlazarSED.set_bessel_table(G_Sync_y_ptr, blob, l[5], ID) BlazarSED.set_bessel_table(log_F_Sync_x_ptr, blob, log_F_Sync_x[ID], ID) BlazarSED.set_bessel_table(log_F_Sync_y_ptr, blob, log_F_Sync_y[ID], ID) BlazarSED.set_bessel_table(log_F_ave_Sync_x_ptr, blob, log_F_ave_Sync_x[ID], ID) BlazarSED.set_bessel_table(log_F_ave_Sync_y_ptr, blob, log_F_ave_Sync_y[ID], ID) BlazarSED.set_bessel_table(log_G_Sync_x_ptr, blob, log_G_Sync_x[ID], ID) BlazarSED.set_bessel_table(log_G_Sync_y_ptr, blob, log_G_Sync_y[ID], ID) blob.core.BESSEL_TABLE_DONE=1 if verbose is False: blob.core.verbose = 0 else: blob.core.verbose = 1 set_str_attr(blob, 'core.path', './') set_str_attr(blob, 'core.MODE', 'custom') blob.emitters.gamma_grid_size = 200 blob.core.nu_IC_size = 100 blob.core.nu_seed_size = 100 blob.core.nu_grid_size= 1000 blob.core.do_Sync = 2 blob.core.do_SSC = 1 blob.core.R = 5.0e15 blob.core.h_sh=0.1 blob.core.B = 0.1 blob.core.z_cosm = 0.1 blob.core.BulkFactor = 10 blob.core.theta = 0.1 blob.emitters.N = 100 blob.PP_gamma.NH_pp = 1 blob.emitters.NH_cold_to_rel_e = 1.0 blob.Disk.L_Disk = 1E45 blob.DT.L_DT = 1E45 blob.emitters.gmin = 2 blob.emitters.gmax = 1e6 blob.Sync.spec.nu_min = 1e6 blob.Sync.spec.nu_max = 1e20 blob.SSC.spec.nu_min = 1e14 blob.SSC.spec.nu_max = 1e30 blob.core.nu_start_grid = 1e6 blob.core.nu_stop_grid = 1e30 return blob
[docs] def set_emitting_region(self,beaming_expr,emitters_type): """Configure emitting-region parameters on the backend. Parameters ---------- beaming_expr : str Beaming parametrization used to build region parameters. emitters_type : str Emitter family used to select the corresponding parameter set. """ if self._emitting_region_dict is not None: self.del_par_from_dic(self._emitting_region_dict) self._beaming_expr=beaming_expr set_str_attr(self._blob,'core.BEAMING_EXPR',beaming_expr) self._emitting_region_dict=build_emitting_region_dict(self.cosmo,beaming_expr=beaming_expr,emitters_type=emitters_type) self._set_geometry() self.parameters.add_par_from_dict(self._emitting_region_dict,self,'_blob',JetParameter)
def _set_geometry(self): self._blob.core.GEOMETRY=self.geometry if self.geometry == 'spherical': pass elif self.geometry == 'spherical_shell': self._emitting_region_dict.pop('R') self._emitting_region_dict['R_sh'] = JetModelDictionaryPar(ptype='region_size', vmin=1E3, vmax=1E30, punit='cm', froz=False, log=False, jetkernel_par_name='core.R_sh') self._emitting_region_dict['h_sh'] = JetModelDictionaryPar(ptype='scaling_factor', val=0.1, vmin=0, punit='', vmax=1, froz=False, log=False, jetkernel_par_name='core.h_sh') @property def spectral_components_list(self): """Return visible spectral components. Returns ------- list Spectral components with ``hidden == False``. """ return [s for s in self._spectral_components_list if s.hidden is False] @property def geometry(self,): """Return the emitting-region geometry.""" return self._geometry @geometry.setter def geometry(self,geometry): """Set the emitting-region geometry.""" if geometry in self._allowed_geometry: self._geometry=geometry else: raise RuntimeError("geometry %s not allowed" % geometry, "please choose among ", self._allowed_geometry)
[docs] def make_conical_jet(self, R=None,theta_open=5.,theta_open_min=1,theta_open_max=10): """Convert the model to a conical geometry parametrization. The method introduces a user parameter ``theta_open`` and sets ``R = tan(theta_open) * R_H`` as a dependent parameter relation. Parameters ---------- R : float, optional Target region size used to initialize ``R_H`` after linking. If omitted, the current ``R`` value is used. theta_open : float, optional Semi-opening angle in degrees. theta_open_min : int, optional Lower bound for ``theta_open``. theta_open_max : int, optional Upper bound for ``theta_open``. Raises ------ RuntimeError If dependency setup fails (for example because parameters already exist with incompatible dependency state). """ if R is None: R=self.parameters.R.val try: self.add_user_par(name='theta_open',val=theta_open,units='deg',val_min=theta_open_min,val_max=theta_open_max) self.make_dependent_par(par='R', depends_on=['R_H','theta_open'], par_expr='np.tan(np.radians(theta_open))*R_H') self.parameters.R_H.free() R_H=R/(np.tan(np.radians(self.parameters.theta_open.val))) print('setting R_H to',R_H) self.parameters.R_H.val=R_H except RuntimeError as e: msg='\n' msg+='something wrong, probably theta_open has already been added, or R is already dependant\n' msg+='please, try to reset the dependency of R: e.g my_jet.parameters.R.reset_dependencies()\n' msg+='or, try to remove theta_open: e.g my_jet.parameters.del_par(my_jet.parameters.theta_open)\n' msg+='the error is in:\n' msg+=str(e) raise RuntimeError(msg) except Exception as e: raise RuntimeError('unexpected exception',str(e))
[docs] def set_EC_dependencies(self): """Attach standard EC size-luminosity relations for BLR and DT. When disk parameters are available, this sets dependent-parameter expressions for ``R_BLR_in``, ``R_BLR_out``, and ``R_DT`` as functions of ``L_Disk``. """ try: if self.parameters.get_par_by_name('L_Disk') is not None and self.parameters.get_par_by_name('R_BLR_in') is not None: try: self.make_dependent_par(par='R_BLR_in', depends_on=['L_Disk'], par_expr='3E17*(L_Disk/1E46)**0.5') self.make_dependent_par(par='R_BLR_out', depends_on=['R_BLR_in'], par_expr='R_BLR_in*1.1') except RuntimeError as e: print('functional dependencies already invoked for BLR',str(e)) if self.parameters.get_par_by_name('L_Disk') is not None and self.parameters.get_par_by_name('R_DT') is not None: try: self.make_dependent_par(par='R_DT', depends_on=['L_Disk'], par_expr='2E19*(L_Disk/1E46)**0.5') except RuntimeError as e: print('functional dependencies already invoked for DT',str(e)) except Exception as e: print('unexpected exception',str(e))
@property def IC_adaptive_e_binning(self,): """Return whether adaptive electron binning is enabled for IC.""" return np.intc(self._blob.core.IC_adaptive_e_binning) @IC_adaptive_e_binning.setter def IC_adaptive_e_binning(self,state): """Enable or disable adaptive electron binning for IC.""" if type(state) == bool: pass else: raise RuntimeError('state has to be boolean') self._blob.core.IC_adaptive_e_binning=np.intc(state)
[docs] @staticmethod def available_emitters_distributions(): """Print available emitter distributions from the factory.""" EmittersFactory.available_distributions()
@staticmethod def _coerce_inj_emitters_distribution(q_inj): if isinstance(q_inj, InjEmittersDistribution) is False: raise RuntimeError('q_inj has to be an instance of InjEmittersDistribution') if q_inj.emitters_type != 'electrons': raise RuntimeError('q_inj emitters_type has to be electrons') if hasattr(q_inj, 'distr_func') is True and q_inj.distr_func is not None: return q_inj try: q_inj_from_factory = InjEmittersFactory().create_inj_emitters(name=q_inj.spectral_type, gamma_grid_size=q_inj._gamma_grid_size, log_values=q_inj._log_values, emitters_type=q_inj.emitters_type, normalize=q_inj.normalize) for p in q_inj.parameters.par_array: _p = q_inj_from_factory.parameters.get_par_by_name(p.name) if _p is not None: _p.set(val=p.val, skip_dep_par_warning=True) return q_inj_from_factory except Exception as e: raise RuntimeError('q_inj has no distribution function; use InjEmittersFactory().create_inj_emitters(...) or set_distr_func(...)') from e def _disable_leptonic_equilibrium(self, remove_parameters=False): self._leptonic_equilibrium = False self.inj_emitters_distribution = None self._original_inj_emitters_distr = None self._blob.emitters.do_equilibrium = 0 if remove_parameters is True: for par_name in ('T_esc_e_primaries','L_inj'): p = self.parameters.get_par_by_name(par_name) if p is not None: self.parameters.del_par(p) def _ensure_leptonic_equilibrium_parameters(self): if self.parameters.get_par_by_name('T_esc_e_primaries') is None: _d={} _d['T_esc_e_primaries']=JetModelDictionaryPar(ptype='escape_time', vmin=1, vmax=None, punit='R/c', froz=False, log=False, val=1.0, jetkernel_par_name='emitters.T_esc_e_primaries') self.parameters.add_par_from_dict(_d,self,'_blob',JetParameter) def _sync_inj_emitters_distribution_from_jet_parameters(self): if self.inj_emitters_distribution is None: return for inj_par in self.inj_emitters_distribution.parameters.par_array: jet_par = self.parameters.get_par_by_name(inj_par.name) if jet_par is not None: inj_par.set(val=jet_par.val, skip_dep_par_warning=True) def _sync_jet_parameters_from_inj_emitters_distribution(self): if self.inj_emitters_distribution is None: return for inj_par in self.inj_emitters_distribution.parameters.par_array: jet_par = self.parameters.get_par_by_name(inj_par.name) if jet_par is not None: jet_par.set(val=inj_par.val, skip_dep_par_warning=True) def _build_equilibrium_carrier_distribution(self): size = int(self.inj_emitters_distribution._gamma_grid_size) gmax = self.inj_emitters_distribution.parameters.get_par_by_name('gmax').val_lin gmax = max(gmax, 1.0) gamma_array = np.logspace(0.0, np.log10(gmax), size) n_gamma_array = np.zeros(gamma_array.size, dtype=np.float64) return EmittersArrayDistribution(name=f'{self.inj_emitters_distribution.name}_eq_carrier', emitters_type='electrons', gamma_array=gamma_array, n_gamma_array=n_gamma_array, normalize=False) def _set_equilibrium_injection_on_blob(self): if self.inj_emitters_distribution is None: raise RuntimeError('leptonic equilibrium mode requires an InjEmittersDistribution') self._sync_inj_emitters_distribution_from_jet_parameters() p_gmin = self.parameters.get_par_by_name('gmin') p_gmax = self.parameters.get_par_by_name('gmax') if p_gmin is None or p_gmax is None: raise RuntimeError('gmin/gmax parameters are required to initialize leptonic equilibrium') self._blob.emitters.gmin = 1 #NOTE: safe boundary for eq. evolution self._blob.emitters.gmax = p_gmax.val_lin*2 self._blob.emitters.gamma_grid_size = int(self.inj_emitters_distribution._gamma_grid_size) set_str_attr(self._blob, 'core.DISTR', 'jetset') set_str_attr(self._blob, 'core.PARTICLE', 'electrons') BlazarSED.setNgrid(self._blob) BlazarSED.build_Ne_jetset(self._blob) size = int(self._blob.emitters.gamma_grid_size) gamma_ptr = get_nested_attr(self._blob, 'emitters.griglia_gamma_jetset_Ne_log') ne_ptr = get_nested_attr(self._blob, 'emitters.Ne_jetset') gamma_blob, _ = get_emitters(gamma_ptr, ne_ptr, self._blob, size) self.inj_emitters_distribution._gamma_grid_size = size self.inj_emitters_distribution._gamma_grid = np.asarray(gamma_blob, dtype=np.float64) f = self.inj_emitters_distribution._eval_func(gamma=self.inj_emitters_distribution._gamma_grid) gmin_inj = self.inj_emitters_distribution.parameters.get_par_by_name('gmin').val_lin gmax_inj = self.inj_emitters_distribution.parameters.get_par_by_name('gmax').val_lin f = np.asarray(f, dtype=np.float64) f[self.inj_emitters_distribution._gamma_grid < gmin_inj] = 0.0 f[self.inj_emitters_distribution._gamma_grid > gmax_inj] = 0.0 norm = 1.0 if self.inj_emitters_distribution.normalize is True: integ = np.trapezoid(f, self.inj_emitters_distribution._gamma_grid) if integ > 0: norm = 1.0 / integ BlazarSED.InitRadiative(self._blob,0) volume=self._blob.core.Vol_region self.inj_emitters_distribution._set_L_inj(self.parameters.get_par_by_name('L_inj').val,volume ) q_inj = f * norm * self.inj_emitters_distribution.parameters.get_par_by_name('Q').val q_inj = np.asarray(q_inj, dtype=np.float64) q_inj[np.isnan(q_inj)] = 0 q_inj[np.isinf(q_inj)] = 0 self.inj_emitters_distribution.f = q_inj self.inj_emitters_distribution.gamma_e = self.inj_emitters_distribution._gamma_grid.copy() self.inj_emitters_distribution.n_gamma_e = q_inj.copy() # Safety/clarity: clear transport buffer before writing q_inj. set_emitters(ne_ptr, self._blob, size, np.zeros(size, dtype=np.float64)) set_emitters(ne_ptr, self._blob, size, q_inj)
[docs] def set_emitters_distribution(self, distr=None, log_values=False, emitters_type='electrons', init=True): """Set or replace the emitter distribution used by the model. This method supports analytic distributions (by name), array-based distributions, and injection distributions for leptonic-equilibrium mode. It updates Jet parameters and backend bindings accordingly. Parameters ---------- distr : str or EmittersDistribution or ArrayDistribution or InjEmittersDistribution Distribution specification. log_values : bool, optional If ``True``, interpret analytic distribution parameters in log space. emitters_type : str, optional Emitter type used when creating distributions from names/arrays. init : bool, optional If ``True``, initialize backend state before replacing the distribution. """ if init is True: self.set_blob() self._emitters_distribution_log_values = log_values if self._emitters_distribution_dic is not None: self.del_par_from_dic(self._emitters_distribution_dic) if isinstance(distr, InjEmittersDistribution): q_inj = self._coerce_inj_emitters_distribution(distr) if hasattr(q_inj, '_activate_numba'): q_inj._activate_numba() self._leptonic_equilibrium = True self._blob.emitters.do_equilibrium = 1 self.inj_emitters_distribution = copy.deepcopy(q_inj) self._original_inj_emitters_distr = copy.deepcopy(q_inj) self._emitters_distribution_dic = copy.deepcopy(self.inj_emitters_distribution._parameters_dict) if 'gmin' in self._emitters_distribution_dic: # gmin drives only q_inj in equilibrium mode, not the solved Ne grid. self._emitters_distribution_dic['gmin'].is_in_jetkernel = False self._emitters_distribution_dic['gmin'].jetkernel_par_name = None if 'gmax' in self._emitters_distribution_dic: self._emitters_distribution_dic['gmax'].is_in_jetkernel = True self._emitters_distribution_dic['gmax'].jetkernel_par_name = 'emitters.gmax' self.parameters.add_par_from_dict(self._emitters_distribution_dic, self, '_blob', JetParameter) self._sync_jet_parameters_from_inj_emitters_distribution() self._ensure_leptonic_equilibrium_parameters() self.emitters_distribution = self._build_equilibrium_carrier_distribution() self._original_emitters_distr = copy.deepcopy(self.emitters_distribution) self.emitters_distribution.set_jet(self) self._sync_jet_parameters_from_inj_emitters_distribution() self.emitters_distribution._update_parameters_dict() self._emitters_distribution_name = self.emitters_distribution.name if self.parameters.get_par_by_name('L_inj') is None: self.parameters.add_par( ModelParameter(name='L_inj', par_type='L_inj', val=1E-3, val_min=0, val_max=None, units='erg/s')) if self.parameters.get_par_by_name('Q') is not None: self.parameters.del_par(self.parameters.get_par_by_name('Q')) elif isinstance(distr, ArrayDistribution): self._disable_leptonic_equilibrium(remove_parameters=True) self._emitters_distribution_name = 'from_array' self.emitters_distribution = EmittersDistribution.from_array(self, distr, emitters_type=emitters_type) self._original_emitters_distr = copy.deepcopy(self.emitters_distribution) self._emitters_distribution_dic = self.emitters_distribution._parameters_dict self.parameters.add_par_from_dict(self._emitters_distribution_dic, self, '_blob', JetParameter) self._attach_emitters_pars_to_jet(preserve_value_emitters=True) self.emitters_distribution.update() elif isinstance(distr, EmittersDistribution): self._disable_leptonic_equilibrium(remove_parameters=True) if hasattr(distr,'_activate_numba'): distr._activate_numba() self._original_emitters_distr = copy.deepcopy(distr) self.emitters_distribution = copy.deepcopy(distr) self._update_emitters_pars_dependence() self.emitters_distribution.set_jet(self) self.emitters_distribution._update_parameters_dict() self._emitters_distribution_name = self.emitters_distribution.name self._emitters_distribution_dic = self.emitters_distribution._parameters_dict self.parameters.add_par_from_dict(self._emitters_distribution_dic, self, '_blob', JetParameter) self._attach_emitters_pars_to_jet(preserve_value_emitters=True) self.emitters_distribution.update() elif isinstance(distr, str): self._disable_leptonic_equilibrium(remove_parameters=True) nf=EmittersFactory() self.emitters_distribution = nf.create_emitters(distr, log_values=log_values, emitters_type=emitters_type) if hasattr( self.emitters_distribution,'_activate_numba'): self.emitters_distribution._activate_numba() self._original_emitters_distr = copy.deepcopy(self.emitters_distribution) self.emitters_distribution.set_jet(self) self.emitters_distribution._update_parameters_dict() self._emitters_distribution_name = self.emitters_distribution.name self._emitters_distribution_dic = self.emitters_distribution._parameters_dict self.parameters.add_par_from_dict(self._emitters_distribution_dic, self, '_blob', JetParameter) self._attach_emitters_pars_to_jet(preserve_value_emitters=True) self._update_emitters_pars_dependence() self.emitters_distribution.update() else: raise RuntimeError('distr',type(distr),'not valid should be a string or an',type(EmittersDistribution),'instance')
def _attach_emitters_pars_to_jet(self, preserve_value_emitters=False, skip=None): if skip is None: skip = [] v_dict={} for par in self.emitters_distribution.parameters.par_array[::]: if par.name not in skip: self.emitters_distribution.parameters.del_par(par) v_dict[par.name]=par.val for k in self._emitters_distribution_dic.keys(): par = self.parameters.get_par_by_name(k) if par is None: continue if preserve_value_emitters is True and k in v_dict: par.set(val=v_dict[k],skip_dep_par_warning=True) self.emitters_distribution.parameters.add_par(par) def _update_emitters_pars_dependence(self): for par in self.emitters_distribution.parameters.par_array: if par.immutable is True: warnings.warn('parameter dependence has to be reassigned after emitters distribution is assigned to a jet, now will be reset no dep') self.emitters_distribution.parameters.reset_dependencies() break
[docs] def get_emitters_distribution_name(self): """Return the active emitter-distribution name.""" return self.emitters_distribution.name
[docs] def show_spectral_components(self): """Print currently registered spectral components.""" print ("Spectral components for Jet model:%s"%(self.name)) for comp in self._spectral_components_list: print ("comp: %s "%(comp.name)) print()
[docs] def sed_table(self, restframe='obs'): """Provide the astropy table with SED spectral components Parameters ---------- restframe : str, optional , by default 'obs' Returns ------- astropy table """ try: self.spectral_components.build_table(restframe=restframe) except: self.eval() self._SED_table = self.spectral_components.table return self._SED_table
[docs] def get_spectral_component_by_name(self,name,verbose=True): """Return a spectral component by name. Parameters ---------- name : str Component name. verbose : bool, optional If ``True``, print available names when not found. Returns ------- JetSpecComponent or None Matching component, or ``None`` if missing. """ for i in range(len(self._spectral_components_list)): if self._spectral_components_list[i].name==name: return self._spectral_components_list[i] else: if verbose==True: print ("no spectral components with name %s found"%name) print ("names in array are:") self.show_spectral_components() return None
[docs] def list_spectral_components(self): """Print the list of spectral-component names.""" for i in range(len(self._spectral_components_list)): print (self._spectral_components_list[i].name)
[docs] def get_spectral_component_names_list(self): """Return spectral-component names as a list.""" _l=[] for i in range(len(self._spectral_components_list)): _l.append(self._spectral_components_list[i].name) return _l
[docs] def del_spectral_component(self,name): """Delete a spectral component from the model.""" print('deleting spectral component', name) if name in self.EC_components_list: self.del_EC_component(name) else: self._del_spectral_component(name) if name in self.basic_components_list: self.basic_components_list.remove(name)
def _del_spectral_component(self, name, verbose=True): comp=self.get_spectral_component_by_name(name,verbose=verbose) if comp._state_dict!={}: comp.state='off' if comp is not None: self._spectral_components_list.remove(comp) self.spectral_components.__delattr__(name) def _add_spectral_component(self, name, var_name=None, state_dict=None,state=None): #print("==> adding", name, var_name) self._spectral_components_list.append( JetSpecComponent(self, name, self._blob, var_name=var_name, state_dict=state_dict, state=state)) setattr(self.spectral_components,name,self._spectral_components_list[-1]) def _update_spectral_components(self,tau=None): for ID,s in enumerate(self._spectral_components_list): self._spectral_components_list[ID]= JetSpecComponent(self, s.name, self._blob, var_name=s._var_name, state_dict=s._state_dict, state=s.state, tau=tau) self._spectral_components_list[ID].hidden = s.hidden setattr(self.spectral_components, self._spectral_components_list[ID].name, self._spectral_components_list[ID])
[docs] def add_basic_components(self): """Add default ``Sum``, ``Sync``, and ``SSC`` components.""" self.basic_components_list=['Sum','Sync','SSC'] self._add_spectral_component('Sum') self._add_spectral_component('Sync', var_name='core.do_Sync', state_dict=dict((('on', 1), ('off', 0), ('self-abs', 2))),state='self-abs') self._add_spectral_component('SSC', var_name='core.do_SSC', state_dict=dict((('on', 1), ('off', 0))))
[docs] def add_sync_component(self,state='self-abs'): """Add a synchrotron component with the requested state.""" self._add_spectral_component('Sync', var_name='core,do_Sync', state_dict=dict((('on', 1), ('off', 0), ('self-abs', 2))), state=state)
[docs] def add_SSC_component(self,state='on'): """Add a synchrotron self-Compton component.""" self._add_spectral_component('SSC', var_name='core.do_SSC', state_dict=dict((('on', 1), ('off', 0))),state=state)
[docs] def del_EC_component(self,EC_components_list, disk_type='BB'): """Remove one or more external-Compton related components. Parameters ---------- EC_components_list : str or list of str, optional disk_type : str, optional Disk template type for disk-dependent EC components. """ if isinstance(EC_components_list, six.string_types): EC_components_list = [EC_components_list] if 'All' in EC_components_list: EC_components_list=self._allowed_EC_components_list[::] for EC_component in EC_components_list: if EC_component not in self._allowed_EC_components_list: raise RuntimeError("EC_component %s not allowed" % EC_component, "please choose among ", self._allowed_EC_components_list) if EC_component=='Disk': if self.get_spectral_component_by_name('Disk', verbose=False) is not None: self._del_spectral_component('Disk', verbose=False) self.EC_components_list.remove('Disk') if self.get_spectral_component_by_name('EC_Disk',verbose=False) is not None: self._del_spectral_component('EC_Disk') self._blob.core.do_EC_Disk = 0 self.EC_components_list.remove('EC_Disk') if self.get_spectral_component_by_name('EC_BLR', verbose=False) is not None: self._blob.core.do_EC_BLR=0 self._del_spectral_component('EC_BLR', verbose=False) self.EC_components_list.remove('EC_BLR') if EC_component=='EC_Disk': if self.get_spectral_component_by_name('EC_Disk', verbose=False) is not None: self._blob.core.do_EC_Disk=0 self._del_spectral_component('EC_Disk', verbose=False) self.EC_components_list.remove('EC_Disk') if EC_component=='EC_BLR': if self.get_spectral_component_by_name('EC_BLR', verbose=False) is not None: self._blob.core.do_EC_BLR=0 self._del_spectral_component('EC_BLR', verbose=False) self.EC_components_list.remove('EC_BLR') if EC_component=='DT': if self.get_spectral_component_by_name('DT', verbose=False) is not None: self._del_spectral_component('DT', verbose=False) self.EC_components_list.remove('DT') if self.get_spectral_component_by_name('EC_DT', verbose=False) is not None: self._blob.core.do_EC_DT = 0 self._del_spectral_component('EC_DT', verbose=False) self.EC_components_list.remove('EC_DT') if EC_component=='EC_DT': if self.get_spectral_component_by_name('EC_DT', verbose=False) is not None: self._blob.core.do_EC_DT=0 self._del_spectral_component('EC_DT', verbose=False) self.EC_components_list.remove('EC_DT') if EC_component=='EC_CMB': if self.get_spectral_component_by_name('EC_CMB', verbose=False) is not None: self._blob.core.do_EC_CMB=0 self._del_spectral_component('EC_CMB', verbose=False) self.EC_components_list.remove('EC_CMB') if EC_component=='Star': if self.get_spectral_component_by_name('Star', verbose=False) is not None: self._blob.core.do_Star=0 self._del_spectral_component('Star', verbose=False) self.EC_components_list.remove('Star') if EC_component=='EC_Star': if self.get_spectral_component_by_name('EC_Star', verbose=False) is not None: self._blob.core.do_EC_Star=0 self._del_spectral_component('EC_Star', verbose=False) self.EC_components_list.remove('EC_Star') self.del_par_from_dic(build_ExtFields_dic(EC_components_list,disk_type))
[docs] def add_EC_component(self,EC_components_list=[],disk_type=None): """Add one or more external-Compton related components. Parameters ---------- EC_components_list : str or list of str, optional Component names to add. ``'All'`` enables all supported EC entries. disk_type : str, optional Disk template type for disk-dependent EC components. """ if disk_type is not None: if disk_type not in self._allwed_disk_type: raise RuntimeError('disk type',disk_type,'not in allwowed', self._allwed_disk_type) else: if self.get_par_by_name('disk_type') is not None: disk_type=self.get_par_by_name('disk_type').val else: disk_type='BB' if isinstance(EC_components_list, six.string_types): EC_components_list=[EC_components_list] if 'All' in EC_components_list: EC_components_list=self._allowed_EC_components_list[::] for EC_component in EC_components_list: if EC_component not in self._allowed_EC_components_list: raise RuntimeError("EC_component %s not allowed" % EC_component, "please choose among ", self._allowed_EC_components_list) if EC_component == 'Disk': if self.get_spectral_component_by_name('Disk',verbose=False) is None: self._add_spectral_component('Disk',var_name='core.do_Disk', state_dict=dict((('on', 1), ('off', 0)))) self.EC_components_list.append('Disk') if EC_component=='EC_Disk': #self._blob.core.do_EC_Disk=1 if self.get_spectral_component_by_name('EC_Disk',verbose=False) is None: self._add_spectral_component('EC_Disk', var_name='core.do_EC_Disk', state_dict=dict((('on', 1), ('off', 0)))) self.EC_components_list.append('EC_Disk') if self.get_spectral_component_by_name('Disk',verbose=False) is None: self._add_spectral_component('Disk',var_name='core.do_Disk', state_dict=dict((('on', 1), ('off', 0)))) self.EC_components_list.append('Disk') if EC_component=='EC_BLR': #self._blob.core.do_EC_BLR=1 if self.get_spectral_component_by_name('EC_BLR',verbose=False) is None: self._add_spectral_component('EC_BLR', var_name='core.do_EC_BLR', state_dict=dict((('on', 1), ('off', 0)))) self.EC_components_list.append('EC_BLR') if self.get_spectral_component_by_name('Disk',verbose=False) is None: self._add_spectral_component('Disk',var_name='core.do_Disk', state_dict=dict((('on', 1), ('off', 0)))) self.EC_components_list.append('Disk') if EC_component == 'DT': if self.get_spectral_component_by_name('DT',verbose=False) is None: self._add_spectral_component('DT',var_name='core.do_DT', state_dict=dict((('on', 1), ('off', 0)))) self.EC_components_list.append('DT') if self.get_spectral_component_by_name('Disk',verbose=False) is None: self._add_spectral_component('Disk',var_name='core.do_Disk', state_dict=dict((('on', 1), ('off', 0)))) self.EC_components_list.append('Disk') if EC_component == 'Star': if self.get_spectral_component_by_name('Star',verbose=False) is None: self._add_spectral_component('Star',var_name='core.do_Star', state_dict=dict((('on', 1), ('off', 0)))) self.EC_components_list.append('Star') if EC_component == 'EC_Star': if self.get_spectral_component_by_name('Star',verbose=False) is None: self._add_spectral_component('Star',var_name='core.do_Star', state_dict=dict((('on', 1), ('off', 0)))) self.EC_components_list.append('Star') if self.get_spectral_component_by_name('EC_Star',verbose=False) is None: self._add_spectral_component('EC_Star', var_name='core.do_EC_Star', state_dict=dict((('on', 1), ('off', 0)))) self.EC_components_list.append('EC_Star') if EC_component=='EC_DT': #self._blob.core.do_EC_DT=1 if self.get_spectral_component_by_name('EC_DT',verbose=False) is None: self._add_spectral_component('EC_DT', var_name='core.do_EC_DT', state_dict=dict((('on', 1), ('off', 0)))) self.EC_components_list.append('EC_DT') if self.get_spectral_component_by_name('DT',verbose=False) is None: self._add_spectral_component('DT',var_name='core.do_DT', state_dict=dict((('on', 1), ('off', 0)))) self.EC_components_list.append('DT') if self.get_spectral_component_by_name('Disk',verbose=False) is None: self._add_spectral_component('Disk',var_name='core.do_Disk', state_dict=dict((('on', 1), ('off', 0)))) self.EC_components_list.append('Disk') if EC_component=='EC_CMB': #self._blob.core.do_EC_CMB=1 if self.get_spectral_component_by_name('EC_CMB',verbose=False) is None: self._add_spectral_component('EC_CMB', var_name='core.do_EC_CMB', state_dict=dict((('on', 1), ('off', 0)))) self.EC_components_list.append('EC_CMB') #IF disk_type is already a parameter it has to be updated here to make it effective self.parameters.add_par_from_dict(build_ExtFields_dic(self.EC_components_list, disk_type),self,'_blob',JetParameter) #print('--> disk_type',disk_type) if disk_type is not None and 'Disk' in self.EC_components_list: #remove the old disk type parameters if self.parameters.get_par_by_name('disk_type') is not None: self.del_par_from_dic(build_ExtFields_dic(['Disk'],self.parameters.get_par_by_name('disk_type').val)) else: self.EC_components_list.append('Disk') self.parameters.add_par_from_dict(build_ExtFields_dic(self.EC_components_list, disk_type),self,'_blob',JetParameter) self.parameters.disk_type.val = disk_type
[docs] def enable_internal_absorption(self, comp, nu_min=1E20, N_soft=50, N_hard=50, N_R_H=50, N_theta=50, use_R_H_profile_extrapolation=False): """Enable internal gamma-gamma absorption for a seed component. Parameters ---------- comp : str Seed-photon component name used to compute optical depth. nu_min : float, optional Minimum frequency used for the internal absorption solver (Hz). N_soft, N_hard, N_R_H, N_theta : int, optional Numerical grid sizes used by the absorption solver. use_R_H_profile_extrapolation : bool, optional If ``True``, extrapolate the radial profile when required. """ self._internal_absorption_comp[comp]={} self._internal_absorption_comp[comp]['pars']=dict(N_hard=N_hard, N_soft=N_soft, N_theta=N_theta, N_R_H=N_R_H, nu_min=nu_min, comp=comp, use_R_H_profile_extrapolation=use_R_H_profile_extrapolation) self._internal_absorption_comp[comp]['obj']=InternalAbsorption(jet=self, nu_min=nu_min, seed_photons_name=comp, N_soft=N_soft, N_hard=N_hard, N_R_H=N_R_H, N_theta=N_theta, use_R_H_profile_extrapolation=use_R_H_profile_extrapolation)
[docs] def remove_internal_absorption(self,comp): """Disable internal absorption for a component.""" if comp in self._internal_absorption_comp.keys(): del self._internal_absorption_comp[comp]
[docs] def show_internal_absorption_components(self): """Print enabled internal-absorption components and settings.""" if len(self._internal_absorption_comp.keys())>0: for comp in self._internal_absorption_comp.keys(): print('internal absorption for component:', comp) for p in self._internal_absorption_comp[comp]['pars'].items(): print(p) print() else: print('internal absorption not enabled in this jet model')
[docs] def eval_internal_absorption(self,comp,skip_check=True,peak=False): """Evaluate internal absorption for a component. Returns ------- tuple ``(tau, nu_src)`` if available, otherwise ``(None, None)``. """ if comp in self._internal_absorption_comp.keys(): return self._internal_absorption_comp[comp]['obj'].eval(get_tau=True,skip_check=skip_check,peak=peak) return None,None
[docs] def del_par_from_dic(self,model_dic): """Delete parameters listed in a model-parameter dictionary.""" for key in model_dic.keys(): par=self.parameters.get_par_by_name(key) if par is not None: self.parameters.del_par(par)
[docs] def get_DL_cm(self,eval_model=False): """Return luminosity distance in centimeters.""" if eval_model is True: self.set_blob() if self.cosmo._c is not None: return self.cosmo.get_DL_cm(self.parameters.z_cosm.val) else: return self.cosmo.get_DL_cm()
#@safe_run
[docs] def get_beaming(self,): """Return the current beaming factor from the backend.""" BlazarSED.SetBeaming(self._blob) return self._blob.core.beam_obj
[docs] def set_flag(self,flag): """Set the backend STEM flag used by jetkernel output helpers.""" self._blob.core.STEM=flag
[docs] def get_flag(self): """Return the backend STEM flag.""" return self._blob.core.STEM
[docs] def get_path(self): """Return the backend working path.""" return self._blob.core.path
[docs] def set_path(self,path,clean_work_dir=True): """Set and create the backend working path.""" if path.endswith('/'): pass else: path+='/' set_str_attr(self._blob,'core.path',path) makedir(path,clean_work_dir=clean_work_dir)
[docs] def get_IC_mode(self): """Return inverse-Compton mode label (``'on'`` or ``'off'``).""" return dict(map(reversed, self._IC_states.items()))[self._blob.core.do_IC]
[docs] def set_external_field_transf(self,val): """Set external-field transformation frame. Parameters ---------- val : str Allowed values are ``'blob'`` and ``'disk'``. """ if val not in self._external_field_transf.keys(): raise RuntimeError('val',val,'not in allowed values',self._external_field_transf.keys()) self._blob.core.EC_stat=self._external_field_transf[val] self._blob.core.EC_stat_orig=self._external_field_transf[val]
[docs] def get_external_field_transf(self): """Return external-field transformation frame label.""" return dict(map(reversed, self._external_field_transf.items()))[self._blob.core.EC_stat]
[docs] def set_emiss_lim(self,val): """Set lower emissivity bound used by backend calculations.""" self._blob.core.emiss_lim=val
[docs] def get_emiss_lim(self): """Return lower emissivity bound used by backend calculations.""" return self._blob.core.emiss_lim
@property def IC_nu_size(self): """Return inverse-Compton spectral-grid size.""" return self._blob.core.nu_IC_size @IC_nu_size.setter def IC_nu_size(self, val): """Set inverse-Compton spectral-grid size.""" self.set_IC_nu_size(val)
[docs] def get_IC_nu_size(self): """Return inverse-Compton spectral-grid size.""" return self._blob.core.nu_IC_size
[docs] def set_IC_nu_size(self, val): """Set inverse-Compton spectral-grid size.""" if val > self._nu_static_size: raise RuntimeError('value can not exceed',self._nu_static_size) self._blob.core.nu_IC_size = val
@property def nu_seed_size(self): """Return seed-photon spectral-grid size.""" return self._blob.core.nu_seed_size
[docs] def get_seed_nu_size(self): """Return seed-photon spectral-grid size.""" return self._blob.core.nu_seed_size
@nu_seed_size.setter def nu_seed_size(self,val): """Set seed-photon spectral-grid size.""" self.set_seed_nu_size(val)
[docs] def set_seed_nu_size(self,val): """Set seed-photon spectral-grid size.""" if val>self._nu_static_size: raise RuntimeError('value can not exceed',self._nu_static_size) self._blob.core.nu_seed_size=val
[docs] def set_gamma_grid_size(self,val): """Set particle Lorentz-factor grid size for emitters.""" self.emitters_distribution.set_grid_size(gamma_grid_size=val)
@property def gamma_grid_size(self): """Return particle Lorentz-factor grid size.""" return self._blob.emitters.gamma_grid_size @gamma_grid_size.setter def gamma_grid_size(self,val): """Set particle Lorentz-factor grid size.""" self.emitters_distribution.set_grid_size(gamma_grid_size=val) @property def nu_min(self): """Return minimum frequency of the model grid (Hz).""" return self._get_nu_min_grid() @nu_min.setter def nu_min(self, val): """Set minimum frequency of the model grid (Hz).""" if hasattr(self, '_blob'): self._set_nu_min_grid(val) def _get_nu_min_grid(self): return self._blob.core.nu_start_grid def _set_nu_min_grid(self, val): self._blob.core.nu_start_grid=val @property def Norm_distr(self): """Return normalization mode of the emitter distribution. Returns ------- bool or int ``True``/``1`` means physical-density normalization, ``False``/``0`` means scaling-factor normalization. """ if hasattr(self,'emitters_distribution'): if self.emitters_distribution._user_defined is False: return self._blob.emitters.Norm_distr else: return self.emitters_distribution.normalize else: return None @Norm_distr.setter def Norm_distr(self, val): """Set normalization mode of the emitter distribution.""" if hasattr(self, 'emitters_distribution') and self.get_par_by_name('N') is not None: if val == 1 or val is True: if self.emitters_distribution._user_defined is False: self._blob.emitters.Norm_distr = 1 else: self.emitters_distribution.normalize = val self.parameters.N.par_type='emitters_density' self.parameters.N.units ='1/cm3' elif val == 0 or val is False: if self.emitters_distribution._user_defined is False: self._blob.emitters.Norm_distr = 0 else: self.emitters_distribution.normalize = val self.parameters.N.par_type = 'scaling_factor' else: raise RuntimeError('value', val, 'not allowed, allowed 0/1 or False/True')
[docs] def switch_Norm_distr_ON(self): """Enable physical-density normalization mode.""" self.Norm_distr=True
[docs] def switch_Norm_distr_OFF(self): """Enable scaling-factor normalization mode.""" self.Norm_distr = False
@property def nu_max(self): """Return maximum frequency of the model grid (Hz).""" return self._get_nu_max_grid() @nu_max.setter def nu_max(self, val): """Set maximum frequency of the model grid (Hz).""" if hasattr(self, '_blob'): self._set_nu_max_grid(val) def _set_nu_max_grid(self, val): self._blob.core.nu_stop_grid=val def _get_nu_max_grid(self): return self._blob.core.nu_stop_grid @property def nu_size(self): """Return number of sampled frequencies in Python-side evaluation.""" return self._nu_size @nu_size.setter def nu_size(self, size): """Set number of sampled frequencies in Python-side evaluation.""" self._nu_size = size
[docs] def set_nu_grid_size(self, val): """Set jetkernel frequency-grid size.""" self._set_nu_grid_size_blob(val)
@property def nu_grid_size(self): """Return jetkernel frequency-grid size.""" return self._get_nu_grid_size_blob() @nu_grid_size.setter def nu_grid_size(self, val): """Set jetkernel frequency-grid size.""" self._set_nu_grid_size_blob(val) def _set_nu_grid_size_blob(self, val): if val < 100: val = 100 if val > self._static_spec_arr_grid_size: raise RuntimeError('value can not exceed', self._nu_static_size) self._blob.core.nu_grid_size=val def _get_nu_grid_size_blob(self): return self._blob.core.nu_grid_size
[docs] def set_verbosity(self,val): """Set backend verbosity level.""" self._blob.core.verbose=val
[docs] def get_verbosity(self): """Return backend verbosity level.""" return self._blob.core.verbose
[docs] def debug_synch(self): """Print synchrotron integration debug information.""" print ("nu stop synch", self._blob.Sync.spec.nu_max) print ("nu stop synch ssc", self._blob.Sync.nu_stop_Sync_ssc) print ("ID MAX SYNCH", self._blob.Sync.NU_INT_STOP_Sync_SSC)
[docs] def debug_SSC(self): """Print SSC integration debug information.""" print ("nu start SSC", self._blob.SSC.spec.nu_min) print ("nu stop SSC", self._blob.SSC.spec.nu_max) print ("ID MAX SSC", self._blob.SSC.NU_INT_STOP_COMPTON_SSC)
[docs] def show_emitters_distribution(self): """Print emitter-distribution settings and parameters.""" print('-'*80) print('%s distribution:'%self.emitters_distribution.emitters_type) print(" type: %s " % (self._emitters_distribution_name)) print(" gamma energy grid size: ", self.gamma_grid_size) print(" gmin grid : %e" % self._blob.emitters.gmin_griglia) print(" gmax grid : %e" % self._blob.emitters.gmax_griglia) print(" normalization ", self.Norm_distr) print(" log-values ", self._emitters_distribution_log_values) print('') self.parameters.par_array.sort(key=lambda x: x.name, reverse=False) self.parameters.show_pars(names_list=self._emitters_distribution_dic.keys())
[docs] def show_model(self): """ shortcut to :class:`ModelParametersArray.show_pars` method shows all the paramters in the model """ print("") print('-'*80) print("model description: ") print('-'*80) print("type:",self.__class__.__name__) print("name: %s " % (self.name)) print("geometry: %s " % (self.geometry)) print('') print('%s distribution:'%self.emitters_distribution.emitters_type) print(" type: %s " % (self._emitters_distribution_name)) print (" gamma energy grid size: ",self.gamma_grid_size) print (" gmin grid : %e"%self._blob.emitters.gmin_griglia) print (" gmax grid : %e"%self._blob.emitters.gmax_griglia) print(" normalization: ", self.Norm_distr) print(" log-values: ", self._emitters_distribution_log_values) #print(" leptonic equilibrium: ", getattr(self, '_leptonic_equilibrium', False)) _p = self.parameters.get_par_by_name('NH_cold_to_rel_e') if _p is not None: print(' ratio of cold protons to relativistic electrons: %e'%_p.val) print('') if 'Disk' in self.EC_components_list: print('accretion disk:') BlazarSED.set_Disk(self._blob) print(' disk Type: %s'%self.parameters.get_par_by_name('disk_type').val) print(' L disk: %e (erg/s)'%self.parameters.get_par_by_name('L_Disk').val) print(' T disk: %e (K)'%self._blob.Disk.T_Disk) print(' nu peak disk: %e (Hz)'%BlazarSED.eval_nu_peak_Disk(self._blob.Disk.T_Disk)) if self.parameters.get_par_by_name('disk_type').val == 'MultiBB': print(' Sw radius %e (cm)'%self._blob.Disk.R_Sw) print(' L Edd. %e (erg/s)'%self._blob.Disk.L_Edd) yr=86400*365 print(' accr_rate: %e (M_sun/yr)'%(yr*self._blob.Disk.accr_rate/BlazarSED.m_sun)) print(' accr_rate Edd.: %e (M_sun/yr)'%(yr*self._blob.Disk.accr_Edd/BlazarSED.m_sun)) if 'EC_Star' in self.EC_components_list: print('Star:') print(' Star radius %e (cm)'%self._blob.Star.R_Star, ',%e (R_Sun)'%(self._blob.Star.R_Star/constants.R_sun.to('cm').value)) print('') print('radiative fields:') print (" seed photons grid size: ", self.nu_seed_size) print (" IC emission grid size: ", self.get_IC_nu_size()) print (' source emissivity lower bound : %e' % self._blob.core.emiss_lim) print (' spectral components:') for _s in self._spectral_components_list: print(" name:%s,"%_s.name, 'state:', _s.state) print(" name:%s,"%_s.name, 'hidden:', _s.hidden) print('external fields transformation method:', self.get_external_field_transf()) print ('') print ('SED info:') print (' nu grid size jetkernel: %d' % self.nu_grid_size) print (' nu size: %d' % self.nu_size) print (' nu mix (Hz): %e' % self._get_nu_min_grid()) print (' nu max (Hz): %e' % self._get_nu_max_grid()) print('') print('flux plot lower bound : %e' % self.flux_plot_lim) print('') print('-'*80) self.show_pars() print('-' * 80)
[docs] def plot_model(self,plot_obj=None,clean=False,label=None,comp=None,sed_data=None,color=None,auto_label=True,line_style='-',frame='obs', density=False): """Plot model spectral components on a :class:`~jetset.plot_sedfit.PlotSED`. Parameters ---------- plot_obj : jetset.plot_sedfit.PlotSED, optional Existing plot object. If omitted, a new one is created. clean : bool, optional If ``True``, remove previously plotted model lines before adding new ones. label : str, optional Label to use for the plotted line(s). If not provided, component names are used when ``auto_label=True``. comp : str, optional Name of a single spectral component to plot. If omitted, all active components plus ``Sum`` are plotted. sed_data : jetset.data_loader.ObsData, optional Observational data to add to the same plot. color : str, optional Matplotlib color for model line(s). auto_label : bool, optional If ``True``, automatically derive labels from component names. line_style : str, optional Line style used for non-sum components. frame : {"obs", "src", "blob"}, optional Output frame used for plotting. density : bool, optional If ``True``, plot density representation. Returns ------- jetset.plot_sedfit.PlotSED Plot object with added model traces. """ plot_obj=self._set_up_plot(plot_obj,sed_data,frame,density) if clean is True: plot_obj.clean_model_lines() if comp is not None: c = self.get_spectral_component_by_name(comp) if c is None: raise RuntimeError('spectral component', comp, 'not found') if label is not None: comp_label = label elif label is None and auto_label is True: comp_label = c.name else: comp_label=None if c.state!='off': plot_obj.add_model_plot(c.SED, line_style=line_style, label=comp_label,flim=self.flux_plot_lim,color=color,auto_label=auto_label,update=False, frame=frame) else: for c in self.spectral_components_list: comp_label = c.name if auto_label is not True: comp_label=label #print('comp label',comp_label) if c.state != 'off' and c.name!='Sum': plot_obj.add_model_plot(c.SED, line_style=line_style, label=comp_label,flim=self.flux_plot_lim,auto_label=auto_label,color=color,update=False, frame=frame) c=self.get_spectral_component_by_name('Sum') if label is not None: comp_label = label else: comp_label='Sum' plot_obj.add_model_plot(c.SED, line_style='--', label=comp_label, flim=self.flux_plot_lim,color=color,update=False, frame=frame) plot_obj.update_plot() return plot_obj
#@safe_run
[docs] def set_blob(self): """Synchronize Python parameters/distributions to the backend blob.""" if self._leptonic_equilibrium is True: self._blob.emitters.do_equilibrium = 1 self._set_equilibrium_injection_on_blob() else: self._blob.emitters.do_equilibrium = 0 if self.emitters_distribution._user_defined is True: self.emitters_distribution._fill() BlazarSED.Init(self._blob, self.get_DL_cm()) if self.emitters_distribution._user_defined is True: self.emitters_distribution._set_blob()
#@safe_run
[docs] def set_external_fields(self): """Update external radiation fields on the backend.""" self.set_blob() BlazarSED.spectra_External_Fields(1,self._blob,1)
[docs] def lin_func(self, lin_nu, init, phys_output=False, update_emitters=True): """Evaluate model spectrum in linear units on ``lin_nu``. Applies internal absorption (if enabled) and returns the total model after hidden components are removed. """ if self.emitters_distribution is None: raise RuntimeError('emitters distribution not defined') tau_tot=np.zeros(lin_nu.shape) for iac in self._internal_absorption_comp.keys(): int_abs=self._internal_absorption_comp[iac]['obj'] tau_c,nu_src=int_abs.eval(get_tau=True,lin_nu=lin_nu) tau_tot+=tau_c if init is True: self.set_blob() self._update_spectral_components(tau=tau_tot) BlazarSED.Run_SED(self._blob) if phys_output==True: BlazarSED.EnergeticOutput(self._blob) if self.emitters_distribution._user_defined is False: if update_emitters is True: self.emitters_distribution.update() else: self.emitters_distribution._fill() nu_sed_sum, nuFnu_sed_sum = self.spectral_components.Sum.get_SED_points(lin_nu=lin_nu,log_log=False,interp=self._jetkernel_interp) nuFnu_sed_hidden=0. for _sc in self._spectral_components_list: if _sc.hidden is True: _, _nuFnu_sed= _sc.get_SED_points(lin_nu=lin_nu,log_log=False,interp=self._jetkernel_interp) nuFnu_sed_hidden += _nuFnu_sed nuFnu_sed_sum = nuFnu_sed_sum - nuFnu_sed_hidden return nu_sed_sum, nuFnu_sed_sum*np.exp(-tau_tot)
def _eval_model(self, lin_nu, log_nu, init, loglog, phys_output=False, update_emitters=True): log_model = None lin_nu, lin_model = self.lin_func(lin_nu, init, phys_output, update_emitters) if loglog is True: log_model = np.log10(lin_model) return lin_model, log_model def _prepare_nu_model(self, nu, loglog): if nu is None: lin_nu = np.logspace(np.log10(self.nu_min), np.log10(self.nu_max), self.nu_size) log_nu = np.log10(lin_nu) else: if np.shape(nu) == (): nu = np.array([nu]) if loglog is True: lin_nu = np.power(10., nu) log_nu = nu else: log_nu = np.log10(nu) lin_nu = nu return lin_nu, log_nu #@safe_run
[docs] def eval(self, init=True, fill_SED=True, nu=None, get_model=False, loglog=False, plot=None, label=None, phys_output=False, update_emitters=True): """Evaluate the jet model. Parameters are compatible with :meth:`Model.eval`, with additional control over backend initialization and emitter updates. """ out_model = None lin_nu, log_nu = self._prepare_nu_model(nu, loglog) lin_model, log_model= self._eval_model(lin_nu, log_nu ,init, loglog, phys_output=phys_output, update_emitters=update_emitters) if fill_SED is True: self._fill(lin_nu,lin_model) self.spectral_components.Sum.SED.nuFnu=lin_model if get_model is True: if loglog is True: out_model = log_model else: out_model = lin_model return out_model
[docs] def energetic_report(self,verbose=True,): """Build and optionally print energetic quantities table.""" self._build_energetic_report() if verbose is True: _show_table(self.energetic_report_table)
def _build_energetic_dict(self): self.energetic_dict={} BlazarSED.SetBeaming(self._blob) #NOTE: workaround for the users be sure they evaluate correctly the jet energetics for 'disk' frame _change_frame=False if self.get_external_field_transf()=='disk': self.set_external_field_transf('blob') self.eval() _change_frame=True self._energetic = BlazarSED.EnergeticOutput(self._blob) if _change_frame is True: self.set_external_field_transf('disk') self.eval() _par_array=ModelParameterArray() _name = [i for i in self._energetic.__class__.__dict__.keys() if i[:1] != '_'] _par_array.add_par(ModelParameter(name='BulkLorentzFactor', val=self._blob.core.BulkFactor, units='',par_type='jet-bulk-factor')) self.energetic_dict['BulkLorentzFactor']= self._blob.core.BulkFactor try: for _n in _name: units = 'skip_this' if _n[0]=='L': par_type='Lum. blob rest. frame.' units='erg/s' elif _n[0] == 'U' and 'DRF' not in _n: par_type = 'Energy dens. blob rest. frame' units = 'erg/cm^3' elif _n[0] == 'U' and 'DRF' in _n: par_type = 'Energy dens. disk rest. frame' units = 'erg/cm^3' elif _n[0] == 'j': par_type = 'jet Lum.' units = 'erg/s' else: if _n !='thisown': warnings.warn('energetic name %s not understood'%_n) if units == 'skip_this': pass else: self.energetic_dict[_n]=getattr(self._energetic, _n) _par_array.add_par(ModelParameter(name=_n, val=getattr(self._energetic, _n), units=units,par_type=par_type)) except Exception as e: print('_energetic',self._energetic) raise RuntimeError('energetic_report failed',e) return _par_array def _set_energetic_report(self,_par_array): if self.emitters_distribution.emitters_type=='electrons': _par_array.add_par(ModelParameter(name='NH_cold_to_rel_e', val=self.parameters.NH_cold_to_rel_e.val, units='',par_type='cold_p_to_rel_e_ratio')) self.energetic_dict['NH_cold_to_rel_e'] = self.parameters.NH_cold_to_rel_e.val self.energetic_report_table = _par_array.par_table self.energetic_report_table.remove_columns(['log','frozen','phys. bound. min','phys. bound. max']) self.energetic_report_table.rename_column('par type','type') if self.emitters_distribution.emitters_type=='electrons': _i=self.energetic_report_table['name']=='U_p_target' _i=np.argwhere(_i==True).flatten()[0] self.energetic_report_table.remove_row(_i) _=self.energetic_dict.pop('U_p_target') _i=self.energetic_report_table['name']=='U_p' _i=np.argwhere(_i==True).flatten()[0] self.energetic_report_table.remove_row(_i) _=self.energetic_dict.pop('U_p') _i=self.energetic_report_table['name']=='L_pp_gamma_rf' _i=np.argwhere(_i==True).flatten()[0] self.energetic_report_table.remove_row(_i) _=self.energetic_dict.pop('L_pp_gamma_rf') _i=self.energetic_report_table['name']=='jet_L_p' _i=np.argwhere(_i==True).flatten()[0] self.energetic_report_table.remove_row(_i) _=self.energetic_dict.pop('jet_L_p') if self.emitters_distribution.emitters_type=='protons': _i=self.energetic_report_table['name']=='U_p_cold' _i=np.argwhere(_i==True).flatten()[0] self.energetic_report_table.remove_row(_i) _=self.energetic_dict.pop('U_p_cold') _i=self.energetic_report_table['name']=='jet_L_p_cold' _i=np.argwhere(_i==True).flatten()[0] self.energetic_report_table.remove_row(_i) _=self.energetic_dict.pop('jet_L_p_cold') if isinstance(self,GalacticUnbeamed): i=[] for n in self.energetic_report_table['name']: if 'jet' in n: i.append(True) _=self.energetic_dict.pop(n) else: i.append(False) i=np.array(i) i=np.argwhere(i==True) self.energetic_report_table.remove_rows(i) _d_l=['U_Disk','U_BLR','U_DT','U_CMB','U_Synch_DRF','U_Star'] i=[] for n in self.energetic_report_table['name']: if n in _d_l: i.append(True) _=self.energetic_dict.pop(n) else: i.append(False) i=np.array(i) i=np.argwhere(i==True) self.energetic_report_table.remove_rows(i) for n in self.energetic_report_table['name']: if n.endswith('_DRF'): _i=self.energetic_report_table['name']==n _i=np.argwhere(_i==True).flatten()[0] nn=self.energetic_report_table[_i]['name'].replace('_DRF','') self.energetic_report_table[_i]['name']=nn self.energetic_dict[nn]=self.energetic_dict.pop(n) if n.endswith('_rf'): _i=self.energetic_report_table['name']==n _i=np.argwhere(_i==True).flatten()[0] nn=self.energetic_report_table[_i]['name'].replace('_rf','') self.energetic_report_table[_i]['name']=nn self.energetic_dict[nn]=self.energetic_dict.pop(n) for r in self.energetic_report_table: if 'blob' in r['type']: nn=r['type'].replace('blob','') r['type']=nn if 'disk' in r['type']: nn=r['type'].replace('disk','') r['type']=nn def _build_energetic_report(self,): try: _par_array=self._build_energetic_dict() self._set_energetic_report(_par_array) except Exception as e: raise RuntimeError('energetic_report failed',e)
[docs] def get_SED_peak(self,peak_name=None,freq_range=None,log_log=False): """Return SED peak from backend attributes or a frequency interval. Parameters ---------- peak_name : str, optional Backend peak attribute name (for example ``'nu_p_Sync_obs'``). freq_range : tuple, optional ``(nu_min, nu_max)`` range used to search the peak numerically. log_log : bool, optional If ``True``, return values in log10 scale. """ if peak_name is not None and freq_range is not None: print ("either you provide peak_name or freq_range") raise ValueError elif peak_name is not None: try: if log_log==False: return get_nested_attr(self._blob, peak_name) else: return np.log10(get_nested_attr(self._blob, peak_name)) except: print ("peak name %s, not found, check name"%peak_name) raise ValueError else: x,y=self.spectral_components.Sum.get_SED_points(log_log=log_log) msk1=x>freq_range[0] msk2=x<freq_range[1] y_m= y[msk1*msk2].max() x_id= np.argmax(y[msk1*msk2]) return x[msk1*msk2][x_id],y_m
[docs] def get_component_peak(self,comp_name=None,log_log=False): """Return peak frequency and flux for a spectral component.""" comp = self.get_spectral_component_by_name(comp_name) ID = np.argmax(comp.SED.nuFnu.value) x_p, y_p = comp.SED.nu[ID].value, comp.SED.nuFnu[ID].value if log_log is True: x_p, y_p=np.log10([x_p,y_p]) return x_p, y_p
[docs] def set_num_c_threads(self,N): """Set number of C threads used by the backend.""" if self.verbose: print("===> setting C threads to",N) if isinstance(N,int): self._blob.core.N_THREADS=N else: raise RuntimeError('N must be integer')
[docs] class Jet(JetBase): """Concrete jet model for leptonic or hadronic emitter populations.""" def __init__(self, cosmo=None, name=None, emitters_type='electrons', emitters_distribution='plc', emitters_distribution_log_values=False, beaming_expr='delta', jet_workplace=None, verbose=False, clean_work_dir=True, electron_distribution=None, proton_distribution=None, electron_distribution_log_values=None, proton_distribution_log_values=None, geometry='spherical'): """Initialize a :class:`Jet` model. Parameters ---------- cosmo : Cosmo, optional Cosmology helper object. name : str, optional Model name. emitters_type : str, optional Emitter type if no explicit electron/proton distribution alias is used. emitters_distribution : str or distribution object, optional Default distribution configuration. emitters_distribution_log_values : bool, optional If ``True``, interpret analytic distribution parameters in log space. beaming_expr : str, optional Beaming parametrization. jet_workplace : WorkPlace, optional Output workspace. verbose : bool, optional Verbosity flag. clean_work_dir : bool, optional If ``True``, clean model output directory before use. electron_distribution : str or distribution object, optional Convenience alias to select an electron distribution. proton_distribution : str or distribution object, optional Convenience alias to select a proton distribution. electron_distribution_log_values : bool, optional Log-value mode for the electron distribution alias. proton_distribution_log_values : bool, optional Log-value mode for the proton distribution alias. geometry : str, optional Emission-region geometry. """ if electron_distribution is not None: emitters_type = 'electrons' emitters_distribution= electron_distribution if electron_distribution_log_values is not None: emitters_distribution_log_values = electron_distribution_log_values if proton_distribution is not None: emitters_type = 'protons' emitters_distribution= proton_distribution if proton_distribution_log_values is not None: emitters_distribution_log_values = proton_distribution_log_values if electron_distribution is not None and emitters_type == 'protons': raise RuntimeError("if you provide an electron_distribution, you can't set emitters_type=",emitters_type, 'use emitters_distribution instead') if proton_distribution is not None and emitters_type == 'electrons': raise RuntimeError("if you provide a proton_distribution, you can't set emitters_type=",emitters_type, 'use emitters_distribution instead') if name is None: _name = 'jet' else: _name = name = clean_var_name(name) super(Jet,self).__init__(cosmo=cosmo, name=_name, emitters_type=emitters_type, emitters_distribution=emitters_distribution, emitters_distribution_log_values=emitters_distribution_log_values, beaming_expr=beaming_expr, jet_workplace=jet_workplace, verbose=verbose, clean_work_dir=clean_work_dir, geometry=geometry) if name is None or name == '': if self.emitters_distribution.emitters_type == 'electrons': name = 'jet_leptonic' elif self.emitters_distribution.emitters_type == 'protons': name = 'jet_hadronic_pp' else: name = 'jet' self.name = clean_var_name(name) if self.emitters_distribution.emitters_type == 'protons': self.add_pp_gamma_component() self.add_pp_neutrino_component() self.add_bremss_ep_component() if self.emitters_distribution.emitters_type == 'electrons': self.electron_distribution=self.emitters_distribution self.parameters.NH_cold_to_rel_e.hidden=False if self.emitters_distribution.emitters_type == 'protons': self.protons_distribution=self.emitters_distribution
[docs] @staticmethod def available_electron_distributions(): """Print available electron-distribution names.""" JetBase.available_emitters_distributions()
[docs] @staticmethod def available_proton_distributions(): """Print available proton-distribution names.""" JetBase.available_emitters_distributions()
[docs] def get_proton_distribution_name(self): """Return the active proton-distribution name.""" return self.get_emitters_distribution_name()
[docs] def get_electron_distribution_name(self): """Return the active electron-distribution name.""" return self.get_emitters_distribution_name()
[docs] def show_proton_distribution(self): """Print proton-distribution configuration.""" self.show_emitters_distribution()
[docs] def show_electron_distribution(self): """Print electron-distribution configuration.""" self.show_emitters_distribution()
[docs] def add_bremss_ep_component(self): """Add electron-proton bremsstrahlung spectral component.""" self._add_spectral_component('Bremss_ep', var_name='Bremss_ep.do_bremss_ep', state_dict=dict((('on', 1), ('off', 0))))
[docs] def add_pp_gamma_component(self): """Add proton-proton gamma-ray spectral component.""" self._add_spectral_component('PP_gamma', var_name='PP_gamma.do_pp_gamma', state_dict=dict((('on', 1), ('off', 0))))
[docs] def add_pp_neutrino_component(self): """Add proton-proton neutrino spectral components.""" self._add_spectral_component('PP_neutrino_tot', var_name='PP_neutrino.do_pp_neutrino', state_dict=dict((('on', 1), ('off', 0)))) self._add_spectral_component('PP_neutrino_mu', var_name='PP_neutrino.do_pp_neutrino', state_dict=dict((('on', 1), ('off', 0)))) self._add_spectral_component('PP_neutrino_e', var_name='PP_neutrino.do_pp_neutrino', state_dict=dict((('on', 1), ('off', 0))))
[docs] def set_N_from_U_emitters(self,U, gmin=None, gmax=None): """Set normalization to match target emitter energy density. Parameters ---------- U : float Target energy density in ``erg cm^-3``. gmin, gmax : float, optional Integration bounds in Lorentz factor used for ``eval_U``. """ ratio = U/self.emitters_distribution.eval_U(gmin=gmin, gmax=gmax) if self._leptonic_equilibrium: L_inj=self.parameters.L_inj.val self.set_par('L_inj', val=L_inj *ratio) self.set_blob() else: self.emitters_distribution._fill() N = self.parameters.N.val self.set_par('N', val=N *ratio) self.set_blob()
[docs] def set_N_from_U_vol_emitters(self, U_vol, gmin=None, gmax=None): """Set normalization to match target integrated emitter energy. Parameters ---------- U_vol : float Target integrated energy in ``erg`` over emitting volume. gmin, gmax : float, optional Integration bounds in Lorentz factor used for ``eval_U``. """ U=U_vol/ self._blob.core.Vol_region self.set_N_from_U_emitters(U, gmin=gmin, gmax=gmax)
[docs] def set_N_from_L_sync(self,L_sync): """Set normalization to match target integrated synchrotron luminosity. Parameters ---------- L_sync : float Target source-frame synchrotron luminosity in ``erg s^-1``. """ if self._leptonic_equilibrium: self.set_blob() self.set_par('L_inj', val=1E40) self.set_blob() delta = self.get_beaming() ratio = L_sync/(BlazarSED.Power_Sync_Electron(self._blob)* delta ** 4)*1E40 self.set_par('L_inj', val=ratio) else: self.set_par('N', val=1.0) self.set_blob() delta = self.get_beaming() ratio = L_sync/(BlazarSED.Power_Sync_Electron(self._blob)* delta ** 4) self.set_par('N', val=ratio)
[docs] def set_N_from_F_sync(self, F_sync): """Set normalization to match target observed integrated synchrotron flux. Parameters ---------- F_sync : float Target observed integrated synchrotron flux in ``erg cm^-2 s^-1``. """ DL = self.get_DL_cm() L = F_sync * DL * DL * 4.0 * np.pi self.set_N_from_L_sync(L)
[docs] def set_N_from_nuLnu(self,nuLnu_src, nu_src): """Set normalization to match target source-frame ``nuLnu`` at ``nu_src``. Parameters ---------- nuLnu_src : float Target source-frame luminosity at ``nu_src`` in ``erg s^-1``. nu_src : float Source-frame frequency in ``Hz``. """ if self._leptonic_equilibrium: self.set_par('L_inj',val=1E40) self.set_blob() delta = self._blob.core.beam_obj nu_blob = nu_src / delta L_out = BlazarSED.Lum_Sync_at_nu(self._blob, nu_blob) * delta ** 4 L_out = nuLnu_src / L_out*1E40 self.set_par('L_inj', val=L_out) else: self.set_par('N',val=1.0) self.set_blob() delta = self._blob.core.beam_obj nu_blob = nu_src / delta L_out = BlazarSED.Lum_Sync_at_nu(self._blob, nu_blob) * delta ** 4 N_out = nuLnu_src / L_out self.set_par('N', val=N_out)
[docs] def set_N_from_nuFnu(self, nuFnu_obs, nu_obs): """Set normalization to match target observed ``nuFnu`` at ``nu_obs``. Parameters ---------- nuFnu_obs : float Target observed differential flux in ``erg cm^-2 s^-1``. nu_obs : float Observed frequency in ``Hz``. """ self.set_blob() DL = self.get_DL_cm() L = nuFnu_obs * DL * DL * 4.0 * np.pi nu_rest = nu_obs * (1 + self.parameters.z_cosm.val) self.set_N_from_nuLnu( L, nu_rest)
[docs] def set_B_eq(self, nuFnu_obs, nu_obs, B_min=1E-9,B_max=1.0,N_pts=20,plot=False): """Estimate equipartition magnetic field by scanning a logarithmic B grid, for a given observed flux of the synchrotron emission (nuFnu_obs) at a given observed frequency (nu_obs) Parameters ---------- nuFnu_obs : float Target observed differential synchrotron flux in ``erg cm^-2 s^-1``. nu_obs : float Observed frequency in ``Hz``. B_min, B_max : float, optional Scan bounds for magnetic field in Gauss. N_pts : int, optional Number of logarithmically spaced trial values. plot : bool, optional If ``True``, show diagnostic curves for ``U_e`` and ``U_B``. Returns ------- tuple ``(B_best, B_grid, U_B, U_e)``. """ b_grid = np.logspace(np.log10(B_min), np.log10(B_max), N_pts) print ('B grid min ',B_min) print ('B grid max ',B_max) print ('grid points',N_pts) U_e = np.zeros(N_pts) U_B = np.zeros(N_pts) N = np.zeros(N_pts) self.set_par('B', b_grid[0]) self.set_blob() for ID, b in enumerate(b_grid): self.set_par('B', b) if self._leptonic_equilibrium: self.set_par('L_inj', 1E40) else: self.set_par('N', 1.0) # print 'B_eq',ID self.set_N_from_nuFnu(nuFnu_obs, nu_obs) if self._leptonic_equilibrium: pass else: N[ID]=self.get_par_by_name('N').val self.set_blob() # U_e[ID] = self._blob.emitters.U_e U_B[ID] = self._blob.Sync.UB # delta=Jet.get_beaming() # print "check L_in=%4.4e L_out=%4.4e"%(L_0,(L_0/delta**4)/BlazarSED.Power_Sync_Electron(Jet._Jet__blob)) ID_min = np.argmin(U_B + U_e) if plot==True: #import pylab as plt fig, ax = plt.subplots() ax.plot(b_grid,U_e,label=r'$u_e$') ax.plot(b_grid,U_B,label=r'$u_B$') ax.plot(b_grid,U_B+U_e) ax.scatter(b_grid[ID_min],U_e[ID_min]+U_B[ID_min]) ax.semilogy() ax.semilogx() ax.set_xlabel(r'$B$ (G)') ax.set_ylabel(r'$u$ erg cm$-^3$') ax.legend plt.show() self.set_par('B', val=b_grid[ID_min]) self.set_par('N', val=N[ID_min]) print ('setting B to ',b_grid[ID_min]) print ('setting N to ',N[ID_min]) return b_grid[ID_min],b_grid,U_B,U_e
[docs] def eval_synch_pol(self,nu_range_obs): """Evaluate synchrotron polarization in the observer frame. Returns ------- tuple of ndarray Polarization degree and corresponding ``nuFnu`` values. """ nuF_nu=self.eval(get_model=True,nu=nu_range_obs) pol_nu=np.zeros(nu_range_obs.size) #TODO: this will be removed when eval_Sync_polarization will follow the same pattern of synch flux nu_range_pol_blob=nu_range_obs/self.get_beaming()*(1+self.parameters.z_cosm.val) for ID,nu in enumerate(nu_range_pol_blob): pol_nu[ID]=BlazarSED.eval_Sync_polarization(self._blob,nu) m=np.logical_or(nuF_nu<=0,np.isnan(pol_nu)) pol_nu[m]=0 nuF_nu[m]=0 return pol_nu,nuF_nu
[docs] def eval_synch_pol_blob(self,nu_range_blob): """Evaluate synchrotron polarization in the blob frame. Parameters ---------- nu_range_blob : array-like or float Blob-frame frequency values in Hz. A scalar is accepted and is internally promoted to a one-dimensional array. Returns ------- pol_nu_blob : ndarray Synchrotron polarization degree evaluated at ``nu_range_blob``. nuLnu_blob : ndarray Synchrotron ``nuLnu`` values in the blob frame at the same frequencies. """ #TODO: this will be removed when eval_Sync_polarization will follow the same pattern of synch flux nu_range_blob = np.atleast_1d(np.asarray(nu_range_blob, dtype=np.float64)) nu_range_obs=nu_range_blob*self.get_beaming()/(1+self.parameters.z_cosm.val) self.eval(nu=nu_range_obs) # Work on a local copy to avoid mutating cached SED arrays. nuLnu_blob=np.asarray(self.spectral_components.Sync.SED.nuLnu_blob, dtype=np.float64).copy() pol_nu_blob=np.zeros(nu_range_blob.size) #TODO: this will be removed when eval_Sync_polarization will follow the same pattern of synch flux for ID,nu in enumerate(nu_range_blob): pol_nu_blob[ID]=BlazarSED.eval_Sync_polarization(self._blob,nu) m=np.logical_or(nuLnu_blob<=0,~np.isfinite(pol_nu_blob)) pol_nu_blob[m]=0 nuLnu_blob[m]=0 return pol_nu_blob,nuLnu_blob
[docs] class GalacticBeamed(Jet): """Beamed galactic-source specialization of :class:`Jet`.""" def __init__(self, distance=3*u.kpc, name=None, emitters_type='electrons', emitters_distribution='plc', emitters_distribution_log_values=False, beaming_expr='delta', jet_workplace=None, verbose=False, clean_work_dir=True, electron_distribution=None, proton_distribution=None, electron_distribution_log_values=None, proton_distribution_log_values=None, geometry='spherical'): """Initialize a galactic beamed model with fixed luminosity distance.""" if name is None: _name = 'unbeamed' else: _name = name = clean_var_name(name) if np.isscalar(distance): _d=distance else: try: _d=distance.to('cm') except: raise RuntimeError('distance must be either an astropy unit object convertible to cm, or a scalar in cm') cosmo=Cosmo(DL_cm=_d,verbose=False) super(GalacticBeamed,self).__init__(cosmo=cosmo, name=_name, emitters_type=emitters_type, emitters_distribution=emitters_distribution, emitters_distribution_log_values=emitters_distribution_log_values, electron_distribution=electron_distribution, proton_distribution=proton_distribution, electron_distribution_log_values=electron_distribution_log_values, proton_distribution_log_values=proton_distribution_log_values, beaming_expr=beaming_expr, jet_workplace=jet_workplace, verbose=verbose, clean_work_dir=clean_work_dir, geometry=geometry) if name is None or name == '': if self.emitters_distribution.emitters_type == 'electrons': name = 'galactic_beamed_leptonic' elif self.emitters_distribution.emitters_type == 'protons': name = 'galactic_beamed_hadronic_pp' else: name = 'galactic_beamed' self.name = clean_var_name(name) self._handle_z(d=_d) def _dummy_z_par_func(self,DL_cm): self.cosmo._DL_cm=DL_cm*u.cm return 0 def _handle_z(self,d=u.kpc*1): self._blob.core.z_cosm=0 self.parameters.z_cosm.val=0 self.parameters.z_cosm.hidden=True self.parameters.z_cosm.frozen=True _dmax=1000*u.kpc.to('cm') self.add_user_par(name='DL_cm',units='cm',val=d.value,val_min=0,val_max=_dmax) p=self.parameters.get_par_by_name('DL_cm') p.par_type='distance'
[docs] class GalacticUnbeamed(GalacticBeamed): """Unbeamed galactic-source specialization of :class:`GalacticBeamed`.""" def __init__(self, distance=3*u.kpc, name=None, emitters_type='electrons', emitters_distribution='plc', emitters_distribution_log_values=False, jet_workplace=None, verbose=False, clean_work_dir=True, electron_distribution=None, proton_distribution=None, electron_distribution_log_values=None, proton_distribution_log_values=None, geometry='spherical'): """Initialize a galactic unbeamed model (beam factor fixed to 1).""" super(GalacticUnbeamed,self).__init__(distance=distance, name=name, emitters_type=emitters_type, emitters_distribution=emitters_distribution, emitters_distribution_log_values=emitters_distribution_log_values, beaming_expr='delta', jet_workplace=jet_workplace, verbose=verbose, clean_work_dir=clean_work_dir, electron_distribution=electron_distribution, proton_distribution=proton_distribution, electron_distribution_log_values=electron_distribution_log_values, proton_distribution_log_values=proton_distribution_log_values, geometry=geometry) if name is None or name == '': if self.emitters_distribution.emitters_type == 'electrons': name = 'galactic_unbeamed_leptonic' elif self.emitters_distribution.emitters_type == 'protons': name = 'galactic_unbeamed_hadronic_pp' else: name = 'galactic_unbeamed' self.name=name self.parameters.beam_obj.val=1 self.parameters.beam_obj.hidden=True self.parameters.R_H.hidden=True self.set_external_field_transf('disk')