Source code for jetset.jet_spectral_components


__author__ = "Andrea Tramacere"


import numpy as np
import os
from astropy.table import Table
#TODO: double check the log10 import public vs private numpy API
#from numpy.core._multiarray_umath impo
from numpy import log10
from scipy import interpolate

on_rtd = os.environ.get('READTHEDOCS', None) == 'True'

if on_rtd:
    from .mock import jetkernel as BlazarSED
else:
    from .jetkernel import jetkernel as BlazarSED

from . import spectral_shapes
from .jetkernel_models_dic import nuFnu_obs_dict, n_seed_dic
from .plot_sedfit import PlotSpecComp,PlotSeedPhotons
from .utils import check_frame, unexpected_behaviour, get_nested_attr, set_nested_attr
from .jet_kernel_tools import get_spectral_c_array_read_only

__all__=['JetSeedPhotons','JetSpecComponent','SpecCompList']

[docs] class JetSeedPhotons(object): """JetSeedPhotons class.""" def __init__(self,name,blob_object,var_name=None): """Create a new `JetSeedPhotons` instance. Parameters ---------- name : object Name identifier. blob_object : object Parameter controlling blob object. var_name : object, optional Parameter controlling var name. """ self.name = name self._blob_object = blob_object self._n_name, self._nu_name = n_seed_dic[self.name] self.n_ptr = get_nested_attr(blob_object, self._n_name) self.nu_ptr = get_nested_attr(blob_object, self._nu_name) #self.SED = spectral_shapes.SED(name=self.name) if var_name is not None: self._var_name=var_name self.fill(emiss_lim=self._blob_object.core.emiss_lim)
[docs] def fill(self,log_log=False,emiss_lim=0): """Fill. Parameters ---------- log_log : bool, optional If ``True``, operate in log10 space. emiss_lim : int, optional Parameter controlling emiss lim. """ self.nu,self.n=self.get_spectral_points(log_log=log_log,emiss_lim=emiss_lim)
[docs] def get_spectral_points(self,log_log=False,emiss_lim=0): """Return spectral points. Parameters ---------- log_log : bool, optional If ``True``, operate in log10 space. emiss_lim : int, optional Parameter controlling emiss lim. Returns ------- object Requested value. """ x,y=get_spectral_c_array_read_only(self.nu_ptr,self.n_ptr,self._blob_object.core.nu_grid_size) msk_nan=np.isnan(x) msk_nan+=np.isnan(y) #print('emiss lim',self.get_emiss_lim()) x[msk_nan]=0. y[msk_nan]=emiss_lim msk=y<emiss_lim y[msk]=emiss_lim if log_log==True: msk = y <= 0. y[msk] = emiss_lim #x=x[msk] # y=y[msk] x=log10(x) y=log10(y) return x,y
#except: # raise RuntimeError ('model evaluation failed in get_spectral_points')
[docs] def plot(self, y_min=None,y_max=None): """Plot. Parameters ---------- y_min : object, optional Minimum value for y. y_max : object, optional Maximum value for y. Returns ------- object Computed result. """ self.fill(emiss_lim=self._blob_object.core.emiss_lim) p=PlotSeedPhotons() p.plot(nu=self.nu,nuFnu=self.n,y_min=y_min,y_max=y_max) return p
[docs] class JetSpecComponent(object): """JetSpecComponent class.""" def __repr__(self): return str(self.show()) #def __str__(self): # return str(self.show()) def __init__(self,jet_obj,name,blob_object,var_name=None,state_dict=None,state=None,tau=None): """Create a new `JetSpecComponent` instance. Parameters ---------- jet_obj : object Parameter controlling jet obj. name : object Name identifier. blob_object : object Parameter controlling blob object. var_name : object, optional Parameter controlling var name. state_dict : object, optional Mapping/dictionary for state dict. state : object, optional Parameter controlling state. tau : object, optional Parameter controlling tau. """ self.name=name self.jet_obj=jet_obj self._blob_object=blob_object self._nuFnu_name, self._nu_name=nuFnu_obs_dict[self.name] #print("==> ", self._nuFnu_name, blob_object,self._nu_name ) self.nuFnu_ptr=get_nested_attr(blob_object, self._nuFnu_name) self.nu_ptr=get_nested_attr(blob_object, self._nu_name) #print("==> ", self.nu_ptr, self.nuFnu_ptr) self.SED=spectral_shapes.SED(name=self.name,beaming=jet_obj.get_beaming()) self.seed_field=None self._hidden=False self._tau=tau # self._nu_start_src_name, self._nu_stop_src_name = nu_src_start_stop_dict[self.name] # # self.nu_ptr_start = get_nested_attr(blob_object, self._nu_name) # self.nu_ptr_stop = get_nested_attr(blob_object, self._nu_name) # # self._nu_start_src = 'auto' # self._nu_stop_src = 'auto' # self._nu_start_obs = 'auto' # self._nu_stop_obs = 'auto' if name in n_seed_dic.keys(): self.seed_field=JetSeedPhotons(name,blob_object) if var_name is not None: self._var_name=var_name if state_dict is None: self._state_dict = dict() self._state_dict['on'] = 1 self._state_dict['off'] = 0 else: self._state_dict=state_dict self.state='on' else: self._state_dict = {} self._var_name=None self._state='on' if state is not None and self._state_dict != {}: self.state=state # @property # def nu_boundaries(self,frame='obs'): # check_frame(frame) # if frame == 'src': # return self._nu_start_src, self._nu_stop_src # else: # return self._nu_start_obs, self._nu_stop_obs # # @nu_boundaries.setter # def nu_boundaries(self,nu_start=None,nu_stop=None, frame='obs'): # check_frame(frame) # if frame == 'obs': # self._nu_start_obs = nu # self._nu_start_src = convert_nu_to_src(nu,self.jet_obj.get_par_by_type('redshift').val,'obs') # else: # self._nu_start_src = nu # self._nu_start_obs = convert_nu_to_src(nu, self.jet_obj.get_par_by_type('redshift').val, 'obs') @property def hidden(self): """Hidden. Returns ------- object Requested value. """ return self._hidden @hidden.setter def hidden(self,val): """Hidden. Parameters ---------- val : object Value to assign. """ if val not in [True,False]: raise RuntimeError('val must be False or True') else: self._hidden=val
[docs] def get_emiss_lim(self,seed=False): """Return emiss lim. Parameters ---------- seed : bool, optional Parameter controlling seed. Returns ------- object Requested value. """ return self._blob_object.core.emiss_lim
[docs] def fill_SED(self,log_log=False,lin_nu=None,skip_zeros=False): """Fill sed. Parameters ---------- log_log : bool, optional If ``True``, operate in log10 space. lin_nu : object, optional Frequency/energy control value for lin nu. skip_zeros : bool, optional If ``True``, skip zeros. """ x,y=self.get_SED_points( log_log=log_log,lin_nu=lin_nu,skip_zeros=skip_zeros) if self._tau is not None: y=y*np.exp(-self._tau) self.SED.beaming=self.jet_obj.get_beaming() self.SED.fill(nu=x,nuFnu=y,log_log=log_log) self.SED.fill_nuLnu(z=self.jet_obj.get_par_by_type('redshift').val,dl=self.jet_obj.get_DL_cm()) if self.seed_field is not None: self.seed_field.fill(log_log=log_log)
[docs] def get_SED_points(self, log_log=False, lin_nu=None, interp='linear', skip_zeros=False): """Return sed points. Parameters ---------- log_log : bool, optional If ``True``, operate in log10 space. lin_nu : object, optional Frequency/energy control value for lin nu. interp : str, optional Parameter controlling interp. skip_zeros : bool, optional If ``True``, skip zeros. Returns ------- object Requested value. """ x,y= get_spectral_c_array_read_only(self.nu_ptr, self.nuFnu_ptr, self._blob_object.core.nu_grid_size) msk_nan = np.isnan(x) msk_nan += np.isnan(y) x[msk_nan] = 0. y[msk_nan] = self.get_emiss_lim() msk = y < self.get_emiss_lim() y[msk] = self.get_emiss_lim() msk_zeros = y > self.get_emiss_lim() if lin_nu is not None: f_interp = interpolate.interp1d(log10(x), log10(y), bounds_error=False, kind=interp) y = np.power(10., f_interp(log10(lin_nu))) #y=np.interp(log10(lin_nu), log10(x), log10(y),np.nan,np.nan) x=lin_nu msk_nan = np.isnan(y) y[msk_nan] = 0. msk_zeros = y > self.get_emiss_lim() y[~msk_zeros] = 0. if log_log == True: msk = y <= 0. y[msk] = -1.0E10 msk_zeros = y > self.get_emiss_lim() x = log10(x) y = log10(y) if skip_zeros is True: _x = x[msk_zeros] _y = y[msk_zeros] else: _x = x _y = y return _x, _y
def _update_jetkernel_spectral_array(self,y): size = self._blob_object.core.nu_grid_size if y.size != size: raise RuntimeError('the size of the input array is different from the size of the target array in jetkernel') for i in range(size): BlazarSED.set_spectral_array(self.nuFnu_ptr, self._blob_object, i, y[i])
[docs] def show(self): """Show.""" print('name :',self.name) print('var name :',self._var_name) print('state :',self._state) #print('nu_start (src frame):', self._nu_start_src) #print('nu_stop (src frame):', self._nu_stop_src) if self._state_dict is not None: print('allowed states :',[k for k in self._state_dict.keys()])
@property def state(self,): """State. Returns ------- object Requested value. """ return self._state @state.setter def state(self, val): """State. Parameters ---------- val : object Value to assign. """ if self._state_dict!={}: if val not in self._state_dict.keys(): raise RuntimeError('val', val, 'not in allowed', self._state_dict.keys()) self._state = val if self._var_name is not None: #print("==> setting state",self._blob_object,self._var_name) set_nested_attr(self._blob_object, self._var_name, self._state_dict[val]) #setattr(self._blob_object, self._var_name, self._state_dict[val]) else: raise Warning('the state of the spectral component',self.name,' can not be changed')
[docs] def get_var_state(self,): """Return var state. Returns ------- object Requested value. """ if self._var_name is not None: return get_nested_attr(self._blob_object, self._var_name) else: return None
[docs] def plot(self, y_min=None,y_max=None): """Plot. Parameters ---------- y_min : object, optional Minimum value for y. y_max : object, optional Maximum value for y. Returns ------- object Computed result. """ p=PlotSpecComp() p.plot(nu=self.SED.nu.value,nuFnu=self.SED.nuFnu.value,y_min=y_min,y_max=y_max) return p
[docs] class SpecCompList(object): """Container for spectral-component objects and tabular export. Notes ----- Provides convenience methods to display components, look them up by name, and build an Astropy table from visible component SEDs. """ def __init__(self,sc_list): """Create a new `SpecCompList` instance. Parameters ---------- sc_list : object List of sc. """ self._sc_list=sc_list self._table=None
[docs] def show(self): """Show.""" for sc in self._sc_list: sc.show()
def __repr__(self): return str(self.show())
[docs] def build_table(self, restframe='obs'): """Build table. Parameters ---------- restframe : str, optional Parameter controlling restframe. """ _names = ['nu'] _cols=[] check_frame(restframe) if restframe=='obs': _cols.append(self._sc_list[0].SED.nu) elif restframe=='src': _cols.append(self._sc_list[0].SED.nu_src) else: unexpected_behaviour() for ID,sc in enumerate(self._sc_list): if sc.hidden is False: _names.append(sc.name) if restframe == 'obs': _cols.append(sc.SED.nuFnu) else: _cols.append(sc.SED.nuLnu_src) _meta=dict(model_name=sc.jet_obj.name) _meta['redshift']=sc.jet_obj.get_par_by_type('redshift').val _meta['restframe']= restframe self._table = Table(_cols, names=_names,meta=_meta)
[docs] def get_spectral_component_by_name(self,name,verbose=True): """Return spectral component by name. Parameters ---------- name : object Name identifier. verbose : bool, optional Parameter controlling verbose. Returns ------- object Requested value. """ for i in range(len(self._sc_list)): if self._sc_list[i].name==name: return self._sc_list[i] if verbose==True: print ("no spectral components with name %s found"%name)
@property def table(self): """Table. Returns ------- object Requested value. """ return self._table