__author__ = "Andrea Tramacere"
import numpy as np
import os
from astropy.table import Table
from numpy.core._multiarray_umath import zeros, log10
from scipy import interpolate
on_rtd = os.environ.get('READTHEDOCS', None) == 'True'
if on_rtd == True:
try:
from .jetkernel import jetkernel as BlazarSED
except ImportError:
from .mock import jetkernel as BlazarSED
else:
from .jetkernel import jetkernel as BlazarSED
from . import spectral_shapes
from .jetkernel_models_dic import nuFnu_obs_dict, n_seed_dic
from .plot_sedfit import PlotSpecComp,PlotSeedPhotons
from .utils import check_frame, unexpected_behaviour
from .jet_kernel_tools import get_spectral_c_array
__all__=['JetSeedPhotons','JetSpecComponent','SpecCompList']
[docs]
class JetSeedPhotons(object):
"""
"""
def __init__(self,name,blob_object,var_name=None):
self.name = name
self._blob_object = blob_object
self._n_name, self._nu_name = n_seed_dic[self.name]
self.n_ptr = getattr(blob_object, self._n_name)
self.nu_ptr = getattr(blob_object, self._nu_name)
#self.SED = spectral_shapes.SED(name=self.name)
if var_name is not None:
self._var_name=var_name
self.fill(emiss_lim=self._blob_object.emiss_lim)
[docs]
def fill(self,log_log=False,emiss_lim=0):
self.nu,self.n=self.get_spectral_points(log_log=log_log,emiss_lim=emiss_lim)
[docs]
def get_spectral_points(self,log_log=False,emiss_lim=0):
x,y=get_spectral_c_array(self.nu_ptr,self.n_ptr ,self._blob_object.nu_grid_size,self._blob_object)
msk_nan=np.isnan(x)
msk_nan+=np.isnan(y)
#print('emiss lim',self.get_emiss_lim())
x[msk_nan]=0.
y[msk_nan]=emiss_lim
msk=y<emiss_lim
y[msk]=emiss_lim
if log_log==True:
msk = y <= 0.
y[msk] = emiss_lim
#x=x[msk]
# y=y[msk]
x=log10(x)
y=log10(y)
return x,y
#except:
# raise RuntimeError ('model evaluation failed in get_spectral_points')
[docs]
def plot(self, y_min=None,y_max=None):
self.fill(emiss_lim=self._blob_object.emiss_lim)
p=PlotSeedPhotons()
p.plot(nu=self.nu,nuFnu=self.n,y_min=y_min,y_max=y_max)
return p
[docs]
class JetSpecComponent(object):
"""
"""
def __repr__(self):
return str(self.show())
#def __str__(self):
# return str(self.show())
def __init__(self,jet_obj,name,blob_object,var_name=None,state_dict=None,state=None):
self.name=name
self.jet_obj=jet_obj
self._blob_object=blob_object
self._nuFnu_name, self._nu_name=nuFnu_obs_dict[self.name]
self.nuFnu_ptr=getattr(blob_object,self._nuFnu_name)
self.nu_ptr=getattr(blob_object,self._nu_name)
self.SED=spectral_shapes.SED(name=self.name,beaming=jet_obj.get_beaming())
self.seed_field=None
self._hidden=False
# self._nu_start_src_name, self._nu_stop_src_name = nu_src_start_stop_dict[self.name]
#
# self.nu_ptr_start = getattr(blob_object, self._nu_name)
# self.nu_ptr_stop = getattr(blob_object, self._nu_name)
#
# self._nu_start_src = 'auto'
# self._nu_stop_src = 'auto'
# self._nu_start_obs = 'auto'
# self._nu_stop_obs = 'auto'
if name in n_seed_dic.keys():
self.seed_field=JetSeedPhotons(name,blob_object)
if var_name is not None:
self._var_name=var_name
if state_dict is None:
self._state_dict = dict()
self._state_dict['on'] = 1
self._state_dict['off'] = 0
else:
self._state_dict=state_dict
self.state='on'
else:
self._state_dict = {}
self._var_name=None
self._state='on'
if state is not None and self._state_dict != {}:
self.state=state
# @property
# def nu_boundaries(self,frame='obs'):
# check_frame(frame)
# if frame == 'src':
# return self._nu_start_src, self._nu_stop_src
# else:
# return self._nu_start_obs, self._nu_stop_obs
#
# @nu_boundaries.setter
# def nu_boundaries(self,nu_start=None,nu_stop=None, frame='obs'):
# check_frame(frame)
# if frame == 'obs':
# self._nu_start_obs = nu
# self._nu_start_src = convert_nu_to_src(nu,self.jet_obj.get_par_by_type('redshift').val,'obs')
# else:
# self._nu_start_src = nu
# self._nu_start_obs = convert_nu_to_src(nu, self.jet_obj.get_par_by_type('redshift').val, 'obs')
@property
def hidden(self):
return self._hidden
@hidden.setter
def hidden(self,val):
if val not in [True,False]:
raise RuntimeError('val must be False or True')
else:
self._hidden=val
[docs]
def get_emiss_lim(self,seed=False):
return self._blob_object.emiss_lim
[docs]
def fill_SED(self,log_log=False,lin_nu=None,skip_zeros=False):
x,y=self.get_SED_points( log_log=log_log,lin_nu=lin_nu,skip_zeros=skip_zeros)
self.SED.beaming=self.jet_obj.get_beaming()
self.SED.fill(nu=x,nuFnu=y,log_log=log_log)
self.SED.fill_nuLnu(z=self.jet_obj.get_par_by_type('redshift').val,dl=self.jet_obj.get_DL_cm())
if self.seed_field is not None:
self.seed_field.fill(log_log=log_log)
[docs]
def get_SED_points(self, log_log=False, lin_nu=None, interp='linear', skip_zeros=False):
x,y= get_spectral_c_array(self.nu_ptr, self.nuFnu_ptr, self._blob_object.nu_grid_size,self._blob_object)
msk_nan = np.isnan(x)
msk_nan += np.isnan(y)
x[msk_nan] = 0.
y[msk_nan] = self.get_emiss_lim()
msk = y < self.get_emiss_lim()
y[msk] = self.get_emiss_lim()
msk_zeros = y > self.get_emiss_lim()
if lin_nu is not None:
f_interp = interpolate.interp1d(log10(x), log10(y), bounds_error=False, kind=interp)
y = np.power(10., f_interp(log10(lin_nu)))
#y=np.interp(log10(lin_nu), log10(x), log10(y),np.nan,np.nan)
x=lin_nu
msk_nan = np.isnan(y)
y[msk_nan] = 0.
msk_zeros = y > self.get_emiss_lim()
y[~msk_zeros] = 0.
if log_log == True:
msk = y <= 0.
y[msk] = -1.0E10
msk_zeros = y > self.get_emiss_lim()
x = log10(x)
y = log10(y)
if skip_zeros is True:
_x = x[msk_zeros]
_y = y[msk_zeros]
else:
_x = x
_y = y
return _x, _y
def _update_jetkernel_spectral_array(self,y):
size = self._blob_object.nu_grid_size
if y.size != size:
raise RuntimeError('the size of the input array is different from the size of the target array in jetkernel')
for i in range(size):
BlazarSED.set_spectral_array(self.nuFnu_ptr, self._blob_object, i, y[i])
[docs]
def show(self):
print('name :',self.name)
print('var name :',self._var_name)
print('state :',self._state)
#print('nu_start (src frame):', self._nu_start_src)
#print('nu_stop (src frame):', self._nu_stop_src)
if self._state_dict is not None:
print('allowed states :',[k for k in self._state_dict.keys()])
@property
def state(self,):
return self._state
@state.setter
def state(self, val):
if self._state_dict!={}:
if val not in self._state_dict.keys():
raise RuntimeError('val', val, 'not in allowed', self._state_dict.keys())
self._state = val
if self._var_name is not None:
setattr(self._blob_object, self._var_name, self._state_dict[val])
else:
raise Warning('the state of the spectral component',self.name,' can not be changed')
[docs]
def get_var_state(self,):
if self._var_name is not None:
return getattr(self._blob_object,self._var_name)
else:
return None
[docs]
def plot(self, y_min=None,y_max=None):
p=PlotSpecComp()
p.plot(nu=self.SED.nu.value,nuFnu=self.SED.nuFnu.value,y_min=y_min,y_max=y_max)
return p
[docs]
class SpecCompList(object):
def __init__(self,sc_list):
self._sc_list=sc_list
self._table=None
[docs]
def show(self):
for sc in self._sc_list:
sc.show()
def __repr__(self):
return str(self.show())
[docs]
def build_table(self, restframe='obs'):
_names = ['nu']
_cols=[]
check_frame(restframe)
if restframe=='obs':
_cols.append(self._sc_list[0].SED.nu)
elif restframe=='src':
_cols.append(self._sc_list[0].SED.nu_src)
else:
unexpected_behaviour()
for ID,sc in enumerate(self._sc_list):
if sc.hidden is False:
_names.append(sc.name)
if restframe == 'obs':
_cols.append(sc.SED.nuFnu)
else:
_cols.append(sc.SED.nuLnu_src)
_meta=dict(model_name=sc.jet_obj.name)
_meta['redshift']=sc.jet_obj.get_par_by_type('redshift').val
_meta['restframe']= restframe
self._table = Table(_cols, names=_names,meta=_meta)
[docs]
def get_spectral_component_by_name(self,name,verbose=True):
for i in range(len(self._sc_list)):
if self._sc_list[i].name==name:
return self._sc_list[i]
else:
if verbose==True:
print ("no spectral components with name %s found"%name)
@property
def table(self):
return self._table