# Copyright (C) 2020 Jeremy Sanders <jeremy@jeremysanders.net>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import math
import numpy as N
from astropy.io import fits
from .par import Par, PriorGaussian, PriorBoundedGaussian
from . import utils
from . import ratecalc
[docs]
class ModelEvaluationException(Exception):
pass
[docs]
class TotalModel:
"""Combined model for data.
:param pars: Pars object (currently unused here)
:param images: list of Image objects
:param src_models: list of source models
:param src_expmap: name of exposure map to use for sources
:param back_models: list of background models
"""
def __init__(
self, pars, images, src_models=None, src_expmap=None,
back_models=None):
self.images = images
self.src_models = src_models
self.src_expmap = src_expmap
self.back_models = back_models
[docs]
def compute(
self,
pars,
apply_psf=True, apply_expmap=True,
apply_src=True, apply_back=True,
):
"""Return a list of image arrays for the models.
:param pars: Pars() object with parameters
:param apply_psf: whether to convolve with PSF
:param apply_expmap: whether to multiply by exposure map
:param apply_src: apply source models
:param apply_back: apply background models
"""
# output images for each Image
imgarrs = [
utils.zeros_aligned(image.shape, dtype=N.float32)
for image in self.images
]
# add the models to the image
if self.src_models and apply_src:
# actual model computation
for model in self.src_models:
model.compute(pars, imgarrs)
# optional scaling of model
for imgarr, image in zip(imgarrs, self.images):
scale_name = f'src_logscale_{image.img_id}'
if scale_name in pars:
imgarr *= math.exp(pars[scale_name].v)
# convolve with PSF
if apply_psf:
for imgarr, image in zip(imgarrs, self.images):
if image.psf is not None:
image.psf.applyTo(imgarr)
# apply exposure map
if apply_expmap and self.src_expmap is not None:
for imgarr, image in zip(imgarrs, self.images):
imgarr *= image.expmaps[self.src_expmap]
# add on background
if self.back_models and apply_back:
for model in self.back_models:
if "apply_expmap" in model.compute.__code__.co_varnames: # for back model with an apply_expmap option
model.compute(pars, imgarrs, apply_expmap=apply_expmap)
else:
model.compute(pars, imgarrs)
return imgarrs
[docs]
def compute_separate(
self,
pars,
apply_psf=True, apply_expmap=True,
):
"""Compute model for each component separately and combined.
:param pars: Pars() object with parameters
:param apply_psf: whether to convolve with PSF
:param apply_expmap: whether to multiply by exposure map
"""
out = {}
def make_blank_images():
return [
utils.zeros_aligned(image.shape, dtype=N.float32)
for image in self.images
]
# source models
for model in self.src_models:
imgarrs = make_blank_images()
model.compute(pars, imgarrs)
# convolve with PSF
if apply_psf:
for imgarr, image in zip(imgarrs, self.images):
if image.psf is not None:
image.psf.applyTo(imgarr)
# apply exposure map
if apply_expmap and self.src_expmap is not None:
for imgarr, image in zip(imgarrs, self.images):
imgarr *= image.expmaps[self.src_expmap]
out[model.name] = imgarrs
# background components
for model in self.back_models:
imgarrs = make_blank_images()
model.compute(pars, imgarrs, apply_expmap=apply_expmap)
out[model.name] = imgarrs
# add up total
totimgarrs = make_blank_images()
for name in out:
for i, arr in enumerate(out[name]):
totimgarrs[i] += arr
out['total'] = totimgarrs
return out
[docs]
def prior(self, pars):
"""Given parameters, compute prior.
This includes the contribution from parameters, the object
models and background models.
"""
total = pars.calcPrior()
if self.src_models:
for model in self.src_models:
total += model.prior(pars)
if self.back_models:
for model in self.back_models:
total += model.prior(pars)
return total
[docs]
def writeToFits(self, filename, pars, mask=True, apply_psf=True,
apply_expmap=True, apply_src=True, apply_back=True,
trim_original=False):
"""Write model as a series of HDUs in a FITS file.
:param filename: FITS filename
:param pars: pars dictionary
:param mask: mask out masked pixels
:param apply_psf: convolve with PSF
:param apply_expmap: multiply by exposure map
:param apply_src: include source models
:param apply_back: include background models
:param trim_original: write to original size
"""
hdus = [fits.PrimaryHDU()]
modimgs = self.compute(
pars,
apply_psf=apply_psf, apply_expmap=apply_expmap,
apply_src=apply_src, apply_back=apply_back)
for image, mod in zip(self.images, modimgs):
if mask:
mod[image.mask == 0] = N.nan
if trim_original:
mod = mod[:image.orig_shape[0], :image.orig_shape[1]]
hdu = fits.ImageHDU(mod)
hdu.header['EXTNAME'] = image.img_id
if image.wcs is not None:
hdu.header.extend(image.wcs.to_header().cards)
hdus.append(hdu)
hdulist = fits.HDUList(hdus)
hdulist.writeto(filename, overwrite=True)
[docs]
class BackModelBase:
"""Base background model. Does nothing.
:param name: name of model
:param pars: Pars() object
:param images: list of Image() objects
:param expmap: exposure map name to lookup in Image
"""
def __init__(self, name, pars, images, expmap=None):
self.name = name
self.images = images
self.expmap = expmap
[docs]
def compute(self, pars, imgarrs, apply_expmap=True):
"""Compute the background model given parameters and input images.
If apply_expmap is True, the expmap should be applied to the output.
"""
pass
def prior(self, pars):
return 0
[docs]
class BackModelFlat(BackModelBase):
"""Flat background model.
:param name: name of model
:param pars: dict of parameters
:param images: list of data.Image objects
:param bool log: apply log scaling to value of background
:param bool normarea: normalise background to per sq arcsec
:param defval: default parameter
:param expmap: name or index of exposure map to use (if any)
"""
def __init__(
self, name, pars, images,
log=False, normarea=False,
defval=0.,
expmap=None
):
"""A flat background model.
"""
BackModelBase.__init__(self, name, pars, images, expmap=expmap)
for image in images:
if log:
pars['%s_%s' % (name, image.img_id)] = Par(defval)
else:
pars['%s_%s' % (name, image.img_id)] = Par(defval, minval=0.)
pars['%s_scale' % name] = Par(
1.0, prior=PriorGaussian(1.0, 0.05), frozen=True)
self.normarea = normarea
self.log = log
self.expmap = expmap
[docs]
def compute(self, pars, imgarrs, apply_expmap=True):
scale = pars['%s_scale' % self.name].v
for image, imgarr in zip(self.images, imgarrs):
v = pars['%s_%s' % (self.name, image.img_id)].v
if self.log:
v = math.exp(min(v, 100))
if self.normarea:
v *= image.pixsize_as ** 2
v *= scale
if self.expmap is not None and apply_expmap:
v *= image.expmaps[self.expmap]
imgarr += v
[docs]
class BackModelVigNoVig(BackModelBase):
"""A background model with vignetted and non-vignetted components.
The X_vf holds the vignetted fraction. This is an unbounded
parameter which is transformed to the vignetting fraction via the
sigmoid function. By default it has a Gaussian prior about 0.
All parameters are log units per square arcsec
Note that the unvignetted exposure map is multiplied by median of
the ratio of the vignetted to unvignetted if rescale_novig is
True. This helps to make a seamless transition between the two as
a function of X_vf. This reduces the degeneracies between the X_vf
and normalisation parameters.
:param name: name of model
:param pars: dict of parameters
:param images: list of data.Image objects
:param defval: default parameter
:param expmap: name or index of vignetted exposure map to use
:param expmap_novig: name or index of unvignetted exposure map to use
:param rescale_vig: rescale non-vignetted map by ratio of vig to non-vig
"""
def __init__(
self, name, pars, images,
defval=0.,
expmap='expmap', expmap_novig='expmap_novig',
rescale_novig=True
):
BackModelBase.__init__(self, name, pars, images, expmap=expmap)
pars['%s_logscale' % name] = Par(
0, prior=PriorGaussian(0, 0.05), frozen=True)
self.vigratios = []
for image in images:
imgkey = '%s_%s' % (name, image.img_id)
pars[imgkey] = Par(defval, minval=-30, maxval=30)
pars['%s_vf' % imgkey] = Par(0.1, prior=PriorBoundedGaussian(
0, 1, minval=-5, maxval=5))
if rescale_novig:
self.vigratios.append(N.median(
(image.expmaps[expmap] /
image.expmaps[expmap_novig])[image.mask != 0]
))
else:
self.vigratios.append(1)
self.expmap = expmap
self.expmap_novig = expmap_novig
[docs]
def compute(self, pars, imgarrs, apply_expmap=True):
scale = math.exp(pars['%s_logscale' % self.name].v)
for i, (image, imgarr) in enumerate(zip(self.images, imgarrs)):
imgkey = '%s_%s' % (self.name, image.img_id)
out = math.exp(pars[imgkey].v) * image.pixsize_as ** 2 * scale
if apply_expmap:
fracvig = (lambda x: math.exp(x) / (math.exp(x) + 1))(
pars['%s_vf' % imgkey].v
)
out = out * (
image.expmaps[self.expmap] * fracvig +
image.expmaps[self.expmap_novig] * ((1 - fracvig) * self.vigratios[i])
)
imgarr += out
[docs]
def estimatePars(self, pars, images, binup=8):
"""Take images and estimate initial background values."""
scale = math.exp(pars['%s_logscale' % self.name].v)
for i, image in enumerate(images):
imgkey = '%s_%s' % (self.name, image.img_id)
fracvig = (lambda x: math.exp(x) / (math.exp(x) + 1))(
pars['%s_vf' % imgkey].v
)
expmap = (
image.expmaps[self.expmap] * fracvig +
image.expmaps[self.expmap_novig] * ((1 - fracvig) * self.vigratios[i])
)
average = N.mean((image.imagearr / expmap)[image.mask != 0])
if average > 0:
v = math.log(average / scale / image.pixsize_as ** 2)
pars['%s_%s' % (self.name, image.img_id)].val = v
[docs]
class SrcModelBase:
"""Base class for source models.
:param name: name of model
:param pars: dict of parameters
:param images: list of data.Image objects
:param cx: initial centre x coordinate
:param cy: initial centre y coordinate
"""
def __init__(self, name, pars, images, cx=0., cy=0.):
self.name = name
self.images = images
# position of source
pars['%s_cx' % name] = Par(cx)
pars['%s_cy' % name] = Par(cy)
def compute(self, pars, imgarrs):
pass
def prior(self, pars):
return 0