Source code for photutils.psf.utils

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
Tools for PSF-fitting photometry.
"""

import warnings

import numpy as np
from astropy.modeling import Model
from astropy.table import QTable
from astropy.units import Quantity
from astropy.utils.exceptions import AstropyUserWarning
from scipy import interpolate

from photutils.centroids import centroid_com
from photutils.psf.functional_models import CircularGaussianPRF
from photutils.utils import CutoutImage
from photutils.utils._parameters import as_pair

__all__ = ['fit_2dgaussian', 'fit_fwhm']


def _make_mask(image, mask):
    """
    Create a mask for the input image.

    Non-finite values (e.g., NaN or inf) in the ``image`` array are
    automatically masked. If a mask is provided, then the non-finite
    values are combined with the provided mask.

    Parameters
    ----------
    image : 2D `~numpy.ndarray`
        The input image.

    mask : 2D bool `~numpy.array` or None
        A boolean mask with the same shape as ``data``, where a `True`
        value indicates the corresponding element of ``data`` is masked.

    Returns
    -------
    mask : 2D bool `~numpy.ndarray` or `None`
        The mask for the input image. A `True` value indicates the
        corresponding element of ``image`` is masked.
    """
    def warn_nonfinite():
        msg = ('Input data contains unmasked non-finite values '
               '(NaN or inf), which were automatically ignored.')
        warnings.warn(msg, AstropyUserWarning)

    # if NaNs are in the data, no actual fitting takes place
    # https://github.com/astropy/astropy/pull/12811
    finite_mask = ~np.isfinite(image)

    if mask is not None:
        finite_mask |= mask
        if np.any(finite_mask & ~mask):
            warn_nonfinite()
    else:
        mask = finite_mask
        if np.any(finite_mask):
            warn_nonfinite()
        else:
            mask = None

    return mask


[docs] def fit_2dgaussian(data, *, xypos=None, fwhm=None, fix_fwhm=True, fit_shape=None, mask=None, error=None): """ Fit a 2D Gaussian model to one or more sources in an image. This convenience function uses a `~photutils.psf.CircularGaussianPRF` model to fit the sources using the `~photutils.psf.PSFPhotometry` class. Non-finite values (e.g., NaN or inf) in the ``data`` array are automatically masked. Parameters ---------- data : 2D array The 2D array of the image. The input array must be background subtracted. xypos : array-like, optional The initial (x, y) pixel coordinates of the sources as a list of tuples or a 2D array. If `None`, then one source will be fit with an initial position using the center-of-mass centroid of the ``data`` array. fwhm : float, optional The initial guess for the FWHM of the Gaussian PSF model. If `None`, then the initial guess is half the mean of the x and y sizes of the ``fit_shape`` values. fix_fwhm : bool, optional Whether to fix the FWHM of the Gaussian PSF model during the fitting process. fit_shape : int or tuple of two ints, optional The shape of the fitting region. If a scalar, then it is assumed to be a square. If `None`, then the shape of the input ``data`` will be used. mask : array-like (bool), optional A boolean mask with the same shape as the input ``data``, where a `True` value indicates the corresponding element of ``data`` is masked. error : 2D array, optional The pixel-wise Gaussian 1-sigma errors of the input ``data``. ``error`` is assumed to include *all* sources of error, including the Poisson error of the sources (see `~photutils.utils.calc_total_error`). ``error`` must have the same shape as the input ``data``. If a `~astropy.units.Quantity` array, then ``data`` must also be a `~astropy.units.Quantity` array with the same units. Returns ------- result : `~photutils.psf.PSFPhotometry` The PSF-fitting photometry results. See Also -------- fit_fwhm : Fit the FWHM of one or more sources in an image. Notes ----- The source(s) are fit with a `~photutils.psf.CircularGaussianPRF` model using the `~photutils.psf.PSFPhotometry` class. The initial guess for the flux is the sum of the pixel values within the fitting region. If ``fwhm`` is `None`, then the initial guess for the FWHM is half the mean of the x and y sizes of the ``fit_shape`` values. Examples -------- Fit a 2D Gaussian model to an image containing only one source (e.g., a cutout image): >>> import numpy as np >>> from photutils.psf import CircularGaussianPRF, fit_2dgaussian >>> yy, xx = np.mgrid[:51, :51] >>> model = CircularGaussianPRF(x_0=22.17, y_0=28.87, fwhm=3.123, flux=9.7) >>> data = model(xx, yy) >>> fit = fit_2dgaussian(data, fix_fwhm=False, fit_shape=7) >>> phot_tbl = fit.results # doctest: +FLOAT_CMP >>> cols = ['x_fit', 'y_fit', 'fwhm_fit', 'flux_fit'] >>> for col in cols: ... phot_tbl[col].info.format = '.4f' # optional format >>> print(phot_tbl[['id'] + cols]) id x_fit y_fit fwhm_fit flux_fit --- ------- ------- -------- -------- 1 22.1700 28.8700 3.1230 9.7000 Fit a 2D Gaussian model to multiple sources in an image: >>> import numpy as np >>> from photutils.detection import DAOStarFinder >>> from photutils.psf import (CircularGaussianPRF, fit_2dgaussian, ... make_psf_model_image) >>> model = CircularGaussianPRF() >>> data, sources = make_psf_model_image((100, 100), model, 5, ... min_separation=25, ... model_shape=(15, 15), ... flux=(100, 200), fwhm=[3, 8]) >>> finder = DAOStarFinder(0.1, 5) >>> finder_tbl = finder(data) >>> xypos = zip(sources['x_0'], sources['y_0']) >>> psfphot = fit_2dgaussian(data, xypos=xypos, fit_shape=7, ... fix_fwhm=False) >>> phot_tbl = psfphot.results >>> len(phot_tbl) 5 Here we show only a few columns of the photometry table: >>> cols = ['x_fit', 'y_fit', 'fwhm_fit', 'flux_fit'] >>> for col in cols: ... phot_tbl[col].info.format = '.4f' # optional format >>> print(phot_tbl[['id'] + cols]) id x_fit y_fit fwhm_fit flux_fit --- ------- ------- -------- -------- 1 61.7787 74.6905 5.6947 147.9988 2 30.2017 27.5858 5.2138 123.2373 3 10.5237 82.3776 7.6551 180.1881 4 8.4214 12.0369 3.2026 192.3530 5 76.9412 35.9061 6.6600 126.6130 """ # prevent circular import from photutils.psf.photometry import PSFPhotometry # mask non-finite values mask = _make_mask(data, mask) if xypos is None: xypos = centroid_com(data, mask=mask) if isinstance(xypos, zip): xypos = np.array(list(xypos)) xypos = np.atleast_2d(xypos) if fit_shape is None: if len(xypos) > 1: msg = ('fit_shape is required when fitting multiple sources. If ' 'fit_shape is not provided, then the fit may not converge ' 'or may give poor results. For multiple sources, you ' 'should provide a fit_shape value that is appropriate for ' 'the size of the sources in your image.') raise ValueError(msg) fit_shape = data.shape # Ensure odd shape required by the PSF photometry fitting # Here we trim the even edges by 1 pixel fit_shape = tuple(s - 1 if s % 2 == 0 else s for s in fit_shape) msg = ('fit_shape is None, so the input data array is assumed to be ' 'a cutout image containing only one source. If your input ' 'data is not a cutout image, then the fit may not converge ' 'or may give poor results. For non-cutout input data, you ' 'should provide a fit_shape value that is appropriate for ' 'the size of the sources in your image. ' f'Using fit_shape={fit_shape}.') warnings.warn(msg, AstropyUserWarning) else: fit_shape = as_pair('fit_shape', fit_shape, lower_bound=(1, 1), check_odd=True) flux_init = [] for yxpos in xypos[:, ::-1]: cutout = CutoutImage(data, yxpos, tuple(fit_shape)) cutout = cutout.data[np.isfinite(cutout.data)] flux_init.append(np.nansum(cutout)) if isinstance(data, Quantity): flux_init <<= data.unit init_params = QTable() init_params['x'] = xypos[:, 0] init_params['y'] = xypos[:, 1] init_params['flux'] = flux_init if fwhm is None: fwhm = np.mean(fit_shape) / 2.0 init_params['fwhm'] = fwhm model = CircularGaussianPRF(fwhm=fwhm) model.fwhm.min = 0.0 if not fix_fwhm: model.fwhm.fixed = False phot = PSFPhotometry(model, fit_shape) _ = phot(data, mask=mask, error=error, init_params=init_params) return phot
[docs] def fit_fwhm(data, *, xypos=None, fwhm=None, fit_shape=None, mask=None, error=None): """ Fit the FWHM of one or more sources in an image. This convenience function uses a `~photutils.psf.CircularGaussianPRF` model to fit the sources using the `~photutils.psf.PSFPhotometry` class. Non-finite values (e.g., NaN or inf) in the ``data`` array are automatically masked. Parameters ---------- data : 2D array The 2D array of the image. The input array must be background subtracted. xypos : array-like, optional The initial (x, y) pixel coordinates of the sources as a list of tuples or a 2D array. If `None`, then one source will be fit with an initial position using the center-of-mass centroid of the ``data`` array. fwhm : float, optional The initial guess for the FWHM of the Gaussian PSF model. If `None`, then the initial guess is half the mean of the x and y sizes of the ``fit_shape`` values. fit_shape : int or tuple of two ints, optional The shape of the fitting region. If a scalar, then it is assumed to be a square. If `None`, then the shape of the input ``data`` will be used. mask : array-like (bool), optional A boolean mask with the same shape as the input ``data``, where a `True` value indicates the corresponding element of ``data`` is masked. error : 2D array, optional The pixel-wise Gaussian 1-sigma errors of the input ``data``. ``error`` is assumed to include *all* sources of error, including the Poisson error of the sources (see `~photutils.utils.calc_total_error`). ``error`` must have the same shape as the input ``data``. If a `~astropy.units.Quantity` array, then ``data`` must also be a `~astropy.units.Quantity` array with the same units. Returns ------- fwhm : `~numpy.ndarray` The FWHM of the sources. Note that the returned FWHM values are always positive. See Also -------- fit_2dgaussian : Fit a 2D Gaussian model to one or more sources in an image. Notes ----- The source(s) are fit using the :func:`fit_2dgaussian` function, which uses a `~photutils.psf.CircularGaussianPRF` model with the `~photutils.psf.PSFPhotometry` class. The initial guess for the flux is the sum of the pixel values within the fitting region. If ``fwhm`` is `None`, then the initial guess for the FWHM is half the mean of the x and y sizes of the ``fit_shape`` values. Examples -------- Fit the FWHM of a single source (e.g., a cutout image): >>> import numpy as np >>> from photutils.psf import CircularGaussianPRF, fit_fwhm >>> yy, xx = np.mgrid[:51, :51] >>> model = CircularGaussianPRF(x_0=22.17, y_0=28.87, fwhm=3.123, flux=9.7) >>> data = model(xx, yy) >>> fwhm = fit_fwhm(data, fit_shape=7) >>> fwhm # doctest: +FLOAT_CMP array([3.123]) Fit the FWHMs of multiple sources in an image: >>> import numpy as np >>> from photutils.detection import DAOStarFinder >>> from photutils.psf import (CircularGaussianPRF, fit_fwhm, ... make_psf_model_image) >>> model = CircularGaussianPRF() >>> data, sources = make_psf_model_image((100, 100), model, 5, ... min_separation=25, ... model_shape=(15, 15), ... flux=(100, 200), fwhm=[3, 8]) >>> finder = DAOStarFinder(0.1, 5) >>> finder_tbl = finder(data) >>> xypos = zip(sources['x_0'], sources['y_0']) >>> fwhms = fit_fwhm(data, xypos=xypos, fit_shape=7) >>> fwhms # doctest: +FLOAT_CMP array([5.69467204, 5.21376414, 7.65508658, 3.20255356, 6.66003098]) """ with warnings.catch_warnings(record=True) as fit_warnings: phot = fit_2dgaussian(data, xypos=xypos, fwhm=fwhm, fix_fwhm=False, fit_shape=fit_shape, mask=mask, error=error) # Re-emit fit_shape warnings and check for other warnings fit_shape_warning = False other_warnings = False for warning in fit_warnings: if 'fit_shape is None' in str(warning.message): warnings.warn(str(warning.message), warning.category) fit_shape_warning = True else: other_warnings = True if other_warnings and not fit_shape_warning: msg = ('One or more fit(s) may not have converged. Please ' 'carefully check your results. You may need to change ' 'the input "xypos" and "fit_shape" parameters.') warnings.warn(msg, AstropyUserWarning) return np.array(phot.results['fwhm_fit'])
def _interpolate_missing_data(data, mask, *, method='cubic'): """ Interpolate missing data as identified by the ``mask`` keyword. Parameters ---------- data : 2D `~numpy.ndarray` An array containing the 2D image. mask : 2D bool `~numpy.ndarray` A 2D boolean mask array with the same shape as the input ``data``, where a `True` value indicates the corresponding element of ``data`` is masked. The masked data points are those that will be interpolated. method : {'cubic', 'nearest'}, optional The method of used to interpolate the missing data: * ``'cubic'``: Masked data are interpolated using 2D cubic splines. If any masked pixels cannot be interpolated using cubic interpolation (e.g., at the edges), they will be filled using nearest-neighbor interpolation as a fallback. * ``'nearest'``: Masked data are interpolated using nearest-neighbor interpolation. Returns ------- data_interp : 2D `~numpy.ndarray` The interpolated 2D image. All masked pixels are guaranteed to be filled if there are any valid (unmasked) pixels. If all pixels are masked, the returned array will contain NaN values. """ data_interp = np.copy(data) if len(data_interp.shape) != 2: msg = 'data must be a 2D array' raise ValueError(msg) if mask.shape != data.shape: msg = 'mask and data must have the same shape' raise ValueError(msg) if not np.any(mask): return data_interp # Check if all pixels are masked - cannot interpolate if np.all(mask): data_interp[:] = np.nan return data_interp # initialize the interpolator y, x = np.indices(data_interp.shape) xy = np.dstack((x[~mask].ravel(), y[~mask].ravel()))[0] z = data_interp[~mask].ravel() # interpolate the missing data if method == 'nearest': interpol = interpolate.NearestNDInterpolator(xy, z) elif method == 'cubic': interpol = interpolate.CloughTocher2DInterpolator(xy, z) else: msg = 'Unsupported interpolation method' raise ValueError(msg) xy_missing = np.dstack((x[mask].ravel(), y[mask].ravel()))[0] data_interp[mask] = interpol(xy_missing) # For cubic interpolation, some edge pixels may not be interpolated # (NaN values). Use nearest-neighbor interpolation as a fallback. if method == 'cubic': remaining_mask = ~np.isfinite(data_interp) if np.any(remaining_mask): xy_valid = np.dstack((x[~remaining_mask].ravel(), y[~remaining_mask].ravel()))[0] z_valid = data_interp[~remaining_mask].ravel() interpol_nn = interpolate.NearestNDInterpolator(xy_valid, z_valid) xy_remaining = np.dstack((x[remaining_mask].ravel(), y[remaining_mask].ravel()))[0] data_interp[remaining_mask] = interpol_nn(xy_remaining) return data_interp def _validate_psf_model(psf_model): """ Validate the PSF model. The PSF model must be a subclass of `astropy.modeling.Fittable2DModel`. It must also be two-dimensional and have a single output. Parameters ---------- psf_model : `astropy.modeling.Fittable2DModel` The PSF model to validate. Returns ------- psf_model : `astropy.modeling.Model` The validated PSF model. Raises ------ TypeError If the PSF model is not an Astropy Model subclass. ValueError If the PSF model is not two-dimensional with n_inputs=2 and n_outputs=1. """ if not isinstance(psf_model, Model): msg = 'psf_model must be an Astropy Model subclass' raise TypeError(msg) if psf_model.n_inputs != 2 or psf_model.n_outputs != 1: msg = ('psf_model must be two-dimensional with ' 'n_inputs=2 and n_outputs=1') raise ValueError(msg) return psf_model def _get_psf_model_main_params(psf_model): """ Get the names of the main PSF model parameters corresponding to x, y, and flux. The PSF model must have parameters called 'x_0', 'y_0', and 'flux' or it must have 'x_name', 'y_name', and 'flux_name' attributes (i.e., output from `make_psf_model`). Otherwise, a `ValueError` is raised. The PSF model must be a subclass of `astropy.modeling.Model`. It must also be two-dimensional and have a single output. Parameters ---------- psf_model : `astropy.modeling.Model` The PSF model to validate. Returns ------- model_params : tuple A tuple of the PSF model parameter names. These are always returned in the order of (x, y, flux). """ psf_model = _validate_psf_model(psf_model) params1 = ('x_0', 'y_0', 'flux') params2 = ('x_name', 'y_name', 'flux_name') if all(name in psf_model.param_names for name in params1): model_params = params1 elif all(params := [getattr(psf_model, name, None) for name in params2]): model_params = tuple(params) else: msg = 'Invalid PSF model - could not find PSF parameter names' raise ValueError(msg) return model_params def _create_call_docstring(*, iterative=False): """ Decorator factory to create the __call__ method docstring for PSF photometry methods. This decorator factory creates a decorator that provides a base docstring for PSF photometry methods and customizes it based on the class type (PSFPhotometry vs IterativePSFPhotometry). Parameters ---------- iterative : bool, optional If True, customize the docstring for IterativePSFPhotometry. If False, customize for PSFPhotometry. Returns ------- decorator : callable A method decorator that updates the method's docstring. """ def decorator(func): """ Method decorator that updates the method's docstring. """ # Import PSF_FLAGS here to avoid circular imports from .flags import PSF_FLAGS base_docstring = """ Perform PSF photometry. Parameters ---------- data : 2D `~numpy.ndarray` The 2D array on which to perform photometry. Invalid data values (i.e., NaN or inf) are automatically masked. mask : 2D bool `~numpy.ndarray`, optional A boolean mask with the same shape as ``data``, where a `True` value indicates the corresponding element of ``data`` is masked. error : 2D `~numpy.ndarray`, optional The pixel-wise 1-sigma errors of the input ``data``. ``error`` is assumed to include *all* sources of error, including the Poisson error of the sources. ``error`` must have the same shape as the input ``data``. If ``data`` is a `~astropy.units.Quantity` array, then ``error`` must also be a `~astropy.units.Quantity` array with the same units. init_params : `~astropy.table.Table` or `None`, optional A table containing the initial guesses of the model parameters (e.g., x, y, flux) for each source{init_params_suffix}. If the initial x and y values are not included, then the ``finder`` keyword must be defined. If the initial flux values are not included, then the ``aperture_radius`` keyword must be defined to measure the initial flux values. Note that the initial flux values refer to the model flux parameters and are not corrected for local background values (computed using ``local_bkg_estimator`` or input in a ``local_bkg`` column). The allowed column names are: * ``x_init``, ``xinit``, ``x``, ``x_0``, ``x0``, ``xcentroid``, ``x_centroid``, ``x_peak``, ``xcen``, ``x_cen``, ``xpos``, ``x_pos``, ``x_fit``, and ``xfit``. * ``y_init``, ``yinit``, ``y``, ``y_0``, ``y0``, ``ycentroid``, ``y_centroid``, ``y_peak``, ``ycen``, ``y_cen``, ``ypos``, ``y_pos``, ``y_fit``, and ``yfit``. * ``flux_init``, ``fluxinit``, ``flux``, ``flux_0``, ``flux0``, ``flux_fit``, ``fluxfit``, ``source_sum``, ``segment_flux``, and ``kron_flux``. * If the PSF model has additional free parameters that are fit, they can be included in the table. The column names must match the parameter names in the PSF model. They can also be suffixed with either the "_init" or "_fit" suffix. The suffix search order is "_init", "" (no suffix), and "_fit". For example, if the PSF model has an additional parameter named "sigma", then the allowed column names are: "sigma_init", "sigma", and "sigma_fit". If the column name is not found in the table, then the default value from the PSF model will be used. The parameter names are searched in the input table in the above order, stopping at the first match. If ``data`` is a `~astropy.units.Quantity` array, then the initial flux values in this table must also must also have compatible units. The table can also have ``group_id`` and ``local_bkg`` columns. If ``group_id`` is input, the values will be used and ``grouper`` keyword will be ignored. If ``local_bkg`` is input, those values will be used and the ``local_bkg_estimator`` will be ignored. If ``data`` has units, then the ``local_bkg`` values must have the same units. Returns ------- table : `~astropy.table.QTable` An astropy table with the PSF-fitting results. The table will contain the following columns: * ``id`` : unique identification number for the source * ``group_id`` : unique identification number for the source group * ``group_size`` : the total number of sources in the group. This number includes sources that are in the group, but were not fit due to being masked, having no overlap with the input data, or having too few pixels for a fit. {iter_detected_column} * ``x_init``, ``x_fit``, ``x_err`` : the initial, fit and error of the source x center * ``y_init``, ``y_fit``, ``y_err`` : the initial, fit, and error of the source y center * ``flux_init``, ``flux_fit``, ``flux_err`` : the initial, fit, and error of the source flux * ``n_pixels_fit`` : the number of unmasked pixels used to fit the source * ``qfit`` : a quality-of-fit metric defined as the sum of the absolute value of the fit residuals divided by the fit flux. ``qfit`` is zero for sources that are perfectly fit by the PSF model. * ``cfit`` : a quality-of-fit metric defined as the fit residual (data - model) in the initial central pixel value divided by the fit flux. NaN values indicate that the central pixel was masked. Large positive values indicate sources that are sharper than the PSF model (e.g., cosmic ray, hot pixel, etc.). Large negative values indicate sources that are broader than the PSF model * ``reduced_chi2`` : the reduced chi-squared statistic. If no ``error`` array is provided, ``reduced_chi2`` values will be NaN. * ``flags`` : bitwise flag values <flag descriptions> Notes ----- The ``qfit`` and ``cfit`` metrics are equivalent to the ``q`` and ``C`` fits metrics defined by the HST PSF photometry `hst1pass <https://www.stsci.edu/files/live/sites/www/files/home/hst/instr umentation/acs/documentation/instrument-science-reports-isrs/_do cuments/isr2202.pdf>`_ software. They are also similar to the ``chi`` (``qfit``) and ``sharp`` (``cfit``) metrics used by `DAOPHOT <https://ui.adsabs.harvard.edu/abs/1987PASP...99..191S/abstract>`_. """ if iterative: # Customizations for IterativePSFPhotometry customized_docstring = base_docstring.format( init_params_suffix=(' *only for\n ' 'the first iteration*'), iter_detected_column=(' * ``iter_detected`` : the ' 'iteration number in which the' '\n source was detected\n'), ) else: # Customizations for PSFPhotometry customized_docstring = base_docstring.format( init_params_suffix='', iter_detected_column='', ) # Apply the flag descriptions replacement placeholder = '<flag descriptions>' if placeholder in customized_docstring: # Generate the flag descriptions flag_descriptions = [] flag_descriptions.append('') flag_descriptions.append(' - 0 : no flags') for flag_def in PSF_FLAGS.FLAG_DEFINITIONS: desc = flag_def.description line = f' - {flag_def.bit_value} : {desc}' flag_descriptions.append(line) # Replace the placeholder with the flag descriptions flag_text = '\n'.join(flag_descriptions) customized_docstring = customized_docstring.replace( placeholder, flag_text) # Update the method's docstring func.__doc__ = customized_docstring return func return decorator