# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
This module defines gridded PSF models.
"""
import copy
import itertools
import numpy as np
from astropy.io import registry
from astropy.modeling import Fittable2DModel, Parameter
from astropy.nddata import NDData
from astropy.utils.decorators import lazyproperty
from scipy.interpolate import RectBivariateSpline
from photutils.psf.model_io import (GriddedPSFModelRead, _get_metadata,
_read_stdpsf, is_stdpsf, is_webbpsf,
stdpsf_reader, webbpsf_reader)
from photutils.psf.model_plotting import ModelGridPlotMixin
from photutils.utils._parameters import as_pair
__all__ = ['GriddedPSFModel', 'STDPSFGrid']
__doctest_skip__ = ['STDPSFGrid']
[docs]
class GriddedPSFModel(ModelGridPlotMixin, Fittable2DModel):
"""
A model for a grid of 2D ePSF models.
The ePSF models are defined at fiducial detector locations and are
bilinearly interpolated to calculate an ePSF model at an arbitrary
(x, y) detector position. The fiducial detector locations are must
form a rectangular grid.
The model has three model parameters: an image intensity scaling
factor (``flux``) which is applied to the input image, and two
positional parameters (``x_0`` and ``y_0``) indicating the location
of a feature in the coordinate grid on which the model is evaluated.
When evaluating this model, it cannot be called with x and y arrays
that have greater than 2 dimensions.
Parameters
----------
nddata : `~astropy.nddata.NDData`
A `~astropy.nddata.NDData` object containing the grid of
reference ePSF arrays. The data attribute must contain a 3D
`~numpy.ndarray` containing a stack of the 2D ePSFs with a shape
of ``(N_psf, ePSF_ny, ePSF_nx)``. The length of the x and y axes
must both be at least 4. All elements of the input image data
must be finite. The PSF peak is assumed to be located at the
center of the input image. The array must be normalized so that
the total flux of a source is 1.0. This means that the sum of
the values in the input image PSF over an infinite grid is 1.0.
In practice, the sum of the data values in the input image may
be less than 1.0 if the input image only covers a finite region
of the PSF. These correction factors can be estimated from the
ensquared or encircled energy of the PSF based on the size of
the input image.
The meta attribute must be dictionary containing the following:
* ``'grid_xypos'``: A list of the (x, y) grid positions of
each reference ePSF. The order of positions should match the
first axis of the 3D `~numpy.ndarray` of ePSFs. In other words,
``grid_xypos[i]`` should be the (x, y) position of the reference
ePSF defined in ``nddata.data[i]``. The grid positions must form
a rectangular grid.
* ``'oversampling'``: The integer oversampling factor(s) of the
input ePSF images. If ``oversampling`` is a scalar then it will
be used for both axes. If ``oversampling`` has two elements,
they must be in ``(y, x)`` order.
The meta attribute may contain other properties such as the
telescope, instrument, detector, and filter of the ePSF.
flux : float, optional
The flux scaling factor for the model. This is the total flux
of the source, assuming the input ePSF images are properly
normalized.
x_0, y_0 : float, optional
The (x, y) position of the PSF peak in the image in the output
coordinate grid on which the model is evaluated.
fill_value : float, optional
The value to use for points outside of the input pixel grid.
The default is 0.0.
Methods
-------
read(*args, **kwargs)
Class method to create a `GriddedPSFModel`
instance from a STDPSF FITS file. This method uses
:func:`~photutils.psf.stdpsf_reader` with the provided
parameters.
Notes
-----
Internally, the grid of ePSFs will be arranged and stored such that
it is sorted first by the y reference pixel coordinate and then by
the x reference pixel coordinate.
"""
flux = Parameter(description='Intensity scaling factor for the ePSF '
'model.', default=1.0)
x_0 = Parameter(description='x position in the output coordinate grid '
'where the model is evaluated.', default=0.0)
y_0 = Parameter(description='y position in the output coordinate grid '
'where the model is evaluated.', default=0.0)
read = registry.UnifiedReadWriteMethod(GriddedPSFModelRead)
def __init__(self, nddata, *, flux=flux.default, x_0=x_0.default,
y_0=y_0.default, fill_value=0.0):
self.data, self.grid_xypos = self._define_grid(nddata)
self._meta = nddata.meta.copy() # _meta to avoid the meta descriptor
self._oversampling = as_pair('oversampling',
nddata.meta['oversampling'],
lower_bound=(0, 1))
self.fill_value = fill_value
self._xgrid = np.unique(self.grid_xypos[:, 0]) # sorted
self._ygrid = np.unique(self.grid_xypos[:, 1]) # sorted
self.meta['grid_shape'] = (len(self._ygrid), len(self._xgrid))
self._interpolator = {}
super().__init__(flux, x_0, y_0)
@staticmethod
def _validate_data(data):
"""
Validate the input ePSF data.
Parameters
----------
data : `~astropy.nddata.NDData`
The input NDData object containing the ePSF data.
Raises
------
TypeError
If the input data is not an NDData instance.
ValueError
If the input data is not a 3D numpy ndarray or if the input
data contains NaNs or infs.
"""
if not isinstance(data, NDData):
raise TypeError('data must be an NDData instance.')
if data.data.ndim != 3:
raise ValueError('The NDData data attribute must be a 3D numpy '
'ndarray')
if not np.all(np.isfinite(data.data)):
raise ValueError('All elements of input data must be finite.')
# this is required by RectBivariateSpline for kx=3, ky=3
if np.any(np.array(data.data.shape[1:]) < 4):
raise ValueError('The length of the PSF x and y axes must both '
'be at least 4.')
if 'oversampling' not in data.meta:
raise ValueError('"oversampling" must be in the nddata meta '
'dictionary.')
@staticmethod
def _is_rectangular_grid(grid_xypos):
"""
Check if the input ``grid_xypos`` forms a rectangular grid.
Parameters
----------
grid_xypos : array of (x, y) pairs
The fiducial (x, y) positions of the ePSFs.
Returns
-------
result : bool
Returns `True` if the input ``grid_xypos`` forms a
rectangular grid.
"""
xgrid = np.unique(grid_xypos[:, 0]) # sorted
ygrid = np.unique(grid_xypos[:, 1]) # sorted
expected_points = {(x, y) for x in xgrid for y in ygrid}
grid = set(map(tuple, grid_xypos))
return grid == expected_points
def _validate_grid(self, data):
"""
Validate the input ePSF grid.
Parameters
----------
data : `~astropy.nddata.NDData`
The input NDData object containing the ePSF data.
Raises
------
ValueError
If the input grid_xypos does not form a rectangular grid.
"""
try:
grid_xypos = np.array(data.meta['grid_xypos'])
except KeyError as exc:
raise ValueError('"grid_xypos" must be in the nddata meta '
'dictionary.') from exc
if len(grid_xypos) != data.data.shape[0]:
raise ValueError('The length of grid_xypos must match the number '
'of input ePSFs.')
if not self._is_rectangular_grid(grid_xypos):
raise ValueError('grid_xypos must form a rectangular grid.')
def _define_grid(self, nddata):
"""
Sort the input ePSF data into a rectangular grid where the ePSFs
are sorted first by y and then by x.
Parameters
----------
nddata : `~astropy.nddata.NDData`
The input NDData object containing the ePSF data.
Returns
-------
data : 3D `~numpy.ndarray`
The 3D array of ePSFs.
grid_xypos : array of (x, y) pairs
The (x, y) positions of the ePSFs, sorted first by y and
then by x.
"""
self._validate_data(nddata)
self._validate_grid(nddata)
grid_xypos = np.array(nddata.meta['grid_xypos'])
# sort by y and then by x (last key is primary)
idx = np.lexsort((grid_xypos[:, 0], grid_xypos[:, 1]))
return nddata.data[idx], grid_xypos[idx]
def _cls_info(self):
cls_info = []
keys = ('STDPSF', 'instrument', 'detector', 'filter', 'grid_shape')
for key in keys:
if key in self.meta:
name = key.capitalize() if key != 'STDPSF' else key
cls_info.append((name, self.meta[key]))
cls_info.extend([('Number of PSFs', len(self.grid_xypos)),
('PSF shape (oversampled pixels)',
self.data.shape[1:]),
('Oversampling', tuple(self.oversampling))])
return cls_info
def __str__(self):
return self._format_str(keywords=self._cls_info())
[docs]
def copy(self):
"""
Return a copy of this model where only the model parameters are
copied.
All other copied model attributes are references to the
original model. This prevents copying the ePSF grid data, which
may contain a large array.
This method is useful if one is interested in only changing
the model parameters in a model copy. It is used in the PSF
photometry classes during model fitting.
Use the `deepcopy` method if you want to copy all of the model
attributes, including the ePSF grid data.
Returns
-------
result : `GriddedPSFModel`
A copy of this model with only the model parameters copied.
"""
newcls = object.__new__(self.__class__)
for key, val in self.__dict__.items():
if key in self.param_names: # copy only the parameter values
newcls.__dict__[key] = copy.copy(val)
else:
newcls.__dict__[key] = val
return newcls
[docs]
def deepcopy(self):
"""
Return a deep copy of this model.
Returns
-------
result : `GriddedPSFModel`
A deep copy of this model.
"""
return copy.deepcopy(self)
@property
def oversampling(self):
"""
The integer oversampling factor(s) of the input ePSF images.
If ``oversampling`` is a scalar then it will be used for both
axes. If ``oversampling`` has two elements, they must be in
``(y, x)`` order.
"""
return self._oversampling
@oversampling.setter
def oversampling(self, value):
"""
Set the oversampling factor(s) of the input ePSF images.
Parameters
----------
value : int or tuple of int
The integer oversampling factor(s) of the input ePSF images.
If ``oversampling`` is a scalar then it will be used for both
axes. If ``oversampling`` has two elements, they must be in
``(y, x)`` order.
"""
self._oversampling = as_pair('oversampling', value, lower_bound=(0, 1))
def _calc_bounding_box(self):
"""
Set a bounding box defining the limits of the model.
Returns
-------
bbox : tuple
A bounding box defining the ((y_min, y_max), (x_min, x_max))
limits of the model.
"""
dy, dx = np.array(self.data.shape[1:]) / 2 / self.oversampling
return ((self.y_0 - dy, self.y_0 + dy), (self.x_0 - dx, self.x_0 + dx))
@property
def bounding_box(self):
"""
The bounding box of the model.
Examples
--------
>>> from itertools import product
>>> import numpy as np
>>> from astropy.nddata import NDData
>>> from photutils.psf import GaussianPSF, GriddedPSFModel
>>> psfs = []
>>> yy, xx = np.mgrid[0:101, 0:101]
>>> for i in range(16):
... theta = np.deg2rad(i * 10.0)
... gmodel = GaussianPSF(flux=1, x_0=50, y_0=50, x_fwhm=10,
... y_fwhm=5, theta=theta)
... psfs.append(gmodel(xx, yy))
>>> xgrid = [0, 40, 160, 200]
>>> ygrid = [0, 60, 140, 200]
>>> meta = {}
>>> meta['grid_xypos'] = list(product(xgrid, ygrid))
>>> meta['oversampling'] = 4
>>> nddata = NDData(psfs, meta=meta)
>>> model = GriddedPSFModel(nddata, flux=1, x_0=0, y_0=0)
>>> model.bounding_box # doctest: +FLOAT_CMP
ModelBoundingBox(
intervals={
x: Interval(lower=-12.625, upper=12.625)
y: Interval(lower=-12.625, upper=12.625)
}
model=GriddedPSFModel(inputs=('x', 'y'))
order='C'
)
"""
return self._calc_bounding_box()
@lazyproperty
def origin(self):
"""
A 1D `~numpy.ndarray` (x, y) pixel coordinates within the
model's 2D image of the origin of the coordinate system.
"""
xyorigin = (np.array(self.data.shape) - 1) / 2
return xyorigin[::-1]
@lazyproperty
def _interp_xyidx(self):
"""
The x and y indices for the interpolator.
"""
xidx = np.arange(self.data.shape[2])
yidx = np.arange(self.data.shape[1])
return xidx, yidx
def _calc_interpolator(self, grid_idx):
"""
Calculate the `~scipy.interpolate.RectBivariateSpline`
interpolator for an input ePSF image at the given reference (x,
y) position.
The resulting interpolator is cached in the `_interpolator`
dictionary for reuse.
"""
xypos = tuple(self.grid_xypos[grid_idx])
if xypos in self._interpolator:
return self._interpolator[xypos]
# RectBivariateSpline expects the data to be in (x, y) axis order
data = self.data[grid_idx]
interp = RectBivariateSpline(*self._interp_xyidx, data.T, kx=3, ky=3,
s=0)
self._interpolator[xypos] = interp
return interp
def _find_bounding_points(self, x, y):
"""
Find the grid indices and reference (x, y) points of the four
bounding grid points for a given (x, y) coordinate.
If the point is outside the grid, the nearest grid points are
selected. The input grid points do not need to be sorted.
Parameters
----------
x, y : float
The (x_0, y_0) position of the model.
Returns
-------
grid_idx : `~numpy.ndarray`
The indices of the four bounding points in the sorted
grid. The order is lower-left, lower-right, upper-left,
upper-right.
grid_xy : `~numpy.ndarray`
The x and y coordinates of the four bounding points. The
order is left, right, bottom, top.
"""
# Find the insertion indices for x and y in the sorted grids
xidx = np.searchsorted(self._xgrid, x) - 1
yidx = np.searchsorted(self._ygrid, y) - 1
# Clip the indices to valid ranges
xidx = np.clip(xidx, 0, len(self._xgrid) - 2)
yidx = np.clip(yidx, 0, len(self._ygrid) - 2)
# Find the four bounding points in the sorted grid
# (x0, y0) is the lower-left corner of the grid
# (x1, y1) is the upper-right corner of the grid
x0, x1 = self._xgrid[xidx], self._xgrid[xidx + 1]
y0, y1 = self._ygrid[yidx], self._ygrid[yidx + 1]
# Find the indices of these points in grid_xypos
xcoords, ycoords = self.grid_xypos.T
lower_left = np.where((xcoords == x0) & (ycoords == y0))[0][0]
lower_right = np.where((xcoords == x1) & (ycoords == y0))[0][0]
upper_left = np.where((xcoords == x0) & (ycoords == y1))[0][0]
upper_right = np.where((xcoords == x1) & (ycoords == y1))[0][0]
grid_idx = np.array((lower_left, lower_right, upper_left, upper_right))
grid_xy = np.array((x0, x1, y0, y1))
return grid_idx, grid_xy
def _calc_bilinear_weights(self, xi, yi, grid_xy):
"""
Calculate the bilinear interpolation weights for a given (xi,
yi) coordinate and the four bounding grid points.
Parameters
----------
xi, yi : float
The (x_0, y_0) position of the model.
grid_xy : `~numpy.ndarray`
The x and y coordinates of the four bounding points. The
order is left, right, bottom, top.
Returns
-------
weights : `~numpy.ndarray`
The bilinear interpolation weights for the four bounding
points. The order is lower-left, lower-right, upper-left,
upper-right.
"""
x0, x1, y0, y1 = grid_xy
xi = np.clip(xi, x0, x1)
yi = np.clip(yi, y0, y1)
norm = (x1 - x0) * (y1 - y0)
# lower-left, lower-right, upper-left, upper-right
return np.array([(x1 - xi) * (y1 - yi), (xi - x0) * (y1 - yi),
(x1 - xi) * (yi - y0), (xi - x0) * (yi - y0)]) / norm
def _calc_model_values(self, x_0, y_0, xi, yi):
"""
Calculate the ePSF model at a given (x_0, y_0) model coordinate
and the input (xi, yi) coordinate.
Parameters
----------
x_0, y_0 : float
The (x, y) position of the model.
xi, yi : float
The input (x, y) coordinates at which the model is
evaluated.
Returns
-------
result : float or `~numpy.ndarray`
The interpolated ePSF model at the input (x_0, y_0)
coordinate.
"""
grid_idx, grid_xy = self._find_bounding_points(x_0, y_0)
interpolators = np.array([self._calc_interpolator(gidx)
for gidx in grid_idx])
weights = self._calc_bilinear_weights(x_0, y_0, grid_xy)
idx = np.where(weights != 0)
interpolators = interpolators[idx]
weights = weights[idx]
result = 0
for interp, weight in zip(interpolators, weights, strict=True):
result += interp(xi, yi, grid=False) * weight
return result
[docs]
def evaluate(self, x, y, flux, x_0, y_0):
"""
Calculate the ePSF model at the input coordinates for the given
model parameters.
Parameters
----------
x, y : float or `~numpy.ndarray`
The x and y positions at which to evaluate the model.
flux : float
The flux scaling factor for the model.
x_0, y_0 : float
The (x, y) position of the model.
Returns
-------
evaluated_model : `~numpy.ndarray`
The evaluated model.
"""
if x.ndim > 2:
raise ValueError('x and y must be 1D or 2D.')
# the base Model.__call__() method converts scalar inputs to
# size-1 arrays before calling evaluate(), but we need scalar
# values for the interpolator
if not np.isscalar(x_0):
x_0 = x_0[0]
if not np.isscalar(y_0):
y_0 = y_0[0]
# now evaluate the ePSF at the (x_0, y_0) subpixel position on
# the input (x, y) values
xi = self.oversampling[1] * (np.asarray(x, dtype=float) - x_0)
yi = self.oversampling[0] * (np.asarray(y, dtype=float) - y_0)
xi += self.origin[0]
yi += self.origin[1]
evaluated_model = flux * self._calc_model_values(x_0, y_0, xi, yi)
if self.fill_value is not None:
# set pixels that are outside the input pixel grid to the
# fill_value to avoid extrapolation; these bounds match the
# RegularGridInterpolator bounds
ny, nx = self.data.shape[1:]
invalid = (xi < 0) | (xi > nx - 1) | (yi < 0) | (yi > ny - 1)
evaluated_model[invalid] = self.fill_value
return evaluated_model
[docs]
class STDPSFGrid(ModelGridPlotMixin):
"""
Class to read and plot "STDPSF" format ePSF model grids.
STDPSF files are FITS files that contain a 3D array of ePSFs with
the header detailing where the fiducial ePSFs are located in the
detector coordinate frame.
The oversampling factor for STDPSF FITS files is assumed to be 4.
Parameters
----------
filename : str
The name of the STDPDF FITS file. A URL can also be used.
Examples
--------
>>> from photutils.psf import STDPSFGrid
>>> psfgrid = STDPSFGrid('STDPSF_ACSWFC_F814W.fits')
>>> fig = psfgrid.plot_grid()
>>> fig.show()
"""
def __init__(self, filename):
grid_data = _read_stdpsf(filename)
self.data = grid_data['data']
self._xgrid = grid_data['xgrid']
self._ygrid = grid_data['ygrid']
xy_grid = [yx[::-1] for yx in itertools.product(self._ygrid,
self._xgrid)]
oversampling = 4 # assumption for STDPSF files
self.grid_xypos = xy_grid
self.oversampling = as_pair('oversampling', oversampling,
lower_bound=(0, 1))
meta = {'grid_shape': (len(self._ygrid), len(self._xgrid)),
'grid_xypos': xy_grid,
'oversampling': oversampling}
# try to get additional metadata from the filename because this
# information is not currently available in the FITS headers
file_meta = _get_metadata(filename, None)
if file_meta is not None:
meta.update(file_meta)
self.meta = meta
def __str__(self):
cls_name = f'<{self.__class__.__module__}.{self.__class__.__name__}>'
cls_info = []
keys = ('STDPSF', 'detector', 'filter', 'grid_shape')
for key in keys:
if key in self.meta:
name = key.capitalize() if key != 'STDPSF' else key
cls_info.append((name, self.meta[key]))
cls_info.extend([('Number of PSFs', len(self.grid_xypos)),
('PSF shape (oversampled pixels)',
self.data.shape[1:]),
('Oversampling', self.oversampling)])
with np.printoptions(threshold=25, edgeitems=5):
fmt = [f'{key}: {val}' for key, val in cls_info]
return f'{cls_name}\n' + '\n'.join(fmt)
def __repr__(self):
return self.__str__()
with registry.delay_doc_updates(GriddedPSFModel):
registry.register_reader('stdpsf', GriddedPSFModel, stdpsf_reader)
registry.register_identifier('stdpsf', GriddedPSFModel, is_stdpsf)
registry.register_reader('webbpsf', GriddedPSFModel, webbpsf_reader)
registry.register_identifier('webbpsf', GriddedPSFModel, is_webbpsf)