# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
This module provides utilities for PSF-fitting photometry.
"""

import re

import numpy as np
from astropy.modeling import CompoundModel
from astropy.modeling.models import Const2D, Identity, Shift
from astropy.nddata import NDData
from astropy.nddata.utils import add_array, extract_array
from astropy.table import QTable
from astropy.utils.decorators import deprecated

__all__ = ['make_psf_model', 'prepare_psf_model', 'get_grouped_psf_model',
           'subtract_psf', 'grid_from_epsfs']


class _InverseShift(Shift):
    """
    A model that is the inverse of the normal
    `astropy.modeling.functional_models.Shift` model.
    """

    @staticmethod
    def evaluate(x, offset):
        return x - offset

    @staticmethod
    def fit_deriv(x, *params):
        """
        Derivative of the inverse shift with respect to the ``offset``
        parameter.
        """
        d_offset = -np.ones_like(x)
        return [d_offset]


def _interpolate_missing_data(data, mask, method='cubic'):
    """
    Interpolate missing data as identified by the ``mask`` keyword.

    Parameters
    ----------
    data : 2D `~numpy.ndarray`
        An array containing the 2D image.

    mask : 2D bool `~numpy.ndarray`
        A 2D boolean mask array with the same shape as the input
        ``data``, where a `True` value indicates the corresponding
        element of ``data`` is masked.  The masked data points are
        those that will be interpolated.

    method : {'cubic', 'nearest'}, optional
        The method used to interpolate the missing data:

        * ``'cubic'``:  Masked data are interpolated using 2D cubic
            splines.  This is the default.

        * ``'nearest'``:  Masked data are interpolated using
            nearest-neighbor interpolation.

    Returns
    -------
    data_interp : 2D `~numpy.ndarray`
        The interpolated 2D image.
    """
    from scipy import interpolate

    data_interp = np.array(data, copy=True)

    if len(data_interp.shape) != 2:
        raise ValueError("'data' must be a 2D array.")

    if mask.shape != data.shape:
        raise ValueError("'mask' and 'data' must have the same shape.")

    y, x = np.indices(data_interp.shape)
    xy = np.dstack((x[~mask].ravel(), y[~mask].ravel()))[0]
    z = data_interp[~mask].ravel()

    if method == 'nearest':
        interpol = interpolate.NearestNDInterpolator(xy, z)
    elif method == 'cubic':
        interpol = interpolate.CloughTocher2DInterpolator(xy, z)
    else:
        raise ValueError('Unsupported interpolation method.')

    xy_missing = np.dstack((x[mask].ravel(), y[mask].ravel()))[0]
    data_interp[mask] = interpol(xy_missing)

    return data_interp
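

# Example (an illustrative sketch, not part of the module API): filling a
# single masked pixel with its interpolated value. The array and mask below
# are made up for demonstration; because the data follow a plane, the cubic
# interpolation recovers the original value almost exactly.
#
# >>> import numpy as np
# >>> data = np.arange(25, dtype=float).reshape(5, 5)
# >>> mask = np.zeros(data.shape, dtype=bool)
# >>> mask[2, 2] = True
# >>> filled = _interpolate_missing_data(data, mask, method='cubic')
# >>> filled[2, 2]  # expected to be close to the original value of 12.0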


def _integrate_model(model, x_name=None, y_name=None, dx=50, dy=50,
                     subsample=100, use_dblquad=False):
    """
    Integrate a model over a 2D grid.

    By default, the model is discretized on a grid of size ``dx``
    x ``dy`` from the model center with a subsampling factor of
    ``subsample``. The model is then integrated over the grid using
    trapezoidal integration.

    If the ``use_dblquad`` keyword is set to `True`, then the model is
    integrated using `scipy.integrate.dblquad`. This is *much* slower
    than the default integration of the evaluated model, but it is more
    accurate. Also, note that the ``dblquad`` integration can sometimes
    fail, e.g., return zero for a non-zero model. This can happen when
    the model function is sharply localized relative to the size of the
    integration interval.

    Parameters
    ----------
    model : `~astropy.modeling.Fittable2DModel`
        The Astropy 2D model.

    x_name : str or `None`, optional
        The name of the ``model`` parameter that corresponds to the
        x-axis center of the PSF. This parameter is required if
        ``use_dblquad`` is `False` and ignored if ``use_dblquad`` is
        `True`.

    y_name : str or `None`, optional
        The name of the ``model`` parameter that corresponds to the
        y-axis center of the PSF. This parameter is required if
        ``use_dblquad`` is `False` and ignored if ``use_dblquad`` is
        `True`.

    dx, dy : odd int, optional
        The size of the integration grid in x and y. Must be odd.
        These keywords are ignored if ``use_dblquad`` is `True`.

    subsample : int, optional
        The subsampling factor for the integration grid along each axis.
        Each pixel will be sampled ``subsample`` x ``subsample`` times.
        This keyword is ignored if ``use_dblquad`` is `True`.

    use_dblquad : bool, optional
        If `True`, then use `scipy.integrate.dblquad` to integrate the
        model. This is *much* slower than the default integration of
        the evaluated model, but it is more accurate.

    Returns
    -------
    integral : float
        The integral of the model over the 2D grid.
    """
    if use_dblquad:
        from scipy.integrate import dblquad

        return dblquad(model, -np.inf, np.inf, -np.inf, np.inf)[0]

    from scipy.integrate import trapezoid

    if dx <= 0 or dy <= 0:
        raise ValueError('dx and dy must be > 0')
    if subsample < 1:
        raise ValueError('subsample must be >= 1')

    xc = getattr(model, x_name)
    yc = getattr(model, y_name)

    if np.any(~np.isfinite((xc.value, yc.value))):
        raise ValueError('model x and y positions must be finite')

    hx = (dx - 1) / 2
    hy = (dy - 1) / 2
    nxpts = int(dx * subsample)
    nypts = int(dy * subsample)
    xvals = np.linspace(xc - hx, xc + hx, nxpts)
    yvals = np.linspace(yc - hy, yc + hy, nypts)

    # evaluate the model on the subsampled grid; data[i, j] corresponds
    # to the position (xvals[i], yvals[j])
    data = model(xvals.reshape(-1, 1), yvals.reshape(1, -1))

    # now integrate over the subsampled grid, first over x (axis 0) and
    # then over y; pairing each axis with its own coordinate array keeps
    # the integration correct when dx != dy
    return trapezoid(trapezoid(data, xvals, axis=0), yvals)
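

# Example (an illustrative sketch; the Gaussian2D model and its parameter
# names come from astropy.modeling, not from this module). The analytic
# integral of a 2D Gaussian is 2 * pi * amplitude * x_stddev * y_stddev,
# which the numerical result should closely match.
#
# >>> from astropy.modeling.models import Gaussian2D
# >>> gauss = Gaussian2D(amplitude=1.0, x_mean=0.0, y_mean=0.0,
# ...                    x_stddev=1.0, y_stddev=1.0)
# >>> integral = _integrate_model(gauss, x_name='x_mean', y_name='y_mean',
# ...                             dx=51, dy=51, subsample=100)
# >>> # integral should be close to 2 * np.pi (about 6.283)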


def _shift_model_param(model, param_name, shift=2):
    if isinstance(model, CompoundModel):
        # for CompoundModel, add "shift" to the parameter suffix
        out = re.search(r'(.*)_([\d]*)$', param_name)
        new_name = out.groups()[0] + '_' + str(int(out.groups()[1]) + shift)
    else:
        # simply add the shift to the parameter name
        new_name = param_name + '_' + str(shift)

    return new_name
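

# Example (an illustrative sketch of this private helper; the models and
# parameter names below come from astropy.modeling and are used only for
# demonstration). For a simple model the shift is appended to the name;
# for a CompoundModel the existing numeric suffix is offset instead.
#
# >>> from astropy.modeling.models import Const2D, Gaussian2D
# >>> _shift_model_param(Gaussian2D(), 'x_mean', shift=2)
# 'x_mean_2'
# >>> _shift_model_param(Gaussian2D() + Const2D(), 'x_mean_0', shift=2)
# 'x_mean_2'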


def make_psf_model(model, *, x_name=None, y_name=None, flux_name=None,
                   normalize=True, dx=50, dy=50, subsample=100,
                   use_dblquad=False):
    """
    Make a PSF model that can be used with the PSF photometry classes
    (`PSFPhotometry` or `IterativePSFPhotometry`) from an Astropy
    fittable 2D model.

    If the ``x_name``, ``y_name``, or ``flux_name`` keywords are input,
    this function will map those ``model`` parameter names to ``x_0``,
    ``y_0``, or ``flux``, respectively. If any of these keywords are
    `None`, then a new parameter will be added to the model
    corresponding to the missing parameter. The new position parameters
    will be set to 0, and the new flux parameter will be set to 1.

    The output PSF model will have ``x_name``, ``y_name``, and
    ``flux_name`` attributes that contain the name of the corresponding
    model parameter.

    .. note::

        This function is needed only in cases where the 2D PSF model
        does not have ``x_0``, ``y_0``, and ``flux`` parameters.

        It is *not* needed for any of the PSF models provided by
        Photutils (e.g., `~photutils.psf.GriddedPSFModel`,
        `~photutils.psf.IntegratedGaussianPRF`).

    Parameters
    ----------
    model : `~astropy.modeling.Fittable2DModel`
        An Astropy fittable 2D model to use as a PSF.

    x_name : `str` or `None`, optional
        The name of the ``model`` parameter that corresponds to the x
        center of the PSF. If `None`, the model will be assumed to be
        centered at x=0, and a new model parameter called ``xpos_0``
        will be added for the x position.

    y_name : `str` or `None`, optional
        The name of the ``model`` parameter that corresponds to the y
        center of the PSF. If `None`, the model will be assumed to be
        centered at y=0, and a new parameter called ``ypos_1`` will be
        added for the y position.

    flux_name : `str` or `None`, optional
        The name of the ``model`` parameter that corresponds to the
        total flux of a source. If `None`, a new model parameter called
        ``flux_3`` will be added for the model flux.

    normalize : bool, optional
        If `True`, the input ``model`` will be integrated and rescaled
        so that its sum integrates to 1. This normalization occurs only
        once for the input ``model``. If the total flux of ``model``
        somehow depends on (x, y) position, then one will need to
        correct the fitted model fluxes for this effect.

    dx, dy : odd int, optional
        The size of the integration grid in x and y for normalization.
        Must be odd. These keywords are ignored if ``normalize`` is
        `False` or ``use_dblquad`` is `True`.

    subsample : int, optional
        The subsampling factor for the integration grid along each axis
        for normalization. Each pixel will be sampled ``subsample`` x
        ``subsample`` times. This keyword is ignored if ``normalize``
        is `False` or ``use_dblquad`` is `True`.

    use_dblquad : bool, optional
        If `True`, then use `scipy.integrate.dblquad` to integrate the
        model for normalization. This is *much* slower than the default
        integration of the evaluated model, but it is more accurate.
        This keyword is ignored if ``normalize`` is `False`.

    Returns
    -------
    result : `~astropy.modeling.CompoundModel`
        A PSF model that can be used with the PSF photometry classes.
        The returned model will always be an Astropy compound model.

    Notes
    -----
    To normalize the model, by default it is discretized on a grid of
    size ``dx`` x ``dy`` from the model center with a subsampling
    factor of ``subsample``. The model is then integrated over the grid
    using trapezoidal integration.

    If the ``use_dblquad`` keyword is set to `True`, then the model is
    integrated using `scipy.integrate.dblquad`. This is *much* slower
    than the default integration of the evaluated model, but it is more
    accurate. Also, note that the ``dblquad`` integration can sometimes
    fail, e.g., return zero for a non-zero model. This can happen when
    the model function is sharply localized relative to the size of the
    integration interval.
    """
    input_model = model.copy()

    if x_name is None:
        x_model = _InverseShift(0, name='x_position')
        # "offset" is the _InverseShift parameter name;
        # the x inverse shift model is always the first submodel
        x_name = 'offset_0'
    else:
        if x_name not in input_model.param_names:
            raise ValueError(f'{x_name!r} parameter name not found in the '
                             'input model.')

        x_model = Identity(1)
        x_name = _shift_model_param(input_model, x_name, shift=2)

    if y_name is None:
        y_model = _InverseShift(0, name='y_position')
        # "offset" is the _InverseShift parameter name;
        # the y inverse shift model is always the second submodel
        y_name = 'offset_1'
    else:
        if y_name not in input_model.param_names:
            raise ValueError(f'{y_name!r} parameter name not found in the '
                             'input model.')

        y_model = Identity(1)
        y_name = _shift_model_param(input_model, y_name, shift=2)

    x_model.fittable = True
    y_model.fittable = True
    psf_model = (x_model & y_model) | input_model

    if flux_name is None:
        psf_model *= Const2D(1.0, name='flux')
        # "amplitude" is the Const2D parameter name;
        # the flux scaling is always the last component
        flux_name = psf_model.param_names[-1]
    else:
        flux_name = _shift_model_param(input_model, flux_name, shift=2)

    if normalize:
        integral = _integrate_model(psf_model, x_name=x_name, y_name=y_name,
                                    dx=dx, dy=dy, subsample=subsample,
                                    use_dblquad=use_dblquad)

        if integral == 0:
            raise ValueError('Cannot normalize the model because the '
                             'integrated flux is zero.')

        psf_model *= Const2D(1.0 / integral, name='normalization_scaling')

    # fix all the output model parameters that are not x, y, or flux
    for name in psf_model.param_names:
        psf_model.fixed[name] = name not in (x_name, y_name, flux_name)

    # final check that the x, y, and flux parameter names are in the
    # output model names
    names = (x_name, y_name, flux_name)
    for name in names:
        if name not in psf_model.param_names:
            raise ValueError(f'{name!r} parameter name not found in the '
                             'output model.')

    # set the parameter names for the PSF photometry classes
    psf_model.x_name = x_name
    psf_model.y_name = y_name
    psf_model.flux_name = flux_name

    # set aliases
    psf_model.x_0 = getattr(psf_model, x_name)
    psf_model.y_0 = getattr(psf_model, y_name)
    psf_model.flux = getattr(psf_model, flux_name)

    return psf_model
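

# Example (an illustrative usage sketch; the Gaussian2D model and its
# ``x_mean``/``y_mean`` parameter names come from astropy.modeling and are
# not defined in this module):
#
# >>> from astropy.modeling.models import Gaussian2D
# >>> gauss = Gaussian2D(x_stddev=2.0, y_stddev=2.0)
# >>> psf_model = make_psf_model(gauss, x_name='x_mean', y_name='y_mean')
# >>> psf_model.x_name, psf_model.y_name, psf_model.flux_name
# ('x_mean_2', 'y_mean_2', 'amplitude_3')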


@deprecated('1.9.0', alternative='make_psf_model')
def prepare_psf_model(psfmodel, *, xname=None, yname=None, fluxname=None,
                      renormalize_psf=True):
    """
    Convert a 2D PSF model to one suitable for use with
    `BasicPSFPhotometry` or its subclasses.

    .. note::

        This function is needed only in special cases where the PSF
        model does not have ``x_0``, ``y_0``, and ``flux`` model
        parameters. In particular, it is not needed for any of the PSF
        models provided by photutils (e.g., `~photutils.psf.EPSFModel`,
        `~photutils.psf.IntegratedGaussianPRF`,
        `~photutils.psf.FittableImageModel`,
        `~photutils.psf.GriddedPSFModel`, etc).

    Parameters
    ----------
    psfmodel : `~astropy.modeling.Fittable2DModel`
        The model to assume as representative of the PSF.

    xname : `str` or `None`, optional
        The name of the ``psfmodel`` parameter that corresponds to the
        x-axis center of the PSF. If `None`, the model will be assumed
        to be centered at x=0, and a new parameter will be added for
        the offset.

    yname : `str` or `None`, optional
        The name of the ``psfmodel`` parameter that corresponds to the
        y-axis center of the PSF. If `None`, the model will be assumed
        to be centered at y=0, and a new parameter will be added for
        the offset.

    fluxname : `str` or `None`, optional
        The name of the ``psfmodel`` parameter that corresponds to the
        total flux of the star. If `None`, a scaling factor will be
        added to the model.

    renormalize_psf : bool, optional
        If `True`, the model will be integrated from -inf to inf and
        rescaled so that the total integrates to 1. Note that this
        renormalization only occurs *once*, so if the total flux of
        ``psfmodel`` depends on position, this will *not* be correct.

    Returns
    -------
    result : `~astropy.modeling.Fittable2DModel`
        A new model ready to be passed into `BasicPSFPhotometry` or its
        subclasses.
    """
    if xname is None:
        xinmod = _InverseShift(0, name='x_offset')
        xname = 'offset_0'
    else:
        xinmod = Identity(1)
        xname = xname + '_2'
    xinmod.fittable = True

    if yname is None:
        yinmod = _InverseShift(0, name='y_offset')
        yname = 'offset_1'
    else:
        yinmod = Identity(1)
        yname = yname + '_2'
    yinmod.fittable = True

    outmod = (xinmod & yinmod) | psfmodel.copy()

    if fluxname is None:
        outmod = outmod * Const2D(1, name='flux_scaling')
        fluxname = 'amplitude_3'
    else:
        fluxname = fluxname + '_2'

    if renormalize_psf:
        # we do the import here because other machinery works w/o scipy
        from scipy import integrate

        integrand = integrate.dblquad(psfmodel, -np.inf, np.inf,
                                      lambda x: -np.inf,
                                      lambda x: np.inf)[0]
        normmod = Const2D(1.0 / integrand, name='renormalize_scaling')
        outmod = outmod * normmod

    # final setup of the output model - fix all the non-offset/scale
    # parameters
    for pnm in outmod.param_names:
        outmod.fixed[pnm] = pnm not in (xname, yname, fluxname)

    # and set the names so that BasicPSFPhotometry knows what to do
    outmod.xname = xname
    outmod.yname = yname
    outmod.fluxname = fluxname

    # now some convenience aliases if reasonable
    outmod.psfmodel = outmod[2]
    if 'x_0' not in outmod.param_names and 'y_0' not in outmod.param_names:
        outmod.x_0 = getattr(outmod, xname)
        outmod.y_0 = getattr(outmod, yname)
    if 'flux' not in outmod.param_names:
        outmod.flux = getattr(outmod, fluxname)

    return outmod


@deprecated('1.9.0')
def get_grouped_psf_model(template_psf_model, star_group, pars_to_set):
    """
    Construct a joint PSF model which consists of a sum of PSFs
    templated on a specific model, but whose parameters are given by a
    table of objects.

    Parameters
    ----------
    template_psf_model : `astropy.modeling.Fittable2DModel` instance
        The model to use for *individual* objects. Must have parameters
        named ``x_0``, ``y_0``, and ``flux``.

    star_group : `~astropy.table.Table`
        Table of stars for which the compound PSF will be constructed.
        It must have columns named ``x_0``, ``y_0``, and ``flux_0``.

    pars_to_set : `dict`
        A dictionary of parameter names and values to set.

    Returns
    -------
    group_psf
        An `astropy.modeling` ``CompoundModel`` instance which is a sum
        of the given PSF models.
    """
    group_psf = None

    for index, star in enumerate(star_group):
        psf_to_add = template_psf_model.copy()

        # we 'tag' the model here so that later we don't have to rely
        # on possibly mangled names of the compound model to find
        # the parameters again
        psf_to_add.name = index

        for param_tab_name, param_name in pars_to_set.items():
            setattr(psf_to_add, param_name, star[param_tab_name])

        if group_psf is None:
            # this is the first one only
            group_psf = psf_to_add
        else:
            group_psf = group_psf + psf_to_add

    return group_psf
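

# Example (an illustrative sketch of this deprecated helper; the table
# values and the ``pars_to_set`` mapping below are made up):
#
# >>> from astropy.table import Table
# >>> from photutils.psf import IntegratedGaussianPRF
# >>> stars = Table()
# >>> stars['x_0'] = [10.0, 20.0]
# >>> stars['y_0'] = [15.0, 25.0]
# >>> stars['flux_0'] = [100.0, 200.0]
# >>> pars_to_set = {'x_0': 'x_0', 'y_0': 'y_0', 'flux_0': 'flux'}
# >>> group = get_grouped_psf_model(IntegratedGaussianPRF(sigma=2.0),
# ...                               stars, pars_to_set)
# >>> group.param_names  # parameters from both stars, suffixed _0 and _1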


@deprecated('1.9.0')
def _extract_psf_fitting_names(psf):
    """
    Determine the names of the x coordinate, y coordinate, and flux
    from a model.

    Returns (xname, yname, fluxname).
    """
    if hasattr(psf, 'xname'):
        xname = psf.xname
    elif 'x_0' in psf.param_names:
        xname = 'x_0'
    else:
        raise ValueError('Could not determine x coordinate name for '
                         'psf_photometry.')

    if hasattr(psf, 'yname'):
        yname = psf.yname
    elif 'y_0' in psf.param_names:
        yname = 'y_0'
    else:
        raise ValueError('Could not determine y coordinate name for '
                         'psf_photometry.')

    if hasattr(psf, 'fluxname'):
        fluxname = psf.fluxname
    elif 'flux' in psf.param_names:
        fluxname = 'flux'
    else:
        raise ValueError('Could not determine flux name for psf_photometry.')

    return xname, yname, fluxname


@deprecated('1.9.0')
def subtract_psf(data, psf, posflux, *, subshape=None):
    """
    Subtract PSF/PRFs from an image.

    Parameters
    ----------
    data : `~astropy.nddata.NDData` or array (must be 2D)
        Image data.

    psf : `astropy.modeling.Fittable2DModel` instance
        PSF/PRF model to be subtracted from the data.

    posflux : Array-like of shape (3, N) or `~astropy.table.Table`
        Positions and fluxes for the objects to subtract. If an array,
        it is interpreted as ``(x, y, flux)``. If a table, the columns
        'x_fit', 'y_fit', and 'flux_fit' must be present.

    subshape : length-2 or None
        The shape of the region around the center of the location to
        subtract the PSF from. If None, subtract from the whole image.

    Returns
    -------
    subdata : same shape and type as ``data``
        The image with the PSF subtracted.
    """
    if data.ndim != 2:
        raise ValueError(f'{data.ndim}-d array not supported. Only 2-d '
                         'arrays can be passed to subtract_psf.')

    # translate array input into table
    if hasattr(posflux, 'colnames'):
        if 'x_fit' not in posflux.colnames:
            raise ValueError('Input table does not have x_fit')
        if 'y_fit' not in posflux.colnames:
            raise ValueError('Input table does not have y_fit')
        if 'flux_fit' not in posflux.colnames:
            raise ValueError('Input table does not have flux_fit')
    else:
        posflux = QTable(names=['x_fit', 'y_fit', 'flux_fit'], data=posflux)

    # Set up constants across the loop
    psf = psf.copy()
    xname, yname, fluxname = _extract_psf_fitting_names(psf)
    indices = np.indices(data.shape)
    subbeddata = data.copy()

    if subshape is None:
        indices_reversed = indices[::-1]

        for row in posflux:
            getattr(psf, xname).value = row['x_fit']
            getattr(psf, yname).value = row['y_fit']
            getattr(psf, fluxname).value = row['flux_fit']

            subbeddata -= psf(*indices_reversed)
    else:
        for row in posflux:
            x_0, y_0 = row['x_fit'], row['y_fit']

            # float dtype needed for fill_value=np.nan
            y = extract_array(indices[0].astype(float), subshape, (y_0, x_0))
            x = extract_array(indices[1].astype(float), subshape, (y_0, x_0))

            getattr(psf, xname).value = x_0
            getattr(psf, yname).value = y_0
            getattr(psf, fluxname).value = row['flux_fit']

            subbeddata = add_array(subbeddata, -psf(x, y), (y_0, x_0))

    return subbeddata
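

# Example (an illustrative sketch of this deprecated helper; the image,
# positions, and fluxes below are made up):
#
# >>> import numpy as np
# >>> from astropy.table import QTable
# >>> from photutils.psf import IntegratedGaussianPRF
# >>> image = np.zeros((25, 25))
# >>> prf = IntegratedGaussianPRF(sigma=2.0)
# >>> stars = QTable()
# >>> stars['x_fit'] = [12.0]
# >>> stars['y_fit'] = [12.0]
# >>> stars['flux_fit'] = [100.0]
# >>> residual = subtract_psf(image, prf, stars)  # deprecated since 1.9.0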


def grid_from_epsfs(epsfs, grid_xypos=None, meta=None):
    """
    Create a GriddedPSFModel from a list of EPSFModels.

    Given a list of EPSFModels, this function will return a
    GriddedPSFModel. The fiducial points for each input EPSFModel can
    either be set on each individual model by setting the 'x_0' and
    'y_0' attributes, or provided as a list of tuples (``grid_xypos``).
    If a ``grid_xypos`` list is provided, it must match the length of
    the input EPSFs. In either case, the fiducial points must be on a
    grid.

    Optionally, a ``meta`` dictionary may be provided for the output
    GriddedPSFModel. If this dictionary contains the keys 'grid_xypos',
    'oversampling', or 'fill_value', they will be overridden.

    Note: If the fiducial points are set on the input EPSFModels (x_0,
    y_0), then ``origin`` must be the same for each input EPSF.
    Additionally, the data units and dimensions must be the same for
    each input EPSF, and the values for ``flux``, ``oversampling``, and
    ``fill_value`` must match as well.

    Parameters
    ----------
    epsfs : list of `photutils.psf.models.EPSFModel`
        A list of EPSFModels representing the individual PSFs.

    grid_xypos : list, optional
        A list of fiducial points (x_0, y_0) for each PSF. If not
        provided, the x_0 and y_0 of each input EPSF will be considered
        the fiducial point for that PSF. Default is None.

    meta : dict, optional
        Additional metadata for the GriddedPSFModel. Note that, if they
        exist in the supplied ``meta``, any values under the keys
        ``grid_xypos``, ``oversampling``, or ``fill_value`` will be
        overridden. Default is None.

    Returns
    -------
    GriddedPSFModel : `photutils.psf.GriddedPSFModel`
        The gridded PSF model created from the input EPSFs.
    """
    # prevent circular imports
    from photutils.psf import EPSFModel, GriddedPSFModel

    # optional, to store fiducial from input if `grid_xypos` is None
    x_0s = []
    y_0s = []
    data_arrs = []
    oversampling = None
    fill_value = None
    dat_unit = None
    origin = None
    flux = None

    # make sure, if provided, that ``grid_xypos`` is the same length as
    # ``epsfs``
    if grid_xypos is not None:
        if len(grid_xypos) != len(epsfs):
            raise ValueError('``grid_xypos`` must be the same length as '
                             '``epsfs``.')

    # loop over input once
    for i, epsf in enumerate(epsfs):
        # check input type
        if not isinstance(epsf, EPSFModel):
            raise ValueError('All input `epsfs` must be of type '
                             '`photutils.psf.models.EPSFModel`.')

        # get data array from EPSF
        data_arrs.append(epsf.data)

        if i == 0:
            oversampling = epsf.oversampling

            # same for fill value and flux, grid will have a single
            # value so it should be the same for all input, and error
            # if not
            fill_value = epsf.fill_value

            # check that origins are the same
            if grid_xypos is None:
                origin = epsf.origin

            flux = epsf.flux

            # if there's a unit, those should also all be the same
            try:
                dat_unit = epsf.data.unit
            except AttributeError:
                pass  # just keep as None
        else:
            if np.any(epsf.oversampling != oversampling):
                raise ValueError('All input EPSFModels must have the same '
                                 'value for ``oversampling``.')

            if epsf.fill_value != fill_value:
                raise ValueError('All input EPSFModels must have the same '
                                 'value for ``fill_value``.')

            if epsf.data.ndim != data_arrs[0].ndim:
                raise ValueError('All input EPSFModels must have data with '
                                 'the same dimensions.')

            try:
                unitt = epsf.data.unit
                if unitt != dat_unit:
                    raise ValueError('All input data must have the same '
                                     'unit.')
            except AttributeError as exc:
                if dat_unit is not None:
                    raise ValueError('All input data must have the same '
                                     'unit.') from exc

            if epsf.flux != flux:
                raise ValueError('All input EPSFModels must have the same '
                                 'value for ``flux``.')

        if grid_xypos is None:
            # get grid_xypos from x_0, y_0 if not provided
            x_0s.append(epsf.x_0.value)
            y_0s.append(epsf.y_0.value)

            # also check that origin is the same, if using x_0s and
            # y_0s from input
            if epsf.origin != origin:
                raise ValueError('If using ``x_0``, ``y_0`` as fiducial '
                                 'points, ``origin`` must match for each '
                                 'input EPSF.')

    # if not supplied, use the x_0, y_0 of the input EPSFs as fiducials.
    # these are checked when GriddedPSFModel is created to make sure
    # they are actually on a grid.
    if grid_xypos is None:
        grid_xypos = list(zip(x_0s, y_0s))

    data_cube = np.stack(data_arrs, axis=0)

    if meta is None:
        meta = {}

    # add required keywords to meta
    meta['grid_xypos'] = grid_xypos
    meta['oversampling'] = oversampling
    meta['fill_value'] = fill_value

    data = NDData(data_cube, meta=meta)

    grid = GriddedPSFModel(data, fill_value=fill_value)

    return grid
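

# Example (an illustrative sketch; the PSF array and fiducial positions
# below are made up, and the four identical ePSFs are placed at the
# corners of a square purely for demonstration):
#
# >>> import numpy as np
# >>> from photutils.psf import EPSFModel
# >>> yy, xx = np.mgrid[0:25, 0:25]
# >>> psf_data = np.exp(-((xx - 12)**2 + (yy - 12)**2) / (2 * 3.0**2))
# >>> epsfs = [EPSFModel(data=psf_data, oversampling=1) for _ in range(4)]
# >>> xypos = [(0, 0), (0, 100), (100, 0), (100, 100)]
# >>> gridded_psf = grid_from_epsfs(epsfs, grid_xypos=xypos)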