# Copyright (C) 2020 Jeremy Sanders <jeremy@jeremysanders.net>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import numpy as N
from scipy.special import gammaln
from . import utils
from . import fast
from astropy.wcs import WCS
[docs]
class Image:
"""Image class details images to fit.
:param img_id: unique id for image (str or int)
:param imagearr: numpy image array for image
:param float emin_keV: minimum energy
:param float emax_keV: maximum energy
:param rmf: response matrix file
:param arf: ancillary response matrix file
:param pixsize_as: size of pixels in arcsec
:param expmaps: dict of numpy exposure map arrays (different components can use different exposure maps, if needed)
:param mask: numpy mask array (None means no mask)
:param psf: PSF object
:param origin: position in pixels (y,x) coordinates are measured relative to (should be same position in all images)
:param wcs: optional WCS stored with this image
:param optimal_size: expand images to be optimal size for PSF convolution
:param pad: pad image sizes by size given to ensure convolution doesn't wrap
"""
def __init__(
self, img_id, imagearr,
emin_keV=0.5, emax_keV=2.0,
rmf='image.rmf',
arf='image.arf',
pixsize_as=1.0,
expmaps=None,
mask=None,
psf=None,
origin=(0,0),
wcs=None,
optimal_size=True,
pad=0,
):
self.img_id = img_id
self.emin_keV = emin_keV
self.emax_keV = emax_keV
self.rmf = rmf
self.arf = arf
self.pixsize_as = pixsize_as
self.invpixsize = 1/pixsize_as
self.wcs = wcs
self.orig_shape = imagearr.shape
if optimal_size:
imagearr, expmaps, mask = self._expandOptimal(
imagearr, expmaps, mask, pad)
# copy image
self.shape = imagearr.shape
self.imagearr = utils.empty_aligned(self.shape, dtype=N.float32)
self.imagearr[:,:] = imagearr
if self.shape[0] % 2 !=0 or self.shape[1] % 2 != 0:
raise RuntimeError('Input images must have even numbers of pixels')
# mask should be -1 (included) or 0 (excluded), for use in simd
self.mask = utils.zeros_aligned(self.shape, dtype=N.int32)
if mask is None:
self.mask.fill(-1)
else:
self.mask[:,:] = N.where(mask, -1, 0)
self.expmaps = expmaps
if psf is None:
self.psf = None
else:
# copy so image-specific parts are kept separate
self.psf = psf.copy()
self.psf.matchImage(self.shape, pixsize_as)
self.origin = origin
def _expandOptimal(self, imagearr, expmaps, mask, pad):
"""Expand image sizes to be optimal for FFT speed
pyfftw works fastest on images which are factors of
2**a 3**b 5**c 7**d 11**e 13**f, where e+f is either 0 or 1, and a to d >=0
"""
# get list of fast sizes (reliable up to 2**16)
primes = [2,3,5,7]
fastdims = set(primes)
for nprime in range(16):
for p in primes:
for v in list(fastdims):
if p*v <= 65536:
fastdims.add(p*v)
# also factors of 11 and 13 of above are fast
for f in 11, 13:
for v in list(fastdims):
if v*f <= 65536:
fastdims.add(v*f)
# get rid of odd factors
for d in list(fastdims):
if d%2 != 0:
fastdims.remove(d)
fastdims = N.array(sorted(fastdims))
# find next largest size
odim0, odim1 = imagearr.shape
dim0 = fastdims[N.searchsorted(fastdims, odim0+pad)]
dim1 = fastdims[N.searchsorted(fastdims, odim1+pad)]
# no expansion necessary
if odim0==dim0 and odim1==dim1:
return imagearr, expmaps, mask
# expand image with 0 at edge
newimage = N.zeros((dim0, dim1), dtype=N.float32)
newimage[:odim0,:odim1] = imagearr
# expand exposure maps with 0 at edge
if expmaps is None:
newexpmaps = None
else:
newexpmaps = {}
for name, expmap in expmaps.items():
newexpmap = N.zeros((dim0, dim1), dtype=N.float32)
newexpmap[:odim0,:odim1] = expmap
newexpmaps[name] = newexpmap
# expand mask with 0 at edge
if mask is None:
mask = N.ones((odim0, odim1), dtype=N.int32)
newmask = N.zeros((dim0, dim1), dtype=N.int32)
newmask[:odim0,:odim1] = N.where(mask, 1, 0)
return newimage, newexpmaps, newmask
[docs]
def normalizeExposurePosn(self, ra, dec, rad_arcsec, exposure, expmaps=None):
"""Normalize exposure maps to have the mean exposure over the region.
The data are scaled so that the pixels within the circle (ra,
dec) within the radius have the exposure given.
Requires a WCS to set on the input image. The mask is used to
remove pixels when averaging.
:param ra: ra (deg)
:param dec: dec (deg)
:param rad_arcsec: radius (arcsec)
:param exposure: exposure (s)
:param expmaps: which exposure maps to process: list or by default, all
"""
assert self.wcs is not None
xc, yc = self.wcs.all_world2pix(ra, dec, 0)
selpix = N.fromfunction(
lambda y, x: (y-yc)**2+(x-xc)**2 < (rad_arcsec/self.pixsize_as)**2,
self.imagearr.shape
) & (self.mask != 0)
if expmaps is None:
expmaps = list(self.expmaps)
for expmap in expmaps:
meanval = N.mean(self.expmaps[expmap][selpix])
self.expmaps[expmap] *= (exposure/meanval)
[docs]
def binUp(self, factor):
"""Return binned copy of image.
:param factor: integer bin factor
"""
neworigin = (
(self.origin[0]+0.5)/factor - 0.5,
(self.origin[1]+0.5)/factor - 0.5,
)
# bin up cts
newimg = utils.binImage(self.imagearr, factor)
# mask, keeping pixels where there are no masked pixels in any subpixel
newmask = (
utils.binImage(N.where(self.mask, 1, 0), factor) ==
utils.binImage(N.ones(self.mask.shape, dtype=N.int32), factor)
)
# compute binned up mean exposure maps
newexpmaps = {
name: utils.binImage(expmap, factor, mean=True)
for name, expmap in self.expmaps.items()
}
# rescale WCS if given
newwcs = None
if self.wcs is not None:
hdr = self.wcs.to_header()
hdr['CDELT1'] = hdr['CDELT1'] * factor
hdr['CDELT2'] = hdr['CDELT2'] * factor
hdr['CRPIX1'] = (hdr['CRPIX1']-0.5)/factor + 0.5
hdr['CRPIX2'] = (hdr['CRPIX2']-0.5)/factor + 0.5
newwcs = WCS(hdr)
return Image(
self.img_id,
newimg,
emin_keV=self.emin_keV, emax_keV=self.emax_keV,
rmf=self.rmf, arf=self.arf,
pixsize_as=self.pixsize_as*factor,
expmaps=newexpmaps,
mask=newmask,
psf=self.psf,
origin=neworigin,
wcs=newwcs,
)
[docs]
class PSF:
"""PSF modelling class.
:param img: 2D PSF image
:param pixsize_as: size of pixels in arcsec
:param origin: (y, x) origin of PSF centre
"""
def __init__(self, img, pixsize_as=1.0, origin=None):
self.img = img
if origin is None:
self.origin = img.shape[0]/2, img.shape[1]/2
else:
self.origin = origin
self.pixsize_as = pixsize_as
self.resample = None
self.conv = None
def copy(self):
return PSF(
N.array(self.img),
pixsize_as=self.pixsize_as,
origin=self.origin,
)
[docs]
def matchImage(self, imgshape, img_pixsize_as):
"""Adjust PSF to different pixel size and image shape."""
self.resample = utils.empty_aligned(imgshape, dtype=N.float32)
fast.resamplePSFImage(
self.img.astype(N.float32),
self.resample,
psf_pixsize=self.pixsize_as,
img_pixsize=img_pixsize_as,
psf_ox=self.origin[1],
psf_oy=self.origin[0],
)
self.conv = utils.ConvPSFHelper(self.resample)
[docs]
def applyTo(self, inimg, minval=1e-10):
"""Convolve image with PSF."""
self.conv.doConv(inimg, inimg)
# make sure convolution is positive
fast.clip2DMax(inimg, minval)