Source code for jetset.internal_absorption

"""Internal absorption modeling utilities for jet spectral components."""

import os

# Prefer the portable numba backend for this kernel path to avoid OpenMP runtime
# deprecation chatter (e.g. omp_set_nested) in environments using Intel OMP.
os.environ.setdefault("NUMBA_THREADING_LAYER", "workqueue")

import os

on_rtd = os.environ.get('READTHEDOCS', None) == 'True'

if on_rtd:
    from .mock import jetkernel as BlazarSED
else:
    from .jetkernel.jetkernel import HPLANCK as h
    from .jetkernel.jetkernel import MEC2 as mec2
    from .jetkernel.jetkernel import SIGTH 
    from .jetkernel import jetkernel as BlazarSED
    H_OVER_MEC2 = h / mec2
    SIGMA_PREF = 0.75 * SIGTH * 0.5




from .jet_kernel_tools import get_spectral_c_array_read_only
from .utils import get_nested_attr
from numba import njit, prange
import numpy as np
import warnings



@njit(fastmath=True)
def _sigma_numba(s, pref):
    beta = np.sqrt(1.0 - 1.0 / s)
    term = (3.0 - beta ** 4) * np.log((1.0 + beta) / (1.0 - beta)) - 2.0 * beta * (2.0 - beta ** 2)
    return pref * (1.0 - beta ** 2) * term


@njit(parallel=True, fastmath=True)
def _compute_tau_numba(nu_src,
                       nu_grid,
                       n_grid,
                       mu_grid,
                       R_H_grid,
                       h_over_mec2,
                       sigma_pref):

    n_gamma = nu_src.shape[0]
    n_rh, n_theta, n_soft = nu_grid.shape
    two_pi = 2.0 * np.pi
    tau = np.zeros(n_gamma)

    for gamma_idx in prange(n_gamma):
        eps_gamma = nu_src[gamma_idx] * h_over_mec2
        tau_gamma = 0.0
        prev_rh_integral = 0.0
        for rh_idx in range(n_rh):
            mu_vals = mu_grid[rh_idx]
            mu_integral = 0.0
            prev_mu_integral = 0.0
            for theta_idx in range(n_theta):
                mu_val = mu_vals[theta_idx]
                one_minus_mu = 1.0 - mu_val
                if one_minus_mu < 1e-20:
                    one_minus_mu = 1e-20

                nu_integral = 0.0
                prev_nu_val = nu_grid[rh_idx, theta_idx, 0]
                prev_integrand = 0.0
                for soft_idx in range(n_soft):
                    nu_val = nu_grid[rh_idx, theta_idx, soft_idx]
                    eps_soft = nu_val * h_over_mec2
                    s_val = eps_gamma * eps_soft * one_minus_mu * 0.5

                    integrand = 0.0
                    if s_val >= 1.0:
                        integrand = _sigma_numba(s_val, sigma_pref) * n_grid[rh_idx, theta_idx, soft_idx] * one_minus_mu

                    if soft_idx > 0:
                        dnu = nu_val - prev_nu_val
                        nu_integral += 0.5 * (prev_integrand + integrand) * dnu

                    prev_nu_val = nu_val
                    prev_integrand = integrand

                if theta_idx > 0:
                    dmu = mu_val - mu_vals[theta_idx - 1]
                    mu_integral += 0.5 * (prev_mu_integral + nu_integral) * dmu

                prev_mu_integral = nu_integral

            if rh_idx > 0:
                d_rh = R_H_grid[rh_idx] - R_H_grid[rh_idx - 1]
                tau_gamma += 0.5 * (prev_rh_integral + mu_integral) * d_rh

            prev_rh_integral = mu_integral

        tau[gamma_idx] = two_pi * tau_gamma

    return tau


[docs] class InternalAbsorption(object): """Compute internal gamma-gamma absorption from jet seed photon fields. Notes ----- Evaluates optical depth ``tau(nu)`` using BLR or DT photon distributions, caches previous evaluations for reuse, and provides attenuation factors for integration into jet spectral component calculations. """ def __init__(self, jet, nu_min=None, seed_photons_name='BLR', N_soft=50, N_hard=50, N_R_H=20, N_theta=20, use_R_H_profile_extrapolation=False, ): """Create a new `InternalAbsorption` instance. Parameters ---------- jet : object Jet model instance. nu_min : object, optional Minimum frequency in Hz. seed_photons_name : str, optional Seed-photon field identifier (for example ``BLR`` or ``DT``). N_soft : int, optional Number of soft-photon energy samples. N_hard : int, optional Number of hard-photon energy samples. N_R_H : int, optional Number of distance samples along ``R_H``. N_theta : int, optional Number of angular samples. use_R_H_profile_extrapolation : bool, optional If ``True``, enable r h profile extrapolation. """ if seed_photons_name in ['BLR','DT']: self._seed_photons_name=seed_photons_name else: raise RuntimeError('seed_photons_name %s not valid'%seed_photons_name) self._N_soft=N_soft self._N_hard=N_hard self._N_R_H=N_R_H self._N_theta=N_theta self._nu_min=nu_min jet.skip_internal_absorption_serial=True self._jet=jet.clone() jet.skip_internal_absorption_serial=False self._jet_orig=jet self._old_tau=None self._old_nu_tau=None self._parameters_old=None self._use_R_H_profile_extrapolation=use_R_H_profile_extrapolation #self._DT_pars=['R_H','tau_DT','R_DT','T_DT'] def _update_parameters_old(self): self._parameters_old={} for p_orig in self._jet_orig.parameters.par_array: if not p_orig.frozen and not p_orig._is_dependent: self._parameters_old[p_orig.name]=p_orig.val def _check_eval_needed(self,ptype): changed=False if self._parameters_old is None: return True if self._old_tau is None: return True pars_new=self._jet_orig.parameters.get_pars_by_type(ptype) pars_new.extend(self._jet_orig.parameters.get_pars_by_type('Disk')) pars_new.extend([self._jet_orig.parameters.get_par_by_name('R_H')]) for p_new in pars_new: if p_new.name in self._parameters_old.keys(): if p_new.val != self._parameters_old[p_new.name]: changed=True else: changed=True return changed def _get_R_seed(self): if self._seed_photons_name not in ["BLR","DT"]: raise RuntimeError('seed_photons_name %s not valid'%self._seed_photons_name) if self._seed_photons_name == "BLR": R_seed=self._jet.parameters.R_BLR_out.val elif self._seed_photons_name == "DT": R_seed=self._jet.parameters.R_DT.val return R_seed def _get_R_seed_field(self,R_seed,N_soft,peak): nu_soft_0,n_soft_0=self.get_n(R_H=R_seed/1000, seed_photons_name=self._seed_photons_name, N_soft=N_soft, peak=peak) return nu_soft_0,n_soft_0
[docs] def eval_tau_photons(self, nu_src, R_H, skip_check=True, peak=False, use_R_H_profile_extrapolation=False): """Evaluate tau photons. Parameters ---------- nu_src : object Source-frame frequency array in Hz. R_H : object Distance from black hole in cm. skip_check : bool, optional If ``True``, skip check. peak : bool, optional If ``True``, use peak-optimized seed-photon sampling. use_R_H_profile_extrapolation : bool, optional If ``True``, enable r h profile extrapolation. Returns ------- object Computed value. """ for p_orig in self._jet_orig.parameters.par_array: if not p_orig.frozen and not p_orig._is_dependent: p=self._jet.get_par_by_name(p_orig.name) p.val=p_orig.val R_H_range=np.logspace(0,3,self._N_R_H)*R_H if peak is True: N_soft=1 else: N_soft=self._N_soft R_seed=self._get_R_seed() nu_soft_0,n_soft_0=self._get_R_seed_field(R_seed,N_soft,peak) if self._nu_min is None: nu_min=1E40/(nu_soft_0.max()) else: nu_min = self._nu_min nu_tau=np.logspace(np.log10(nu_min),np.log10(nu_src.max()),self._N_hard) if skip_check: tau_changed=True else: tau_changed=self._check_eval_needed(ptype=self._seed_photons_name) self._update_parameters_old() if self._old_tau is not None: if not tau_changed and np.array_equal(self._old_nu_tau, nu_tau): return self._old_tau,nu_tau shape=(self._N_R_H,self._N_theta,N_soft) nu_soft=np.zeros(shape) n_soft=np.zeros(shape) mu_range=np.zeros(shape) for ID_RH,R_H in enumerate(R_H_range): self._jet.set_par('R_H',val=R_H) if use_R_H_profile_extrapolation: if R_H<=R_seed: R_x=1 else: R_x=R_seed/R_H n_soft[ID_RH]=n_soft_0*(R_x)**2 nu_soft[ID_RH]=nu_soft_0 else: nu_soft[ID_RH],n_soft[ID_RH]=self.get_n(R_H=R_H, seed_photons_name=self._seed_photons_name, N_soft=N_soft, peak=peak) if R_H<R_seed: mu_max=1.0 mu_min=-1 else: mu_max=1.0 mu_min = np.sqrt(1.0 - (R_seed / R_H)**2) mu_range[ID_RH]=(np.ones((self._N_theta,N_soft)).T*np.linspace(mu_min,mu_max,self._N_theta).T).T nu_tau = np.atleast_1d(nu_tau) #if nu_min is not None: nu_tau = nu_tau[nu_tau >= nu_min] if nu_tau.size == 0: return np.zeros(0, dtype=np.float64),nu_tau nu_soft_grid = np.ascontiguousarray(nu_soft, dtype=np.float64) n_soft_grid = np.ascontiguousarray(n_soft, dtype=np.float64) mu_grid = np.ascontiguousarray(mu_range[:, :, 0], dtype=np.float64) R_H_grid = np.ascontiguousarray(R_H_range, dtype=np.float64) nu_tau_grid = np.ascontiguousarray(nu_tau, dtype=np.float64) try: tau = _compute_tau_numba( nu_tau_grid, nu_soft_grid, n_soft_grid, mu_grid, R_H_grid, H_OVER_MEC2, SIGMA_PREF, ) except Exception as exc: warnings.warn( f"Falling back to NumPy tau computation because the numba implementation failed: {exc}", RuntimeWarning, stacklevel=2, ) one_minus_mu = np.clip(1.0 - mu_range, 1e-20, None) eps_soft = nu_soft_grid * H_OVER_MEC2 eps_gamma = (nu_tau_grid * H_OVER_MEC2)[:, None, None, None] s = eps_gamma * eps_soft[None, :, :, :] * one_minus_mu[None, :, :, :] / 2.0 sigma_vals = self.sigma(s) integrand = sigma_vals * n_soft_grid[None, :, :, :] * one_minus_mu[None, :, :, :] int_over_nu = np.trapezoid(integrand, nu_soft_grid[None, :, :, :], axis=-1) int_over_mu = np.trapezoid(int_over_nu, mu_range[None, :, :, 0], axis=-1) tau = 2.0 * np.pi * np.trapezoid(int_over_mu, R_H_grid, axis=-1) return tau,nu_tau
[docs] def sigma(self, s): """Sigma. Parameters ---------- s : object Dimensionless interaction invariant. Returns ------- object Computed value. """ out = np.zeros_like(s) mask = s >= 1.0 if not np.any(mask): return out sm = s[mask] beta = np.sqrt(1.0 - 1.0/sm) pref = 0.75 * SIGTH * 0.5 # 3/16 * σ_T = 0.1875 σ_T term = (3 - beta**4) * np.log((1 + beta)/(1 - beta)) - 2*beta*(2 - beta**2) out[mask] = pref * (1 - beta**2) * term return out
[docs] def get_n(self, seed_photons_name, N_soft, R_H, peak=False, rescale=True): """Return n. Parameters ---------- seed_photons_name : object Seed-photon field identifier (for example ``BLR`` or ``DT``). N_soft : object Number of soft-photon energy samples. R_H : object Distance from black hole in cm. peak : bool, optional If ``True``, use peak-optimized seed-photon sampling. rescale : bool, optional If ``True``, apply normalization rescaling. Returns ------- object Requested value. """ self._jet.set_par('R_H',val=R_H) BlazarSED.Build_I_nu_Disk(self._jet._blob) if seed_photons_name == "BLR": n_name='BLR.spec.n_nu_DRF' nu_name='BLR.spec.nu_DRF' BlazarSED.Build_I_nu_BLR(self._jet._blob) nu_start=self._jet._blob.BLR.spec.nu_min_DRF nu_stop=self._jet._blob.BLR.spec.nu_max_DRF elif seed_photons_name== "DT": n_name='DT.spec.n_nu_DRF' nu_name='DT.spec.nu_DRF' BlazarSED.Build_I_nu_DT(self._jet._blob) nu_start=self._jet._blob.DT.spec.nu_min_DRF nu_stop=self._jet._blob.DT.spec.nu_max_DRF else: raise RuntimeError('seed_photons_name %s not valid'%seed_photons_name) n_ptr = get_nested_attr(self._jet._blob, n_name) nu_ptr = get_nested_attr(self._jet._blob, nu_name) size=self._jet._blob.core.nu_grid_size x,y=get_spectral_c_array_read_only(nu_ptr,n_ptr,size) msk=np.logical_and(x>=nu_start,x<=nu_stop) x=x[msk] y=y[msk] if peak is True: idx=np.argmax(y) scale_factor=np.trapezoid(y,x) x=np.atleast_1d(x[idx]) y=np.atleast_1d(y[idx]) if rescale is True: scale_factor=y*x/scale_factor y=y/scale_factor else: x=x[y>y.max()/100] y=y[y>y.max()/100] x_int=np.logspace(np.log10(x[0]),np.log10(x[-1]),N_soft) y_int = np.interp(np.log10(x_int),np.log10(x), np.log10(y),left=1E-200,right=1E-200) y_int = np.power(10., y_int) x=x_int y=y_int return x,y
[docs] def eval(self, get_tau=False,skip_check=True,lin_nu=None,peak=False): """Evaluate model output. Parameters ---------- get_tau : bool, optional If ``True``, return optical depth instead of attenuation. skip_check : bool, optional If ``True``, skip check. lin_nu : object, optional Linear-frequency array in Hz. peak : bool, optional If ``True``, use peak-optimized seed-photon sampling. Returns ------- object Computed value. """ if lin_nu is None: nu_src=self._jet_orig.spectral_components.Sum.SED.nu_src.value else: nu_src=lin_nu nu_src=np.atleast_1d(nu_src) tau,nu_tau=self.eval_tau_photons(nu_src, R_H=self._jet_orig.parameters.R_H.val, skip_check=skip_check, peak=peak, use_R_H_profile_extrapolation=self._use_R_H_profile_extrapolation) self._old_tau=tau self._old_nu_tau=nu_tau EPS = 1e-300 nu_src_pos = np.maximum(nu_src, EPS) nu_tau_pos = np.maximum(nu_tau, EPS) tau_pos = np.maximum(tau, EPS) tau_interp = 10**np.interp( np.log10(nu_src_pos), np.log10(nu_tau_pos), np.log10(tau_pos), left=np.log10(EPS), right=None ) if get_tau: return tau_interp,nu_src_pos