Source code for photutils.psf.photometry

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
Tools for performing PSF-fitting photometry.
"""

import contextlib
import inspect
import warnings
from dataclasses import dataclass, field

import astropy.units as u
import numpy as np
from astropy.modeling.fitting import TRFLSQFitter
from astropy.nddata import NDData, StdDevUncertainty
from astropy.table import QTable
from astropy.utils.decorators import deprecated
from astropy.utils.exceptions import AstropyUserWarning

from photutils.background import LocalBackground
from photutils.psf._components import (PSFDataProcessor, PSFFitter,
                                       PSFResultsAssembler,
                                       _make_model_image_docstring,
                                       _make_residual_image_docstring,
                                       _ModelImageMaker)
from photutils.psf.flags import decode_psf_flags
from photutils.psf.utils import (_create_call_docstring,
                                 _get_psf_model_main_params, _make_mask,
                                 _validate_psf_model)
from photutils.utils._deprecation import (deprecated_positional_kwargs,
                                          deprecated_renamed_argument)
from photutils.utils._parameters import as_pair
from photutils.utils._progress_bars import add_progress_bar
from photutils.utils._quantity_helpers import process_quantities
from photutils.utils._repr import make_repr

__all__ = ['PSFPhotometry']


@dataclass
class _PSFParameterMapper:
    """
    Helper class to map PSF model parameter names to table column names.
    """

    psf_model: object
    alias_to_model_param: dict = field(init=False, repr=False)

    # Valid column names that can be used for initial (x, y, flux)
    # positions. Order matters: the first matched name in each tuple will
    # be used.
    VALID_INIT_COLNAMES = {  # noqa: RUF012
        'x': (
            'x_init', 'xinit', 'x', 'x_0', 'x0', 'xcentroid',
            'x_centroid', 'x_peak', 'xcen', 'x_cen', 'xpos', 'x_pos',
            'x_fit', 'xfit',
        ),
        'y': (
            'y_init', 'yinit', 'y', 'y_0', 'y0', 'ycentroid',
            'y_centroid', 'y_peak', 'ycen', 'y_cen', 'ypos', 'y_pos',
            'y_fit', 'yfit',
        ),
        'flux': (
            'flux_init', 'fluxinit', 'flux', 'flux_0', 'flux0',
            'flux_fit', 'fluxfit', 'source_sum', 'segment_flux',
            'kron_flux',
        ),
    }
    MAIN_ALIASES = ('x', 'y', 'flux')

    def __post_init__(self):
        self.alias_to_model_param = self._get_model_params_map()

    def _get_model_params_map(self):
        """
        Get the mapping of aliases ('x', 'y', 'flux', etc.) to the
        actual parameter names in the PSF model.

        Returns
        -------
        params_map : dict
            A dictionary mapping parameter aliases to their actual
            names in the PSF model. The keys are 'x', 'y', 'flux', and
            any additional parameters defined in the model.
        """
        # the order of the main parameters is important; it defines
        # the order of table outputs
        main_params = _get_psf_model_main_params(self.psf_model)
        params_map = dict(zip(self.MAIN_ALIASES, main_params, strict=True))

        # extra parameters that are not 'x', 'y', or 'flux', but
        # are free to be fit (fixed = False), are added to the map
        # with their own aliases
        fitted_params = [
            param for param in self.psf_model.param_names
            if not self.psf_model.fixed[param]
        ]
        extra_params = [param for param in fitted_params
                        if param not in main_params]

        params_map.update({key: key for key in extra_params})
        return params_map

    @property
    def fitted_param_names(self):
        """
        Get list of model parameter names that will be fitted.
        """
        return [param for param in self.psf_model.param_names
                if not self.psf_model.fixed[param]]

    def get_init_colname(self, alias):
        """
        Get initialization column name for parameter alias.
        """
        return f'{alias}_init'

    def get_fit_colname(self, alias):
        """
        Get fitted parameter column name for parameter alias.
        """
        return f'{alias}_fit'

    def get_err_colname(self, alias):
        """
        Get error column name for parameter alias.
        """
        return f'{alias}_err'

    @property
    def init_colnames(self):
        """
        Dictionary mapping aliases to initialization column names.
        """
        return {alias: self.get_init_colname(alias)
                for alias in self.alias_to_model_param}

    @property
    def fit_colnames(self):
        """
        Dictionary mapping aliases to fitted parameter column names.
        """
        return {alias: self.get_fit_colname(alias)
                for alias in self.alias_to_model_param}

    @property
    def err_colnames(self):
        """
        Dictionary mapping aliases to error column names.
        """
        return {alias: self.get_err_colname(alias)
                for alias in self.alias_to_model_param}

    @property
    def model_param_to_alias(self):
        """
        Dictionary mapping model parameter names to aliases.
        """
        return {v: k for k, v in self.alias_to_model_param.items()}

    def find_column(self, table, param_alias):
        """
        Find the first valid column name in a table for a given
        parameter alias.

        Parameters
        ----------
        table : `~astropy.table.Table`
            The input table to search for the column.

        param_alias : str
            The alias for the parameter (e.g., 'x', 'y', 'flux').

        Returns
        -------
        result : str or `None`
            The first valid column name found in the table for the
            parameter alias, or `None` if no valid column is found.
        """
        try:
            valid_names = self.VALID_INIT_COLNAMES[param_alias]
        except KeyError:
            # valid names for extra parameters are more limited
            valid_names = (f'{param_alias}_init', param_alias,
                           f'{param_alias}_fit')

        for name in valid_names:
            if name in table.colnames:
                return name

        return None

    def rename_table_columns(self, table):
        """
        Rename columns in-place in an input table to the ``_init``
        format.

        Parameters
        ----------
        table : `~astropy.table.Table`
            The input table with columns to be renamed.

        Returns
        -------
        table : `~astropy.table.Table`
            The input table with columns renamed to the `_init` format
            based on the parameter aliases.
        """
        for param_alias in self.alias_to_model_param:
            found_col = self.find_column(table, param_alias)
            if found_col:
                target_col = self.init_colnames[param_alias]
                if found_col != target_col:
                    table.rename_column(found_col, target_col)
        return table


[docs] class PSFPhotometry: """ Class to perform PSF photometry. This class implements a flexible PSF photometry algorithm that can find sources in an image, group overlapping sources, fit the PSF model to the sources, and subtract the fit PSF models from the image. Parameters ---------- psf_model : 2D `astropy.modeling.Model` The PSF model to fit to the data. The model must have parameters named ``x_0``, ``y_0``, and ``flux``, corresponding to the center (x, y) position and flux, or it must have 'x_name', 'y_name', and 'flux_name' attributes that map to the x, y, and flux parameters. The model must be two-dimensional such that it accepts 2 inputs (e.g., x and y) and provides 1 output. fit_shape : int or length-2 array_like The rectangular shape around the initial source position that will be used to define the PSF-fitting data. If ``fit_shape`` is a scalar then a square shape of size ``fit_shape`` will be used. If ``fit_shape`` has two elements, they must be in ``(ny, nx)`` order. Each element of ``fit_shape`` must be an odd number greater than or equal to 3. In general, ``fit_shape`` should be set to a small size (e.g., ``(5, 5)``) that covers the region with the highest flux signal-to-noise. finder : callable or `~photutils.detection.StarFinderBase` or `None`, \ optional A callable used to identify sources in an image. The ``finder`` must accept a 2D image as input and return a `~astropy.table.Table` containing the x and y centroid positions. These positions are used as the starting points for the PSF fitting. The allowed ``x`` column names are (same suffix for ``y``): ``'x_init'``, ``'xinit'``, ``'x'``, ``'x_0'``, ``'x0'``, ``'xcentroid'``, ``'x_centroid'``, ``'x_peak'``, ``'xcen'``, ``'x_cen'``, ``'xpos'``, ``'x_pos'``, ``'x_fit'``, and ``'xfit'``. If `None`, then the initial (x, y) model positions must be input using the ``init_params`` keyword when calling the class. The (x, y) values in ``init_params`` override this keyword. If this class is run on an image that has units (i.e., a `~astropy.units.Quantity` array), then certain ``finder`` keywords (e.g., ``threshold``) must have the same units. Please see the documentation for the specific ``finder`` class for more information. grouper : `~photutils.psf.SourceGrouper` or callable or `None`, optional A callable used to group sources. Typically, grouped sources are those that overlap with their neighbors. Sources that are grouped are fit simultaneously. The ``grouper`` must accept the x and y coordinates of the sources and return an integer array of the group ID numbers (starting from 1) indicating the group in which a given source belongs. If `None`, then no grouping is performed, i.e. each source is fit independently. The ``group_id`` values in ``init_params`` override this keyword. A warning is raised if any group size is larger than ``group_warning_threshold`` sources. fitter : `~astropy.modeling.fitting.Fitter`, optional The fitter object used to perform the fit of the model to the data. If `None`, then the default `astropy.modeling.fitting.TRFLSQFitter` is used. fitter_maxiters : int, optional The maximum number of iterations in which the ``fitter`` is called for each source. The value can be increased if the fit is not converging for sources. This parameter is passed to the ``fitter`` if it supports the ``maxiter`` parameter and ignored otherwise. xy_bounds : `None`, float, or 2-tuple of float, optional The maximum distance in pixels that a fitted source can be from the initial (x, y) position. If a single float, then the same maximum distance is used for both x and y. If a 2-tuple of floats, then the distances are in ``(x, y)`` order. If `None`, then no bounds are applied. Either value can also be `None` to indicate no bound along that axis. aperture_radius : float or `None`, optional The radius of the circular aperture used to estimate the initial flux of each source. If `None`, then the initial flux values must be provided in the ``init_params`` table. The aperture radius must be a strictly positive scalar. If initial flux values are present in the ``init_params`` table, they will override this keyword. local_bkg_estimator : `~photutils.background.LocalBackground` or `None`, \ optional The object used to estimate the local background around each source. If `None`, then no local background is subtracted. The ``local_bkg`` values in ``init_params`` override this keyword. This option should be used with care, especially in crowded fields where the ``fit_shape`` of sources overlap (see Notes below). group_warning_threshold : int, optional The maximum number of sources in a group before a warning is raised. If the number of sources in a group exceeds this value, a warning is raised to inform the user that fitting such large groups may take a long time and be error-prone. The default is 25 sources. progress_bar : bool, optional Whether to display a progress bar when fitting the sources (or groups). The progress bar requires that the `tqdm <https://tqdm.github.io/>`_ optional dependency be installed. Notes ----- The data that will be fit for each source is defined by the ``fit_shape`` parameter. A cutout will be made around the initial center of each source with a shape defined by ``fit_shape``. The PSF model will be fit to the data in this region. The cutout region that is fit does not shift if the source center shifts during the fit iterations. Therefore, the initial source positions should be close to the true source positions. One way to ensure this is to use a ``finder`` to identify sources in the data. If the fitted positions are significantly different from the initial positions, one can rerun the `PSFPhotometry` class using the fit results as the input ``init_params``, which will change the fitted cutout region for each source. After running `PSFPhotometry`, you can use the `results_to_init_params` method to generate a table of initial parameters that can be used in a subsequent call to `PSFPhotometry`. This table will contain the fitted (x, y) positions, fluxes, and any other model parameters that were fit. If the fitted model parameters are NaN, then the source was not valid, likely due to not enough valid data pixels in the ``fit_shape`` region. The ``flags`` column in the output ``results`` table indicates the reason why a source was not valid. If the fitted model parameter errors are NaN, then either the fit did not converge, the model parameter was fixed, or the input ``fitter`` did not return parameter errors. For the later case, one can try a different Astropy fitter that returns parameter errors. The local background value around each source is optionally estimated using the ``local_bkg_estimator`` or obtained from the ``local_bkg`` column in the input ``init_params`` table. This local background is then subtracted from the data over the ``fit_shape`` region for each source before fitting the PSF model. For sources where their ``fit_shape`` regions overlap, the local background will effectively be subtracted twice in the overlapping ``fit_shape`` regions, even if the source ``grouper`` is input. This is not an issue if the sources are well-separated. However, for crowded fields, please use the ``local_bkg_estimator`` (or ``local_bkg`` column in ``init_params``) with care. Care should be taken in defining the source groups. Simultaneously fitting very large source groups is computationally expensive and error-prone. Internally, source grouping requires the creation of a compound Astropy model. Due to the way compound Astropy models are currently constructed, large groups also require excessively large amounts of memory; this will hopefully be fixed in a future Astropy version. A warning will be raised if the number of sources in a group exceeds the ``group_warning_threshold`` value. """ # Default value for parameter initialization (invalid sources) _DEFAULT_PARAM_VALUE = np.nan @deprecated_renamed_argument('localbkg_estimator', 'local_bkg_estimator', '3.0', until='4.0') def __init__(self, psf_model, fit_shape, *, finder=None, grouper=None, fitter=None, fitter_maxiters=100, xy_bounds=None, aperture_radius=None, local_bkg_estimator=None, group_warning_threshold=25, progress_bar=False): self.psf_model = _validate_psf_model(psf_model) self._param_mapper = _PSFParameterMapper(self.psf_model) self.fit_shape = as_pair('fit_shape', fit_shape, lower_bound=(1, 1), check_odd=True) self.finder = self._validate_callable(finder, 'finder') self.grouper = self._validate_callable(grouper, 'grouper') if fitter is None: fitter = TRFLSQFitter() self.fitter = self._validate_callable(fitter, 'fitter') self.fitter_maxiters = self._validate_maxiters(fitter_maxiters) self.xy_bounds = self._validate_bounds(xy_bounds) self.aperture_radius = self._validate_radius(aperture_radius) self.local_bkg_estimator = self._validate_localbkg( local_bkg_estimator, 'local_bkg_estimator') self.group_warning_threshold = group_warning_threshold self.progress_bar = progress_bar self._data_processor = PSFDataProcessor( self._param_mapper, self.fit_shape, finder=self.finder, aperture_radius=self.aperture_radius, local_bkg_estimator=self.local_bkg_estimator, ) self._psf_fitter = PSFFitter( self.psf_model, self._param_mapper, fitter=self.fitter, fitter_maxiters=self.fitter_maxiters, xy_bounds=self.xy_bounds, group_warning_threshold=self.group_warning_threshold, ) self._results_assembler = PSFResultsAssembler( self._param_mapper, self.fit_shape, xy_bounds=self.xy_bounds, ) # used by the __repr__ method and the output table metadata self._attrs = ('psf_model', 'fit_shape', 'finder', 'grouper', 'fitter', 'fitter_maxiters', 'xy_bounds', 'aperture_radius', 'local_bkg_estimator', 'group_warning_threshold', 'progress_bar') self._reset_results() def _reset_results(self): """ Reset internal state attributes for each __call__. """ self.data_unit = None self.finder_results = None self.init_params = None self.results = None self.fit_info = [] # sync data_unit with components if hasattr(self, '_data_processor'): self._data_processor.data_unit = None # internal state container self._state = { 'valid_mask_by_id': None, 'fit_param_errs': None, 'fit_error_indices': None, 'fitted_models_table': None, 'n_pixels_fit': None, 'group_size': None, 'invalid_reasons': None, 'sum_abs_residuals': None, 'cen_residuals': None, 'reduced_chi2': None, } def _initialize_source_state_storage(self, n_sources): """ Initialize the per-source arrays used to store the fit results in the state container. Parameters ---------- n_sources : int The number of sources to initialize the arrays for. """ nfitparam = len(self._param_mapper.fitted_param_names) self._state.update({ 'fit_param_errs': np.full((n_sources, nfitparam), np.nan), 'n_pixels_fit': np.zeros(n_sources, dtype=int), 'invalid_reasons': [''] * n_sources, 'sum_abs_residuals': np.full(n_sources, np.nan, dtype=float), 'cen_residuals': np.full(n_sources, np.nan, dtype=float), 'reduced_chi2': np.full(n_sources, np.nan, dtype=float), 'group_size': np.ones(n_sources, dtype=int), 'valid_mask_by_id': np.full(n_sources, fill_value=False, dtype=bool), }) # Initialize model parameter storage directly self._init_model_param_storage(n_sources) self.fit_info = [{} for _ in range(n_sources)] def _init_model_param_storage(self, n_sources): """ Initialize storage for model parameters directly in state. This avoids storing the full model objects and instead stores only the parameter values, fixed flags, and bounds that are needed for the results table. """ # get all parameter names from the PSF model model_params = list(self.psf_model.param_names) # initialize parameter value storage param_data = {} for model_param in model_params: # initialize all parameters with np.nan (for invalid sources) param_data[model_param] = np.full(n_sources, self._DEFAULT_PARAM_VALUE) param_data[f'{model_param}_fixed'] = [None] * n_sources param_data[f'{model_param}_bounds'] = [None] * n_sources # add placehold IDs column -- this will be updated later to # match IDs in init_params param_data['id'] = np.arange(1, n_sources + 1) self._state['model_param_data'] = param_data def _cache_fitted_parameters(self, row_index, model): """ Extract and store model parameters directly instead of storing the full model object. This method updates the internal state container with model parameter values, fixed flags, and bounds for a specific source. Parameters ---------- row_index : int The index of the source in the results arrays. model : astropy.modeling.Model or None The fitted model for this source, or None for invalid sources. """ param_data = self._state['model_param_data'] if model is None: # For invalid sources, use default template from psf_model template_model = self.psf_model for param_name in template_model.param_names: # Set all parameters to np.nan for invalid sources param_data[param_name][row_index] = self._DEFAULT_PARAM_VALUE template_param = getattr(template_model, param_name) param_data[f'{param_name}_fixed'][row_index] = ( template_param.fixed) param_data[f'{param_name}_bounds'][row_index] = ( template_param.bounds) else: # For valid sources, extract actual fitted values for param_name in model.param_names: param = getattr(model, param_name) param_data[param_name][row_index] = param.value param_data[f'{param_name}_fixed'][row_index] = param.fixed param_data[f'{param_name}_bounds'][row_index] = param.bounds def _build_fitted_models_table(self): """ Build the fitted models table from stored parameter data. Returns ------- table : `~astropy.table.QTable` The table of all model parameters for each source. """ param_data = self._state['model_param_data'] flux_param = self._param_mapper.alias_to_model_param['flux'] # Apply data unit to flux parameter if needed if self.data_unit is not None: param_data[flux_param] = param_data[flux_param] * self.data_unit # Create table from parameter data table = QTable(param_data) # Set id column to match init_params for clean merging if hasattr(self, 'init_params') and self.init_params is not None: ids = self.init_params['id'] table['id'] = ids return table def __repr__(self): return make_repr(self, self._attrs) @staticmethod def _validate_type(obj, name, expected_type): """ Validate that object is of expected type. Parameters ---------- obj : object or None Object to validate. name : str Name of the parameter for error messages. expected_type : type or tuple of types Expected type(s) for the object. Returns ------- obj : object or None The validated object. Raises ------ error_type If obj is not None and not an instance of expected_type. """ if obj is not None and not isinstance(obj, expected_type): type_name = expected_type.__name__ msg = f'{name} must be a {type_name} instance' raise TypeError(msg) return obj @staticmethod def _validate_callable(obj, name): """ Validate that the input object is callable. Parameters ---------- obj : object or None Object to validate. name : str Name of the parameter for error messages. Returns ------- obj : object or None The validated callable object. Raises ------ TypeError If obj is not None and not callable. """ if obj is not None and not callable(obj): msg = f'{name!r} must be a callable object' raise TypeError(msg) return obj def _validate_bounds(self, xy_bounds): """ Validate the input ``xy_bounds`` value. Parameters ---------- xy_bounds : float, tuple of float, or None The maximum distance(s) in pixels that fitted sources can be from initial positions. Returns ------- xy_bounds : ndarray or None The validated xy_bounds as a 2-element array, or None if input was None. Raises ------ ValueError If xy_bounds has incorrect shape, dimension, or contains invalid values (non-positive or non-finite). """ if xy_bounds is None: return xy_bounds xy_bounds = np.atleast_1d(xy_bounds) if len(xy_bounds) == 1: xy_bounds = np.array((xy_bounds[0], xy_bounds[0])) if len(xy_bounds) != 2: msg = 'xy_bounds must have 1 or 2 elements' raise ValueError(msg) if xy_bounds.ndim != 1: msg = 'xy_bounds must be a 1D array' raise ValueError(msg) for bound in xy_bounds: if bound is not None: if bound <= 0: msg = 'xy_bounds must be strictly positive' raise ValueError(msg) if not np.isfinite(bound): msg = 'xy_bounds must be finite' raise ValueError(msg) return xy_bounds @staticmethod def _validate_radius(radius): """ Validate the input ``aperture_radius`` value. Parameters ---------- radius : float or None The aperture radius value to validate. Returns ------- radius : float or None The validated aperture radius. Raises ------ ValueError If radius is not None and is not a strictly positive finite scalar. """ if radius is not None and (not np.isscalar(radius) or radius <= 0 or not np.isfinite(radius)): msg = 'aperture_radius must be a strictly-positive scalar' raise ValueError(msg) return radius def _validate_localbkg(self, value, name): """ Validate the input ``local_bkg_estimator`` value. Parameters ---------- value : LocalBackground or None The local background estimator to validate. name : str Name of the parameter for error messages. Returns ------- value : LocalBackground or None The validated local background estimator. Raises ------ TypeError If value is not None and not a LocalBackground instance. """ value = self._validate_type(value, 'local_bkg_estimator', LocalBackground) return self._validate_callable(value, name) def _validate_maxiters(self, maxiters): """ Validate the input ``maxiters`` value. Parameters ---------- maxiters : int or None Maximum number of fitter iterations to validate. Returns ------- maxiters : int or None The validated maxiters value, or None if the fitter doesn't support this parameter. """ spec = inspect.signature(self.fitter.__call__) if 'maxiter' not in spec.parameters: msg = ("'maxiters' will be ignored because it is not accepted " 'by the input fitter __call__ method.') warnings.warn(msg, AstropyUserWarning) maxiters = None return maxiters def _sync_data_unit(self): """ Synchronize data_unit between main class and components. This method ensures that the data_unit attribute is consistent between the PSFPhotometry instance and its internal component objects (e.g., _data_processor). This method modifies the internal state in-place. """ if hasattr(self, '_data_processor'): self._data_processor.data_unit = self.data_unit def _find_sources_if_needed(self, data, mask, init_params): """ Find sources using the finder if initial positions are not provided. This method delegates to the data processor component and syncs results. """ result = self._data_processor.find_sources_if_needed( data, mask, init_params) self.finder_results = self._data_processor.finder_results return result def _group_sources(self, init_params): """ Group sources using the grouper or the user-provided 'group_id' column. Parameters ---------- init_params : `~astropy.table.Table` The table of initial parameters. Returns ------- init_params : `~astropy.table.Table` The table of initial parameters with a 'group_id' column. """ if 'group_id' in init_params.colnames: # user-provided group_id takes precedence self.grouper = None elif self.grouper is not None: # use the grouper to group sources x_col = self._param_mapper.init_colnames['x'] y_col = self._param_mapper.init_colnames['y'] init_params['group_id'] = self.grouper(init_params[x_col], init_params[y_col]) else: # no grouper provided, so each source is its own group init_params['group_id'] = init_params['id'].copy() # ensure group_id contains only positive (> 0) integers group_id = init_params['group_id'] if np.any(~np.isfinite(group_id)): msg = 'group_id must be finite' raise ValueError(msg) if not np.issubdtype(group_id.dtype, np.integer): msg = 'group_id must be an integer array' raise TypeError(msg) if np.any(group_id <= 0): msg = 'group_id must contain only positive (> 0) integers' raise ValueError(msg) return init_params def _build_initial_parameters(self, data, mask, init_params): """ Build the table of initial parameters for fitting. This method orchestrates finding sources, estimating initial fluxes and backgrounds, and grouping sources. Parameters ---------- data : 2D `numpy.ndarray` The input image data. mask : 2D `numpy.ndarray` or `None` A boolean mask where `True` values are masked. init_params : `~astropy.table.Table` or `None` The input table of initial parameters. Returns ------- init_params : `~astropy.table.Table` or `None` The table of initial parameters ready for fitting, or `None` if no sources were found. """ init_params = self._find_sources_if_needed(data, mask, init_params) if init_params is None: return None # strip any units from the x/y position columns for axis in ('x', 'y'): colname = self._param_mapper.init_colnames[axis] if isinstance(init_params[colname], u.Quantity): init_params[colname] = init_params[colname].value add_flux_bkg = self._data_processor.estimate_flux_and_bkg_if_needed init_params = add_flux_bkg(data, mask, init_params) init_params = self._group_sources(init_params) # check for large group sizes after grouping is complete warn_size = self.group_warning_threshold if 'group_id' in init_params.colnames: _, counts = np.unique(init_params['group_id'], return_counts=True) if len(counts) > 0 and max(counts) > warn_size: msg = (f'Some groups have more than {warn_size} ' 'sources. Fitting such groups may take a long time ' 'and be error-prone. You may want to consider using ' 'different `SourceGrouper` parameters or changing ' 'the "group_id" column in "init_params".') warnings.warn(msg, AstropyUserWarning) # Add columns for any additional model parameters that are # fit using the model's default value, if not already present. for alias, col_name in self._param_mapper.init_colnames.items(): if col_name not in init_params.colnames: alias_map = self._param_mapper.alias_to_model_param model_param_name = alias_map[alias] init_params[col_name] = getattr(self.psf_model, model_param_name) # Define the final column order of the init_params table. # Extra aliases are those that are not in the main_aliases. # The alias and model_param names are the same for # extra parameters, so we can use the alias_to_model_param map # to get the extra aliases. main_aliases = self._param_mapper.MAIN_ALIASES extra_aliases = [param for param in self._param_mapper.alias_to_model_param if param not in main_aliases] main_cols = [self._param_mapper.init_colnames[alias] for alias in main_aliases] extra_cols = [self._param_mapper.init_colnames[alias] for alias in extra_aliases] col_order = ['id', 'group_id', 'local_bkg', *main_cols, *extra_cols] return init_params[col_order] def _prepare_fit_inputs(self, data, *, mask=None, error=None, init_params=None): """ Prepare all inputs for the PSF fitting. This method handles data validation, unit processing, source finding, initial parameter estimation, and grouping. It returns the processed inputs ready for the `_fit_sources` method. Parameters ---------- data : 2D array_like The input image data. mask : 2D array_like or `None`, optional A boolean mask where `True` values are masked (ignored). error : 2D array_like or `None`, optional The 1-sigma uncertainties of the input data. init_params : `~astropy.table.Table` or `None`, optional The input table of initial parameters. Returns ------- data : 2D `numpy.ndarray` The validated input image data. mask : 2D `numpy.ndarray` or `None` The validated boolean mask where `True` values are masked. If no mask was input, then `None` is returned. error : 2D `numpy.ndarray` or `None` The validated 1-sigma uncertainties of the input data. If no error was input, then `None` is returned. init_params : `~astropy.table.Table` or `None` The table of initial parameters ready for fitting, or `None` if no sources were found. """ (data, error), unit = process_quantities((data, error), ('data', 'error')) self.data_unit = unit self._sync_data_unit() # Sync with components data = self._data_processor.validate_array(data, 'data') error = self._data_processor.validate_array(error, 'error', data_shape=data.shape) mask = self._data_processor.validate_array(mask, 'mask', data_shape=data.shape) mask = _make_mask(data, mask) init_params = self._data_processor.validate_init_params(init_params) init_params = self._build_initial_parameters(data, mask, init_params) if init_params is None: # no sources found return None, None, None, None return data, mask, error, init_params def _ungroup_fit_results(self, row_indices, valid_mask, group_model, group_fit_info): """ Ungroup fitted results and store per-source data. This method extracts individual source parameters, errors, and covariance information directly from the group fit results and stores them in the state container. This avoids storing large group (flat) model objects and covariance matrices. The results for each valid source in the group are stored directly in the state container, including the fitted model parameters, parameter errors, and fit_info dictionary. ``row_indices`` is used to ensure that the order of the sources in the state container matches the source ID order in the input ``init_params`` table. Parameters ---------- row_indices : list The row indices for sources in this group. valid_mask : ndarray Boolean mask indicating which sources in the group are valid. group_model : `astropy.modeling.Model` The fitted model for a single group. This can be a compound model. group_fit_info : dict The fit_info dictionary corresponding to the group fit. """ nfitparam = len(self._param_mapper.fitted_param_names) num_valid = int(np.count_nonzero(valid_mask)) # Extract parameter errors from the group covariance matrix param_cov = group_fit_info.get('param_cov') if param_cov is None: source_param_errs = np.full((num_valid, nfitparam), np.nan) source_covs = [None] * num_valid else: param_err_1d = np.sqrt(np.diag(param_cov)) # For grouped (flat) models, parameters are arranged as, # e.g., [flux_0, x_0_0, y_0_0, fwhm_0, flux_1, x_0_1, ...] source_param_errs = param_err_1d.reshape(num_valid, nfitparam) # Extract individual covariance matrices for each source source_covs = self._psf_fitter.extract_source_covariances( param_cov, num_valid, nfitparam) # Split models and extract parameters if num_valid == 1: source_models = [group_model] else: # For grouped (flat) models, create individual models from # params source_models = self._psf_fitter.split_flat_model(group_model, num_valid) # Store results for each valid source valid_idx = 0 for i, row_index in enumerate(row_indices): if not valid_mask[i]: continue model = source_models[valid_idx] param_errs = source_param_errs[valid_idx] source_cov = source_covs[valid_idx] # Extract and store model parameters self._cache_fitted_parameters(row_index, model) self._state['fit_param_errs'][row_index] = param_errs # Create individual fit_info with source-specific covariance source_fit_info = dict(group_fit_info) if source_cov is not None: source_fit_info['param_cov'] = source_cov self.fit_info[row_index] = source_fit_info self._state['valid_mask_by_id'][row_index] = True valid_idx += 1 def _calculate_residual_metrics(self, row_indices, valid_mask, npixfit_full, cen_index_full, *, error=None, xi_all=None, yi_all=None): """ Calculate residual-based fit metrics for valid sources. Parameters ---------- row_indices : array-like Source row indices. valid_mask : array-like Boolean mask for valid sources. npixfit_full : array-like Number of pixels used in fit for each source. cen_index_full : array-like Center pixel indices for each source. error : 2D array or None, optional The 1-sigma uncertainties of the input data. Used for calculating reduced chi-squared. xi_all : list or None, optional List of x-coordinates for each valid source's cutout pixels. yi_all : list or None, optional List of y-coordinates for each valid source's cutout pixels. Returns ------- sum_abs_residuals : ndarray Sum of absolute residuals for each source, or np.nan for invalid sources. cen_residuals : ndarray Center residuals for each source, or np.nan for invalid sources. reduced_chi2 : ndarray Reduced chi-squared values for each source, or np.nan for invalid sources. """ # Extract residuals from fit_info residual_key = None with contextlib.suppress(AttributeError): fit_info = self.fitter.fit_info if isinstance(fit_info, dict): if 'fun' in fit_info: residual_key = 'fun' if 'fvec' in fit_info: # LevMarLSQFitter residual_key = 'fvec' if residual_key is not None: residuals = self.fitter.fit_info[residual_key] else: residuals = None n_sources = len(row_indices) sum_abs_residuals = np.full(n_sources, np.nan, dtype=float) cen_residuals = np.full(n_sources, np.nan, dtype=float) reduced_chi2 = np.full(n_sources, np.nan, dtype=float) if residuals is not None: # convert to numpy arrays for vectorized operations valid_mask_arr = np.array(valid_mask, dtype=bool) npixfit_arr = np.array(npixfit_full) cen_index_arr = np.array(cen_index_full) # get valid source indices valid_indices = np.where(valid_mask_arr)[0] if len(valid_indices) > 0: npix_valid = npixfit_arr[valid_indices] # calculate cumulative pixel positions cumsum_npix = np.concatenate(([0], np.cumsum(npix_valid))) # get the number of fitted parameters nfitparam = len(self._param_mapper.fitted_param_names) # process all valid sources for idx, valid_idx in enumerate(valid_indices): start_pos = cumsum_npix[idx] end_pos = cumsum_npix[idx + 1] source_residuals = residuals[start_pos:end_pos] # For qfit and cfit calculations, we need raw residuals # (data - model), not weighted residuals # (data - model)/error. If errors were provided, convert # weighted residuals back to raw residuals. raw_residuals = source_residuals if (error is not None and xi_all is not None and yi_all is not None): # Extract error values for this source's pixels xi_source = xi_all[idx] yi_source = yi_all[idx] error_vals = error[yi_source, xi_source] # Convert weighted residuals to raw residuals: # multiply by error if (np.all(error_vals > 0) and np.all(np.isfinite(error_vals))): raw_residuals = source_residuals * error_vals # sum of absolute residuals sum_abs_residuals[valid_idx] = float( np.abs(raw_residuals).sum()) # center residual cen_idx = cen_index_arr[valid_idx] if np.isfinite(cen_idx): cen_residuals[valid_idx] = float( -raw_residuals[int(cen_idx)]) else: cen_residuals[valid_idx] = np.nan # Calculate chi-squared. The residuals have already # been weighted by (1 / error). If errors are not # input, then reduced_chi2 will be NaN. dof = float(npix_valid[idx] - nfitparam) if (error is not None and xi_all is not None and yi_all is not None): # Extract error values for this source's pixels xi_source = xi_all[idx] yi_source = yi_all[idx] error_vals = error[yi_source, xi_source] if (np.all(error_vals > 0) and np.all(np.isfinite(error_vals))): chi2 = np.sum(source_residuals**2) reduced_chi2[valid_idx] = chi2 / dof row_indices_arr = np.array(row_indices) self._state['sum_abs_residuals'][row_indices_arr] = sum_abs_residuals self._state['cen_residuals'][row_indices_arr] = cen_residuals self._state['reduced_chi2'][row_indices_arr] = reduced_chi2 return sum_abs_residuals, cen_residuals, reduced_chi2 def _fit_source_groups(self, source_groups, data, mask, error): """ Fit PSF models to groups of sources in the input data. This method processes each group of sources, fits PSF models, and stores the results. Individual source results are extracted and stored as soon as each group is fitted. Parameters ---------- source_groups : iterable Groups of sources to fit, where each group contains sources that should be fit simultaneously. data : 2D ndarray The input image data. mask : 2D ndarray or None Boolean mask for the input data. error : 2D ndarray or None The 1-sigma uncertainties of the input data. """ if self.progress_bar: # pragma: no cover source_groups = add_progress_bar(source_groups, desc='Fit source/group') y_offsets, x_offsets = self._data_processor.get_fit_offsets() nfitparam_per_source = len(self._param_mapper.fitted_param_names) # sources are fit by groups in group ID order for source_group in source_groups: group_size = len(source_group) xi_all = [] yi_all = [] cutout_all = [] npixfit_full = [] cen_index_full = [] valid_mask_list = [] invalid_reasons = [] row_indices = [] # Process all sources with pre-filtering optimization for row in source_group: # Always use pre-filtering for all group sizes should_skip_source = self._data_processor.should_skip_source should_skip, reason = should_skip_source(row, data.shape) if should_skip: res = { 'valid': False, 'reason': reason, 'xx': None, 'yy': None, 'cutout': None, 'npix': 0, 'cen_index': np.nan, } else: res = self._data_processor.get_source_cutout_data( row, data, mask, y_offsets, x_offsets) # Common processing for all sources npixfit_full.append(res['npix']) cen_index_full.append(res['cen_index']) invalid_reasons.append(res['reason']) row_indices.append(row['_row_index']) if res['valid'] and res['npix'] >= nfitparam_per_source: valid_mask_list.append(True) xi_all.append(res['xx']) yi_all.append(res['yy']) cutout_all.append(res['cutout']) else: if res['valid'] and res['npix'] < nfitparam_per_source: invalid_reasons[-1] = 'too_few_pixels' valid_mask_list.append(False) valid_mask = np.array(valid_mask_list, dtype=bool) num_valid = int(np.count_nonzero(valid_mask)) # Store basic info for all sources in group. # row_indices is used to store results in the original # source ID order given by init_params. row_indices_arr = np.array(row_indices) self._state['group_size'][row_indices_arr] = group_size self._state['n_pixels_fit'][row_indices_arr] = np.array( npixfit_full, dtype=int) for i, row_index in enumerate(row_indices): reason = invalid_reasons[i] self._state['invalid_reasons'][row_index] = ( '' if reason is None else reason ) if num_valid == 0: # Handle all-invalid group for row_index in row_indices: self._state['valid_mask_by_id'][row_index] = False self._cache_fitted_parameters(row_index, None) continue # Fit the group xi_concat = np.concatenate(xi_all) yi_concat = np.concatenate(yi_all) cutout_concat = np.concatenate(cutout_all) valid_sources = source_group[valid_mask] psf_model = self._psf_fitter.make_psf_model(valid_sources) fit_model, fit_info = self._psf_fitter.run_fitter( psf_model, xi_concat, yi_concat, cutout_concat, error) # Ungroup and store per-source results. row_indices is used # to ensure that results are stored in the original source # ID order given by init_params. self._ungroup_fit_results(row_indices, valid_mask, fit_model, fit_info) # Calculate residual metrics for valid sources self._calculate_residual_metrics( row_indices, valid_mask, npixfit_full, cen_index_full, error=error, xi_all=xi_all, yi_all=yi_all) def _get_fit_error_indices(self): """ Get the indices of fits that did not converge. This method delegates to the results assembler component. """ return self._results_assembler.get_fit_error_indices(self.fit_info) def _create_fit_results(self, fit_model_all_params): """ Create the table of fitted parameter values and errors. This method delegates to the results assembler component. """ fit_param_errs = self._state['fit_param_errs'] valid_mask = self._state.get('valid_mask_by_id') return self._results_assembler.create_fit_results( fit_model_all_params, fit_param_errs, valid_mask, self.data_unit) def _assemble_fit_results(self): """ Assemble the final fitted results tables and parameters. This method creates the fitted models table and fit parameters table from the per-source data that was stored during the fitting process. It also computes fit error indices. Returns ------- fit_params : `~astropy.table.Table` Table containing the fitted parameters and their errors. """ fit_error_indices = self._get_fit_error_indices() fitted_models_table = self._build_fitted_models_table() fit_params = self._create_fit_results(fitted_models_table) # store results in state for other methods that need them self._state['fit_error_indices'] = fit_error_indices self._state['fitted_models_table'] = fitted_models_table self._state['fit_params'] = fit_params return fit_params def _fit_sources(self, data, init_params, *, error=None, mask=None): """ Fit PSF models to sources in the input data. Parameters ---------- data : 2D ndarray The input image data. init_params : `~astropy.table.Table` The table of initial parameters for each source. error : 2D ndarray or `None`, optional The 1-sigma uncertainties of the input data. mask : 2D ndarray or `None`, optional A boolean mask where `True` values are masked (ignored). Returns ------- fit_params : `~astropy.table.Table` The table of fitted parameters and fit quality metrics for each source. """ # add row index for stable mapping if '_row_index' not in init_params.colnames: init_params['_row_index'] = np.arange(len(init_params)) self._initialize_source_state_storage(len(init_params)) source_groups = init_params.group_by('group_id').groups self._fit_source_groups(source_groups, data, mask, error) # clean up temporary row index column if '_row_index' in init_params.colnames: init_params.remove_column('_row_index') return self._assemble_fit_results() def _calc_fit_metrics(self, results_tbl): """ Calculate fit quality metrics qfit, cfit, and reduced_chi2. This method delegates to the results assembler component. """ sum_abs_residuals = self._state['sum_abs_residuals'] cen_residuals = self._state['cen_residuals'] reduced_chi2 = self._state['reduced_chi2'] return self._results_assembler.calc_fit_metrics( results_tbl, sum_abs_residuals, cen_residuals, reduced_chi2) def _define_flags(self, results_tbl, shape, init_params): """ Define per-source bitwise flags summarizing fit conditions. This method delegates to the results assembler component. """ fit_error_indices = self._state.get('fit_error_indices') fitted_models_table = self._state.get('fitted_models_table') valid_mask = self._state.get('valid_mask_by_id') invalid_reasons = self._state.get('invalid_reasons') return self._results_assembler.define_flags( results_tbl, shape, fit_error_indices, self.fit_info, fitted_models_table, valid_mask, invalid_reasons, init_params) def _assemble_results_table(self, init_params, fit_params, data_shape): """ Assemble the final results table. This method delegates to the results assembler component. """ # prepare metadata attributes class_attrs = {'psf_model', 'finder', 'grouper', 'fitter', 'local_bkg_estimator'} metadata_attrs = {} for attr in self._attrs: value = getattr(self, attr) if attr in class_attrs and value is not None: metadata_attrs[attr] = repr(value) else: metadata_attrs[attr] = value return self._results_assembler.assemble_results_table( init_params, fit_params, data_shape, self._state, self._calc_fit_metrics, self._define_flags, self.__class__.__name__, metadata_attrs) @staticmethod def _coerce_nddata(data): """ Return normalized (data, mask, error) if ``data`` is NDData. This helper extracts ``data.data``, propagates units, and derives an error array from an attached ``StdDevUncertainty`` (or compatible uncertainty) if present. Parameters ---------- data : `~astropy.nddata.NDData` The input data. Returns ------- data_array : 2D `~numpy.ndarray` or `~astropy.units.Quantity` The 2D data array. mask : 2D bool `~numpy.ndarray` or `None` The boolean mask array. error : 2D `~numpy.ndarray` or `~astropy.units.Quantity` or `None` The 1-sigma error array. """ data_array = data.data if data.unit is not None: data_array = data_array << data.unit mask = data.mask unc = data.uncertainty error = None if unc is not None: err = unc.represent_as(StdDevUncertainty).quantity if getattr(err, 'unit', None) == u.dimensionless_unscaled: err = err.value elif data.unit is not None: err = err.to(data.unit) error = err return data_array, mask, error
[docs] @_create_call_docstring(iterative=False) def __call__(self, data, *, mask=None, error=None, init_params=None): # reset state from previous runs self._reset_results() try: # handle NDData input if isinstance(data, NDData): data, mask, error = self._coerce_nddata(data) # prepare all inputs for sources to be fit data, mask, error, init_params = self._prepare_fit_inputs( data, mask=mask, error=error, init_params=init_params, ) # handle the case where no sources were found if init_params is None: return None self.init_params = init_params # fit sources defined in init_params fit_params = self._fit_sources(data, init_params, error=error, mask=mask) # assemble the final results table # Note: _assemble_results_table handles _state cleanup self.results = self._assemble_results_table( init_params, fit_params, data.shape) except Exception: # ensure state cleanup even if an exception occurs self._reset_state() raise self._reset_state() return self.results
def _reset_state(self): """ Reset _state dictionary in case of exceptions. This ensures memory is freed even if the normal cleanup path is not reached due to an exception during processing. """ if hasattr(self, '_state') and self._state: self._state.clear() @property @deprecated('2.3.0', alternative='results') def fit_params(self): """ The table of fit parameters and their errors. This table is a subset of the ``results`` table, containing only the fit parameters and their errors. It can be used as the ``init_params`` for subsequent `PSFPhotometry` fits. """ if self.results is None: return None tbl = QTable() for col_name in self.results.colnames: if col_name == 'id' or '_fit' in col_name or '_err' in col_name: tbl[col_name] = self.results[col_name] return tbl @staticmethod def _results_to_init_params(results_tbl, *, remove_invalid=True, reset_ids=True): """ Convert PSF photometry results to initial parameters format. This method extracts fitted parameters (columns ending with '_fit') from the results table and renames them to initial parameter format (ending with '_init'). This output can be used as ``init_params`` for subsequent `PSFPhotometry` runs, allowing iterative refinement of source positions and fluxes. This is a static helper method to allow it to be called by `~photutils.psf.IterativePSFPhotometry`. Parameters ---------- results_tbl : `~astropy.table.QTable` or None The table of fit results from a previous `PSFPhotometry` run. If None, returns None. remove_invalid : bool, optional If `True`, rows containing non-finite fitted values are removed. Default is `True`. reset_ids : bool, optional If `True`, the 'id' column is reset to sequential numbering starting from 1. If `False`, the 'id' values are preserved from ``results_tbl``. This option is ignored if ``remove_invalid`` is `False`. Default is `True`. Returns ------- init_params_tbl : `~astropy.table.QTable` or None A table with columns renamed from '*_fit' to '*_init', suitable for use as ``init_params`` in a subsequent `PSFPhotometry` call. Returns None if ``results_tbl`` is None. Notes ----- Only the 'id' column and columns with '_fit' suffix are included in the output. All other columns from the results table (e.g., quality metrics, flags) are excluded. """ # PSF photometry not yet run if results_tbl is None: return None tbl = QTable() for col_name in results_tbl.colnames: if col_name == 'id' or '_fit' in col_name: init_name = col_name.replace('_fit', '_init') tbl[init_name] = results_tbl[col_name] if remove_invalid: # Remove rows with any non-finite fitted values keep = np.all([np.isfinite(tbl[col]) for col in tbl.colnames], axis=0) tbl = tbl[keep] if reset_ids: tbl['id'] = np.arange(1, len(tbl) + 1) return tbl @staticmethod def _results_to_model_params(results_tbl, param_mapper, *, remove_invalid=True, reset_ids=True): """ Convert PSF photometry results to PSF model parameters format. This method extracts fitted parameters (columns ending with '_fit') from the results table and renames them to match the PSF model's parameter names (e.g., 'x_fit' → 'x_0', 'flux_fit' → 'flux'). This output can be used to reconstruct fitted PSF models for visualization or further analysis. This is a static helper method to allow it to be called by `~photutils.psf.IterativePSFPhotometry`. Parameters ---------- results_tbl : `~astropy.table.QTable` or None The table of fit results from a previous `PSFPhotometry` run. If None, returns None. param_mapper : `_PSFParameterMapper` The helper class that manages the mapping between aliases (e.g., 'x', 'flux') and PSF model parameter names (e.g., 'x_0', 'flux'). remove_invalid : bool, optional If `True`, rows containing non-finite fitted values are removed. Default is `True`. reset_ids : bool, optional If `True`, the 'id' column is reset to sequential numbering starting from 1. If `False`, the 'id' values are preserved from ``results_tbl``. This option is ignored if ``remove_invalid`` is `False`. Default is `True`. Returns ------- model_params_tbl : `~astropy.table.QTable` or None A table with columns renamed to match the PSF model's parameter names, suitable for model reconstruction. Returns None if ``results_tbl`` is None. Notes ----- Only the 'id' column and columns with '_fit' suffix are included in the output. All other columns from the results table (e.g., initial parameters, quality metrics, flags) are excluded. """ # PSF photometry not yet run if results_tbl is None: return None tbl = QTable() for col_name in results_tbl.colnames: if col_name == 'id' or '_fit' in col_name: alias = col_name.replace('_fit', '') model_param_name = param_mapper.alias_to_model_param.get( alias, alias) tbl[model_param_name] = results_tbl[col_name] if remove_invalid: # Remove rows with any non-finite fitted values keep = np.all([np.isfinite(tbl[col]) for col in tbl.colnames], axis=0) tbl = tbl[keep] if reset_ids: tbl['id'] = np.arange(1, len(tbl) + 1) return tbl
[docs] def results_to_init_params(self, *, remove_invalid=True, reset_ids=True): """ Create a table of the fitted model parameters from the results. The table columns are named according to those expected for the initial parameters table. It can be used as the ``init_params`` for subsequent `PSFPhotometry` fits. Parameters ---------- remove_invalid : bool, optional If `True`, rows that contain non-finite fitted values are removed. reset_ids : bool, optional If `True`, the 'id' column will be reset to a sequential numbering starting from 1. If `False`, the 'id' column will remain unchanged from the results table. This option is ignored if ``remove_invalid`` is `False`. """ return self._results_to_init_params(self.results, remove_invalid=remove_invalid, reset_ids=reset_ids)
[docs] def results_to_model_params(self, *, remove_invalid=True, reset_ids=True): """ Create a table of the fitted model parameters from the results. The table columns are named according to the PSF model parameter names. It can also be used to reconstruct the fitted PSF models for visualization or further analysis. Parameters ---------- remove_invalid : bool, optional If `True`, rows that contain non-finite fitted values are removed. reset_ids : bool, optional If `True`, the 'id' column will be reset to a sequential numbering starting from 1. If `False`, the 'id' column will remain unchanged from the results table. This option is ignored if ``remove_invalid`` is `False`. """ return self._results_to_model_params(self.results, self._param_mapper, remove_invalid=remove_invalid, reset_ids=reset_ids)
[docs] @deprecated_positional_kwargs(since='3.0', until='4.0') def decode_flags(self, return_bit_values=False): """ Decode the PSF photometry flags from the results table. This is a convenience method that calls `~photutils.psf.decode_psf_flags` with the 'flags' column from the results table. Parameters ---------- return_bit_values : bool, optional If `True`, return the decoded bit flags (integers) instead of the flag descriptions (strings). Default is `False`. Returns ------- decoded : list of list of str or list of list of int List of lists where each inner list contains the active flag names (or bit values) for the corresponding source in the results table. If no flags are set for a source, an empty list is returned for that source. Raises ------ ValueError If no results are available. Please run the PSFPhotometry instance first. See Also -------- photutils.psf.decode_psf_flags Examples -------- Decode flags from PSF photometry results: >>> import numpy as np >>> from astropy.table import Table >>> from photutils.psf import CircularGaussianPRF, PSFPhotometry >>> yy, xx = np.mgrid[:21, :21] >>> psf_model = CircularGaussianPRF(flux=1, x_0=10, y_0=10, fwhm=2) >>> # Create a source with negative flux to trigger a flag >>> m1 = CircularGaussianPRF(flux=100, x_0=10, y_0=10, fwhm=2) >>> m2 = CircularGaussianPRF(flux=-50, x_0=5, y_0=5, fwhm=2) >>> data = m1(xx, yy) + m2(xx, yy) >>> init_params = Table({'x': [10, 5], 'y': [10, 5], ... 'flux': [100, 100]}) >>> photometry = PSFPhotometry(psf_model, (3, 3)) >>> results = photometry(data, init_params=init_params) >>> decoded_flags = photometry.decode_flags() >>> for i, flags in enumerate(decoded_flags): ... print(f'Source {i+1}: {flags}') # doctest: +SKIP Source 1: [] Source 2: ['negative_flux'] """ if self.results is None: msg = ('No results available. Please run the PSFPhotometry ' 'instance first.') raise ValueError(msg) return decode_psf_flags(self.results['flags'], return_bit_values=return_bit_values)
def _get_model_image_params(self): # Convert fitted parameters to model parameter names without # filtering, so the row indices align with self.results model_params = self.results_to_model_params(remove_invalid=False) # Filter out invalid sources (those with NaN fitted values) keep = np.all([np.isfinite(model_params[col]) for col in model_params.colnames], axis=0) model_params = model_params[keep] # Extract local_bkg for the same valid sources local_bkg = self.results['local_bkg'][keep] return model_params, local_bkg
[docs] @deprecated_renamed_argument('include_localbkg', 'include_local_bkg', '3.0', until='4.0') @_make_model_image_docstring def make_model_image(self, shape, *, psf_shape=None, include_local_bkg=False): if self.results is None: msg = ('No results available. Please run the PSFPhotometry ' 'instance first.') raise ValueError(msg) model_params, local_bkg = self._get_model_image_params() maker = _ModelImageMaker(self.psf_model, model_params, local_bkg=local_bkg, progress_bar=self.progress_bar) return maker.make_model_image(shape, psf_shape=psf_shape, include_local_bkg=include_local_bkg)
[docs] @deprecated_renamed_argument('include_localbkg', 'include_local_bkg', '3.0', until='4.0') @_make_residual_image_docstring def make_residual_image(self, data, *, psf_shape=None, include_local_bkg=False): if self.results is None: msg = ('No results available. Please run the PSFPhotometry ' 'instance first.') raise ValueError(msg) model_params, local_bkg = self._get_model_image_params() maker = _ModelImageMaker(self.psf_model, model_params, local_bkg=local_bkg, progress_bar=self.progress_bar) return maker.make_residual_image(data, psf_shape=psf_shape, include_local_bkg=include_local_bkg)