Source code for xdust.halos.halo

import numpy as np
from scipy.interpolate import interp1d, InterpolatedUnivariateSpline
from scipy.integrate import trapezoid as trapz
from astropy.io import fits
import astropy.units as u

from .. import helpers

__all__ = ['Halo']

ALLOWED_FTYPE = ['abs','ext']
ALLOWED_FUNIT = ['cgs','phot','count','none']

[docs] class Halo(object): """ An X-ray scattering halo. Sliceable object: can input a range of energy/wavelength values and it will return the halo values for the range of interest. Can also summon values according using integers within range(len(lam)) Attributes ---------- description : str Text description of this scattering halo. lam : astropy.units.Quantity Wavelength or energy grid; plain values default to keV. theta : astropy.units.Quantity Scattering angle grid; plain values default to arcsec. norm_int : astropy.units.Quantity array [arcsec^-2] Normalized scattering halo intensity as a function of energy (NE x NTH) taux : numpy.ndarray (NE) Integrated X-ray scattering opacity as a function of energy fabs : numpy.ndarray or astropy.units.Quantity (NE) Bin-integrated absorbed flux (photon/cm^2/s -or- erg/cm^2/s), used for calculating total halo intensity intensity : astropy.units.Quantity (NTH) [arcsec^-2 x fabs.unit] Energy-integrated intensity calculated for the scattering halo [fabs x norm_int] Parameters ---------- lam : astropy.units.Quantity or numpy.ndarray Wavelength or energy grid; plain values default to keV. theta : astropy.units.Quantity or numpy.ndarray Scattering angle grid; plain values default to arcsec. from_file : str If not ``None``, the filename of a FITS file containing the halo parameters and normalized intensity. The file should be in the format produced by the ``write`` method of this class """ def __init__(self, lam=1.0, theta=1.0, from_file=None): self.description = None if isinstance(lam, u.Quantity): self.lam = lam else: self.lam = lam * u.keV # length NE if isinstance(theta, u.Quantity): self.theta = theta else: self.theta = theta * u.arcsec # length NTH, arcsec self.norm_int = None # NE x NTH, arcsec^-2 self.taux = None # NE, unitless self.fabs = None # NE, bin integrated flux [e.g. phot/cm^2/s, NOT phot/cm^2/s/keV] self.intensity = None # NTH, flux x arcsec^-2 if from_file is not None: self._read_from_file(from_file)
[docs] def calculate_intensity(self, flux, ftype='abs'): """ Calculate the scattering halo intensity from a flux spectrum. Parameters ---------- flux : numpy.ndarray or astropy.units.Quantity Bin-integrated absorbed flux [photon/cm^2/s or erg/cm^2/s]. ftype : str ``'abs'`` if the input is absorbed flux; ``'ext'`` if it is point-source (extinction-corrected) flux. """ assert self.norm_int is not None assert ftype in ALLOWED_FTYPE if ftype == 'abs': fabs = flux if ftype == 'ext': fabs = flux * np.exp(self.taux) # Fps = Fabs exp(-tau_sca) self.fabs = fabs NE, NTH = np.shape(self.norm_int) fa_grid = np.repeat(fabs.reshape(NE,1), NTH, axis=1) self.intensity = np.sum(self.norm_int * fa_grid, axis=0)
@property def fext(self): """ The point-source flux component, :math:`F_{ps} = F_{abs} \\exp(-\\tau_{sca})` Returns ------- numpy.ndarray or astropy.units.Quantity (NE) [fabs.unit] """ assert self.fabs is not None return self.fabs * np.exp(-self.taux) @property def fhalo(self): """ The theoretical bin-integrated flux spectrum for the total scattering halo flux. This is computed using the formula :math:`F_h = F_{abs} (1 - \\exp(-\\tau_{sca}))` Returns ------- numpy.ndarray or astropy.units.Quantity (NE) [fabs.unit] """ assert self.fabs is not None return self.fabs * (1.0 - np.exp(-self.taux)) @property def percent_fabs(self): """ Calculates the fraction of absorbed flux that goes into the scattring halo, effectively :math:`(1 - \\exp(-\\tau))` Returns ------- numpy.ndarray or astropy.units.Quantity (NE) [unitless] """ assert self.fabs is not None return np.sum(self.fhalo) / np.sum(self.fabs) @property def percent_fext(self): """ Calculates the fraction of point source flux that goes into the scattering halo, effectively :math:`(1 - \\exp(-\\tau)) / \\exp(-\\tau) = \\exp(\\tau) - 1` Returns ------- numpy.ndarray or astropy.units.Quantity (NE) [unitless] """ assert self.fabs is not None return np.sum(self.fhalo) / np.sum(self.fext) # http://stackoverflow.com/questions/2936863/python-implementing-slicing-in-getitem def __getitem__(self, key): """ Returns a Halo object that is a subset / slice of the original halo. Slice values must be floats that follow the same units as the original `lam` value. """ if isinstance(key, int): return self._get_lam_index(key) if isinstance(key, slice): lmin = key.start lmax = key.stop if lmin is None: ii = (self.lam.value < lmax) elif lmax is None: ii = (self.lam.value >= lmin) else: ii = (self.lam.value >= lmin) & (self.lam.value < lmax) return self._get_lam_slice(ii) def _get_lam_slice(self, ii): result = Halo(self.lam.value[ii] * self.lam.unit, self.theta) if self.norm_int is not None: result.description = self.description result.norm_int = self.norm_int[ii,...] result.taux = self.taux[ii] if self.fabs is not None: result.calculate_intensity(self.fabs[ii], ftype='abs') return result def _get_lam_index(self, i): assert isinstance(i, int) result = Halo(self.lam.value[i] * self.lam.unit, self.theta) if self.norm_int is not None: result.description = self.description result.norm_int = np.array([self.norm_int[i,...]]) result.taux = self.taux[i] if self.fabs is not None: flux = np.array([self.fabs[i]]) result.calculate_intensity(flux, ftype='abs') return result
[docs] def ecf(self, th, n, log=False): """ Return the fraction of (energy-integrated) scattering halo flux enclosed by theta < th Parameters ---------- th : astropy.units.Quantity or float Maximum theta value; plain values are assumed to be in arcsec. n : int Number of theta values to use for interpolating. log : bool If ``True``, use a log-spaced theta grid. Default ``False``. Returns ------- float Enclosed fraction of halo flux within ``theta < th``. Note ---- Small-angle scattering is assumed. """ if isinstance(th, u.Quantity): thmax = th.to('arcsec').value else: thmax = th * u.arcsec # Halo theta values, which will serve as grid th_asec = self.theta.to('arcsec').value # Make sure that the value of interest is within the calculated grid assert thmax > np.min(th_asec) & thmax < np.max(th_sec) # Get a new grid of intensity values over which to integrate if log: th_grid = np.logspace(np.log10(np.min(th_asec)), np.log10(np.max(th_asec)), n) else: th_grid = np.linspace(np.min(th_asec), np.max(th_asec), n) I_grid = np.interp(th_grid, th_asec, self.intensity) fh_tot = np.sum(self.fhalo) # total flux in the halo enclosed = trapz(I_grid * 2.0 * np.pi * th_grid, th_grid) return enclosed / fh_tot
[docs] def frac_halo(self, th, n, log=False): """ Calculate fraction of halo, as a function of energy, enclosed by theta < th Parameters ---------- th : astropy.units.Quantity or float Maximum theta value; plain values are assumed to be in arcsec. n : int Number of theta values to use for interpolating. log : bool If ``True``, use a log-spaced theta grid. Default ``False``. Returns ------- numpy.ndarray Enclosed halo fraction as a function of energy [size NE]. Note ---- Small-angle scattering is assumed. """ # Will break if we attempt to run this without an intensity calculation assert self.norm_intensity is not None if isinstance(th, u.Quantity): thmax = th.to('arcsec').value else: thmax = th * u.arcsec # Halo theta values, which will serve as grid th_asec = self.theta.to('arcsec').value # Make sure that the value of interest is within the calculated grid assert thmax > np.min(th_asec) & thmax < np.max(th_sec) # Get a new grid of intensity values over which to integrate if log: th_grid = np.logspace(np.log10(np.min(th_asec)), np.log10(np.max(th_asec)), n) else: th_grid = np.linspace(np.min(th_asec), np.max(th_asec), n) result = [] for i in range(len(self.lam)): I_grid = np.interp(th_grid, th_asec, self.norm_int[i,:]) enclosed = trapz(I_grid * 2.0 * np.pi * th_grid, th_grid) result.append[enclosed] return np.array(result).flatten() # unitless, size NE
[docs] def write(self, filename, overwrite=True): """ Parameters ---------- filename : str Name for the output FITS file. overwrite : bool If ``True`` (default), overwrite an existing file of the same name. Writes the ``norm_int`` attribute to a FITS file, along with the LAM, THETA, and TAUX values. """ hdr = fits.Header() hdr['COMMENT'] = "Normalized halo intensity as a function of angle" hdr['COMMENT'] = "HDU 1 is LAM, HDU 2 is THETA, HDU 3 is TAUX" hdr['INTUNIT'] = self.norm_int.unit.to_string() primary_hdu = fits.PrimaryHDU(self.norm_int.value, header=hdr) hdus_to_write = [primary_hdu] + self._write_halo_pars() hdu_list = fits.HDUList(hdus=hdus_to_write) hdu_list.writeto(filename, overwrite=overwrite) return
##----- Helper material def _write_halo_pars(self): """Write the FITS file""" # e.g. pars['lam'], pars['a'] # should this be part of WCS? c1 = fits.BinTableHDU.from_columns( [fits.Column(name='lam', array=helpers._make_array(self.lam.value), format='D', unit=self.lam.unit.to_string())]) c2 = fits.BinTableHDU.from_columns( [fits.Column(name='theta', array=helpers._make_array(self.theta.value), format='D', unit=self.theta.unit.to_string())]) c3 = fits.BinTableHDU.from_columns( [fits.Column(name='taux', array=helpers._make_array(self.taux), format='D', unit='')]) return [c1, c2, c3] def _read_from_file(self, filename): """Read a Halo object from a FITS file""" ff = fits.open(filename) # Load the normalized intensity self.norm_int = ff[0].data * u.Unit(ff[0].header['INTUNIT']) # Load the other parameters self.lam = ff[1].data['lam'] * u.Unit(ff[1].columns['lam'].unit) self.theta = ff[2].data['theta'] * u.Unit(ff[2].columns['theta'].unit) self.taux = ff[3].data['taux'] # Set halo type self.description = filename ##------ Make a fake image with a telescope arf
[docs] def fake_image(self, arf, src_flux, exposure, pix_scale=0.5, num_pix=[2400,2400], lmin=None, lmax=None, save_file=None, **kwargs): """Make a fake image file using a telescope ARF as input. Parameters ---------- arf : str Filename of telescope ARF src_flux : numpy.ndarray [phot/cm^2/s] Describes the absorbed (without X-ray scattering) flux model for the central X-ray point source. Must correspond to the values in Halo.lam exposure : float [seconds] Exposure time to use pix_scale : float [arcsec] Size of simulated pixels num_pix : ints (nx,ny) Size of pixel grid to use lmin : float Minimum halo.lam value (Default:None uses entire range) lmax : float Maximum halo.lam value (Default:None uses entire range) save_file : str, optional Filename to use if you want to save the output to a FITS file. Default ``None``. Returns ------- numpy.ndarray (2D) Image of a dust scattering halo. The halo intensity at different energies are converted into counts using the ARF. Then a Poisson distribution is used to simulate the number of counts in each pixel. """ assert len(src_flux) == len(self.lam) xlen, ylen = num_pix xcen, ycen = xlen//2, ylen//2 ccdx, ccdy = np.meshgrid(np.arange(xlen), np.arange(ylen)) radius = np.sqrt((ccdx - xcen)**2 + (ccdy - ycen)**2) # Typical ARF files have columns 'ENERG_LO', 'ENERG_HI', 'SPECRESP' arf_data = fits.open(arf)['SPECRESP'].data arf_x = 0.5*(arf_data['ENERG_LO'] + arf_data['ENERG_HI']) arf_y = arf_data['SPECRESP'] arf = InterpolatedUnivariateSpline(arf_x, arf_y, k=1) # Source counts to use for each energy bin if self.lam_unit == 'angs': ltemp = self.lam * u.angstrom ltemp_kev = ltemp.to(u.keV, equivalencies=u.spectral()).value arf_temp = arf(ltemp_kev)[::-1] src_counts = src_flux * arf_temp * exposure else: src_counts = src_flux * arf(self.lam) * exposure # Decide which energy indexes to use if lmin is None: imin = 0 else: imin = min(np.arange(len(self.lam))[self.lam >= lmin]) if lmax is None: iend = len(self.lam) else: iend = max(np.arange(len(self.lam))[self.lam <= lmax]) #iend = imax #if imax < 0: # iend = np.arange(len(self.lam)+1)[imax] # Add up randomized image for each energy index value r_asec = radius * pix_scale result = np.zeros_like(radius) for i in np.arange(imin, iend): # interp object for halo grid (arcsec, counts/arcsec^2) h_interp = InterpolatedUnivariateSpline( self.theta, self.norm_int[i,:] * src_counts[i], k=1) # counts/arcsec^2 # Get the corresponding counts at each radial value in the grid pix_counts = h_interp(r_asec) * pix_scale**2 # counts per pixel # Some of the interpolated values are below zero. This is not okay. Set those to zero. pix_counts[pix_counts < 0.0] = 0.0 # Use poisson statistics to get a random value pix_random = np.random.poisson(pix_counts) # add it to the final result result += pix_random if save_file is not None: hdu = fits.PrimaryHDU(result) hdul = fits.HDUList([hdu]) hdul.writeto(save_file, overwrite=True) return result