Source code for photutils.psf.photometry

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
This module provides classes to perform PSF-fitting photometry.
"""

import contextlib
import inspect
import warnings
from collections import defaultdict
from copy import deepcopy
from itertools import chain

import astropy.units as u
import numpy as np
from astropy.modeling.fitting import TRFLSQFitter
from astropy.nddata import NDData, NoOverlapError, StdDevUncertainty
from astropy.table import QTable, Table, hstack, join, vstack
from astropy.utils import lazyproperty
from astropy.utils.decorators import deprecated_attribute
from astropy.utils.exceptions import AstropyUserWarning

from photutils.aperture import CircularAperture
from photutils.background import LocalBackground
from photutils.datasets import make_model_image as _make_model_image
from photutils.psf.groupers import SourceGrouper
from photutils.psf.utils import _get_psf_model_params, _validate_psf_model
from photutils.utils._misc import _get_meta
from photutils.utils._parameters import as_pair
from photutils.utils._progress_bars import add_progress_bar
from photutils.utils._quantity_helpers import process_quantities
from photutils.utils.cutouts import _overlap_slices as overlap_slices
from photutils.utils.exceptions import NoDetectionsWarning

__all__ = ['IterativePSFPhotometry', 'ModelImageMixin', 'PSFPhotometry']


[docs] class ModelImageMixin: """ Mixin class to provide methods to calculate model images and residuals. """
[docs] def make_model_image(self, shape, psf_shape=None, include_localbkg=False): """ Create a 2D image from the fit PSF models and optional local background. Parameters ---------- shape : 2 tuple of int The shape of the output array. psf_shape : 2 tuple of int, optional The shape of the region around the center of the fit model to render in the output image. If ``psf_shape`` is a scalar integer, then a square shape of size ``psf_shape`` will be used. If `None`, then the bounding box of the model will be used. This keyword must be specified if the model does not have a ``bounding_box`` attribute. include_localbkg : bool, optional Whether to include the local background in the rendered output image. Note that the local background level is included around each source over the region defined by ``psf_shape``. Thus, regions where the ``psf_shape`` of sources overlap will have the local background added multiple times. Returns ------- array : 2D `~numpy.ndarray` The rendered image from the fit PSF models. This image will not have any units. """ if isinstance(self, PSFPhotometry): progress_bar = self.progress_bar psf_model = self.psf_model fit_params = self._fit_model_params local_bkgs = self.init_params['local_bkg'] else: psf_model = self._psfphot.psf_model progress_bar = self._psfphot.progress_bar if self.mode == 'new': # collect the fit params and local backgrounds from each # iteration local_bkgs = [] for i, psfphot in enumerate(self.fit_results): if i == 0: fit_params = psfphot._fit_model_params else: fit_params = vstack((fit_params, psfphot._fit_model_params)) local_bkgs.append(psfphot.init_params['local_bkg']) local_bkgs = _flatten(local_bkgs) else: # use the fit params and local backgrounds only from the # final iteration, which includes all sources fit_params = self.fit_results[-1]._fit_model_params local_bkgs = self.fit_results[-1].init_params['local_bkg'] model_params = fit_params if include_localbkg: # add local_bkg model_params = model_params.copy() model_params['local_bkg'] = local_bkgs try: x_name = psf_model.x_name y_name = psf_model.y_name except AttributeError: x_name = 'x_0' y_name = 'y_0' return _make_model_image(shape, psf_model, model_params, model_shape=psf_shape, x_name=x_name, y_name=y_name, progress_bar=progress_bar)
[docs] def make_residual_image(self, data, psf_shape=None, include_localbkg=False): """ Create a 2D residual image from the fit PSF models and local background. Parameters ---------- data : 2D `~numpy.ndarray` The 2D array on which photometry was performed. This should be the same array input when calling the PSF-photometry class. psf_shape : 2 tuple of int, optional The shape of the region around the center of the fit model to subtract. If ``psf_shape`` is a scalar integer, then a square shape of size ``psf_shape`` will be used. If `None`, then the bounding box of the model will be used. This keyword must be specified if the model does not have a ``bounding_box`` attribute. include_localbkg : bool, optional Whether to include the local background in the subtracted model. Note that the local background level is subtracted around each source over the region defined by ``psf_shape``. Thus, regions where the ``psf_shape`` of sources overlap will have the local background subtracted multiple times. Returns ------- array : 2D `~numpy.ndarray` The residual image of the ``data`` minus the fit PSF models minus the optional``local_bkg``. """ if isinstance(data, NDData): residual = deepcopy(data) data_arr = data.data if data.unit is not None: data_arr <<= data.unit residual.data[:] = self.make_residual_image( data_arr, psf_shape=psf_shape, include_localbkg=include_localbkg) else: residual = self.make_model_image(data.shape, psf_shape=psf_shape, include_localbkg=include_localbkg) np.subtract(data, residual, out=residual) return residual
[docs] class PSFPhotometry(ModelImageMixin): """ Class to perform PSF photometry. This class implements a flexible PSF photometry algorithm that can find sources in an image, group overlapping sources, fit the PSF model to the sources, and subtract the fit PSF models from the image. Parameters ---------- psf_model : 2D `astropy.modeling.Model` The PSF model to fit to the data. The model must have parameters named ``x_0``, ``y_0``, and ``flux``, corresponding to the center (x, y) position and flux, or it must have 'x_name', 'y_name', and 'flux_name' attributes that map to the x, y, and flux parameters (i.e., a model output from `make_psf_model`). The model must be two-dimensional such that it accepts 2 inputs (e.g., x and y) and provides 1 output. fit_shape : int or length-2 array_like The rectangular shape around the center of a star that will be used to define the PSF-fitting data. If ``fit_shape`` is a scalar then a square shape of size ``fit_shape`` will be used. If ``fit_shape`` has two elements, they must be in ``(ny, nx)`` order. Each element of ``fit_shape`` must be an odd number. In general, ``fit_shape`` should be set to a small size (e.g., ``(5, 5)``) that covers the region with the highest flux signal-to-noise. finder : callable or `~photutils.detection.StarFinderBase` or `None`, \ optional A callable used to identify stars in an image. The ``finder`` must accept a 2D image as input and return a `~astropy.table.Table` containing the x and y centroid positions. These positions are used as the starting points for the PSF fitting. The allowed ``x`` column names are (same suffix for ``y``): ``'x_init'``, ``'xinit'``, ``'x'``, ``'x_0'``, ``'x0'``, ``'xcentroid'``, ``'x_centroid'``, ``'x_peak'``, ``'xcen'``, ``'x_cen'``, ``'xpos'``, ``'x_pos'``, ``'x_fit'``, and ``'xfit'``. If `None`, then the initial (x, y) model positions must be input using the ``init_params`` keyword when calling the class. The (x, y) values in ``init_params`` override this keyword. If this class is run on an image that has units (i.e., a `~astropy.units.Quantity` array), then certain ``finder`` keywords (e.g., ``threshold``) must have the same units. Please see the documentation for the specific ``finder`` class for more information. grouper : `~photutils.psf.SourceGrouper` or callable or `None`, optional A callable used to group stars. Typically, grouped stars are those that overlap with their neighbors. Stars that are grouped are fit simultaneously. The ``grouper`` must accept the x and y coordinates of the sources and return an integer array of the group id numbers (starting from 1) indicating the group in which a given source belongs. If `None`, then no grouping is performed, i.e. each source is fit independently. The ``group_id`` values in ``init_params`` override this keyword. A warning is raised if any group size is larger than 25 sources. fitter : `~astropy.modeling.fitting.Fitter`, optional The fitter object used to perform the fit of the model to the data. fitter_maxiters : int, optional The maximum number of iterations in which the ``fitter`` is called for each source. This keyword can be increased if the fit is not converging for sources (e.g., the output ``flags`` value contains 8). xy_bounds : `None`, float, or 2-tuple of float, optional The maximum distance in pixels that a fitted source can be from the initial (x, y) position. If a single float, then the same maximum distance is used for both x and y. If a 2-tuple of floats, then the distances are in ``(x, y)`` order. If `None`, then no bounds are applied. Either value can also be `None` to indicate no bound in that direction. localbkg_estimator : `~photutils.background.LocalBackground` or `None`, \ optional The object used to estimate the local background around each source. If `None`, then no local background is subtracted. The ``local_bkg`` values in ``init_params`` override this keyword. This option should be used with care, especially in crowded fields where the ``fit_shape`` of sources overlap (see Notes below). aperture_radius : float, optional The radius of the circular aperture used to estimate the initial flux of each source. If initial flux values are present in the ``init_params`` table, they will override this keyword. progress_bar : bool, optional Whether to display a progress bar when fitting the sources (or groups). The progress bar requires that the `tqdm <https://tqdm.github.io/>`_ optional dependency be installed. Note that the progress bar does not currently work in the Jupyter console due to limitations in ``tqdm``. Notes ----- The data that will be fit for each source is defined by the ``fit_shape`` parameter. A cutout will be made around the initial center of each source with a shape defined by ``fit_shape``. The PSF model will be fit to the data in this region. The cutout region that is fit does not shift if the source center shifts during the fit iterations. Therefore, the initial source positions should be close to the true source positions. One way to ensure this is to use a ``finder`` to identify sources in the data. If the fitted positions are significantly different from the initial positions, one can rerun the `PSFPhotometry` class using the fit results as the input ``init_params``, which will change the fitted cutout region for each source. After calling `PSFPhotometry` on the data, it will have a ``fit_params`` attribute containing the fitted model parameters. This table can be used as the ``init_params`` input in a subsequent call to `PSFPhotometry`. If the returned model parameter errors are NaN, then either the fit did not converge, the model parameter was fixed, or the input ``fitter`` did not return parameter errors. For the later case, one can try a different fitter that may return parameter errors (e.g., `astropy.modeling.fitting.DogBoxLSQFitter` or `astropy.modeling.fitting.LMLSQFitter`). The local background value around each source is optionally estimated using the ``localbkg_estimator`` or obtained from the ``local_bkg`` column in the input ``init_params`` table. This local background is then subtracted from the data over the ``fit_shape`` region for each source before fitting the PSF model. For sources where their ``fit_shape`` regions overlap, the local background will effectively be subtracted twice in the overlapping ``fit_shape`` regions, even if the source ``grouper`` is input. This is not an issue if the sources are well-separated. However, for crowded fields, please use the ``localbkg_estimator`` (or ``local_bkg`` column in ``init_params``) with care. Care should be taken in defining the star groups. Simultaneously fitting very large star groups is computationally expensive and error-prone. Internally, source grouping requires the creation of a compound Astropy model. Due to the way compound Astropy models are currently constructed, large groups also require excessively large amounts of memory; this will hopefully be fixed in a future Astropy version. A warning will be raised if the number of sources in a group exceeds 25. """ fit_results = deprecated_attribute('fit_results', '2.0.0', alternative='fit_info') def __init__(self, psf_model, fit_shape, *, finder=None, grouper=None, fitter=TRFLSQFitter(), fitter_maxiters=100, xy_bounds=None, localbkg_estimator=None, aperture_radius=None, progress_bar=False): self._param_maps = self._define_param_maps(psf_model) self.psf_model = _validate_psf_model(psf_model) self.fit_shape = as_pair('fit_shape', fit_shape, lower_bound=(1, 0), check_odd=True) self.grouper = self._validate_grouper(grouper) self.finder = self._validate_callable(finder, 'finder') self.fitter = self._validate_callable(fitter, 'fitter') self.localbkg_estimator = self._validate_localbkg( localbkg_estimator, 'localbkg_estimator') self.fitter_maxiters = self._validate_maxiters(fitter_maxiters) self.xy_bounds = self._validate_bounds(xy_bounds) self.aperture_radius = self._validate_radius(aperture_radius) self.progress_bar = progress_bar # be sure to reset these attributes for each __call__ # (see _reset_results) self.data_unit = None self.finder_results = None self.init_params = None self.fit_params = None self._fit_model_params = None self.results = None self.fit_info = defaultdict(list) self._group_results = defaultdict(list) def _reset_results(self): """ Reset these attributes for each __call__. """ self.data_unit = None self.finder_results = None self.init_params = None self.fit_params = None self._fit_model_params = None self.results = None self.fit_info = defaultdict(list) self._group_results = defaultdict(list) def _validate_grouper(self, grouper): if grouper is not None and not isinstance(grouper, SourceGrouper): raise ValueError('grouper must be a SourceGrouper instance.') return grouper @staticmethod def _define_model_params_map(psf_model): # The main parameter names are checked together as a unit in the # following order: # * ('x_0', 'y_0', 'flux') parameters # * ('x_name', 'y_name', 'flux_name') attributes main_params = _get_psf_model_params(psf_model) main_aliases = ('x', 'y', 'flux') params_map = dict(zip(main_aliases, main_params, strict=True)) # define the fitted model parameters fitted_params = [] for key, val in psf_model.fixed.items(): if not val: fitted_params.append(key) # define the "extra" fitted model parameters that do not # correspond to x, y, or flux extra_params = [key for key in fitted_params if key not in main_params] other_params = {key: key for key in extra_params} params_map.update(other_params) return params_map def _define_param_maps(self, psf_model): """ Map x, y, and flux column names to the PSF model parameter names. Also include any extra PSF model parameters that are fit, but do not correspond to x, y, or flux. The column names include the ``_init``, ``_fit``, and ``_err`` suffixes for each parameter. """ params_map = self._define_model_params_map(psf_model) param_maps = {} param_maps['model'] = params_map # Keep track of only the fitted parameters in the same order as # they are stored in the psf_model. This is used to extract the # fitted parameter errors from the fitter output. fit_params = {} inv_pmap = {val: key for key, val in params_map.items()} for name in psf_model.param_names: if not psf_model.fixed[name]: fit_params[inv_pmap[name]] = name param_maps['fit_params'] = fit_params suffixes = ('init', 'fit', 'err') for suffix in suffixes: pmap = {} for key, val in params_map.items(): pmap[val] = f'{key}_{suffix}' param_maps[suffix] = pmap init_cols = {} for key in param_maps['model']: init_cols[key] = f'{key}_init' param_maps['init_cols'] = init_cols return param_maps @staticmethod def _validate_callable(obj, name): if obj is not None and not callable(obj): raise TypeError(f'{name!r} must be a callable object') return obj def _validate_localbkg(self, value, name): if value is not None and not isinstance(value, LocalBackground): raise ValueError('localbkg_estimator must be a ' 'LocalBackground instance.') return self._validate_callable(value, name) def _validate_maxiters(self, maxiters): spec = inspect.signature(self.fitter.__call__) if 'maxiter' not in spec.parameters: warnings.warn('"maxiters" will be ignored because it is not ' 'accepted by the input fitter __call__ method', AstropyUserWarning) maxiters = None return maxiters def _validate_bounds(self, xy_bounds): if xy_bounds is None: return xy_bounds xy_bounds = np.atleast_1d(xy_bounds) if len(xy_bounds) == 1: xy_bounds = np.array((xy_bounds[0], xy_bounds[0])) if len(xy_bounds) != 2: raise ValueError('xy_bounds must have 1 or 2 elements') if xy_bounds.ndim != 1: raise ValueError('xy_bounds must be a 1D array') non_none = [i for i in xy_bounds if i is not None] if np.any(np.array(non_none) <= 0): raise ValueError('xy_bounds must be strictly positive') return xy_bounds @staticmethod def _validate_radius(radius): if radius is not None and (not np.isscalar(radius) or radius <= 0 or ~np.isfinite(radius)): raise ValueError('aperture_radius must be a strictly-positive ' 'scalar') return radius def _validate_array(self, array, name, data_shape=None): if name == 'mask' and array is np.ma.nomask: array = None if array is not None: array = np.asanyarray(array) if array.ndim != 2: raise ValueError(f'{name} must be a 2D array.') if data_shape is not None and array.shape != data_shape: raise ValueError(f'data and {name} must have the same shape.') return array @lazyproperty def _valid_colnames(self): """ A dictionary of valid column names for the input ``init_params`` table. These lists are searched in order. """ xy_suffixes = ('_init', 'init', '', '_0', '0', 'centroid', '_centroid', '_peak', 'cen', '_cen', 'pos', '_pos', '_fit', 'fit') x_valid = ['x' + i for i in xy_suffixes] y_valid = ['y' + i for i in xy_suffixes] valid_colnames = {} valid_colnames['x'] = x_valid valid_colnames['y'] = y_valid valid_colnames['flux'] = ('flux_init', 'fluxinit', 'flux', 'flux_0', 'flux0', 'flux_fit', 'fluxfit', 'source_sum', 'segment_flux', 'kron_flux') return valid_colnames def _find_column_name(self, key, colnames): """ Find the first valid matching column name for x, y, or flux (defined by `_valid_colnames` in the input ``init_params`` table). """ name = '' try: valid_names = self._valid_colnames[key] except KeyError: # parameters other than (x, y, flux) must have "_init", "", # or "_fit" suffixes valid_names = [f'{key}_init', key, f'{key}_fit'] for valid_name in valid_names: if valid_name in colnames: name = valid_name break # return the first match return name def _check_init_units(self, init_params, colname): values = init_params[colname] if isinstance(values, u.Quantity): if self.data_unit is None: raise ValueError(f'init_params {colname} column has ' 'units, but the input data does not ' 'have units.') try: init_params[colname] = values.to(self.data_unit) except u.UnitConversionError as exc: raise ValueError(f'init_params {colname} column has ' 'units that are incompatible with ' 'the input data units.') from exc elif self.data_unit is not None: raise ValueError('The input data has units, but the ' f'init_params {colname} column does not ' 'have units.') return init_params @staticmethod def _rename_init_columns(init_params, param_maps, find_column_name): """ Rename the columns in the input ``init_params`` table to the expected names with the "_init" suffix if necessary. This is a static method to allow the method to be called from `IterativePSFPhotometry`. """ for param in param_maps['model']: colname = find_column_name(param, init_params.colnames) if colname: init_name = param_maps['init_cols'][param] if colname != init_name: init_params.rename_column(colname, init_name) return init_params def _validate_init_params(self, init_params): """ Validate the input ``init_params`` table. Also rename the columns to the expected names with the "_init" suffix if necessary. """ if init_params is None: return init_params if not isinstance(init_params, Table): raise TypeError('init_params must be an astropy Table') # copy is used to preserve the input init_params init_params = self._rename_init_columns(init_params.copy(), self._param_maps, self._find_column_name) # x and y columns are always required xcolname = self._param_maps['init_cols']['x'] ycolname = self._param_maps['init_cols']['y'] if (xcolname not in init_params.colnames or ycolname not in init_params.colnames): raise ValueError('init_param must contain valid column names ' 'for the x and y source positions') fluxcolname = self._param_maps['init_cols']['flux'] if fluxcolname in init_params.colnames: init_params = self._check_init_units(init_params, fluxcolname) if 'local_bkg' in init_params.colnames: if not np.all(np.isfinite(init_params['local_bkg'])): raise ValueError('init_params local_bkg column contains ' 'non-finite values.') init_params = self._check_init_units(init_params, 'local_bkg') return init_params @staticmethod def _make_mask(image, mask): def warn_nonfinite(): warnings.warn('Input data contains unmasked non-finite values ' '(NaN or inf), which were automatically ignored.', AstropyUserWarning) # if NaNs are in the data, no actual fitting takes place # https://github.com/astropy/astropy/pull/12811 finite_mask = ~np.isfinite(image) if mask is not None: finite_mask |= mask if np.any(finite_mask & ~mask): warn_nonfinite() else: mask = finite_mask if np.any(finite_mask): warn_nonfinite() else: mask = None return mask def _get_aper_fluxes(self, data, mask, init_params): xpos = init_params[self._param_maps['init_cols']['x']] ypos = init_params[self._param_maps['init_cols']['y']] apertures = CircularAperture(zip(xpos, ypos, strict=True), r=self.aperture_radius) flux, _ = apertures.do_photometry(data, mask=mask) return flux def _prepare_init_params(self, data, mask, init_params): xcolname = self._param_maps['init_cols']['x'] ycolname = self._param_maps['init_cols']['y'] fluxcolname = self._param_maps['init_cols']['flux'] if init_params is None: if self.finder is None: raise ValueError('finder must be defined if init_params ' 'is not input') # restore units to the input data (stripped earlier) if self.data_unit is not None: sources = self.finder(data << self.data_unit, mask=mask) else: sources = self.finder(data, mask=mask) if sources is None: return None self.finder_results = sources init_params = QTable() init_params['id'] = np.arange(len(sources)) + 1 init_params[xcolname] = sources['xcentroid'] init_params[ycolname] = sources['ycentroid'] else: colnames = init_params.colnames if 'id' not in colnames: init_params['id'] = np.arange(len(init_params)) + 1 if 'local_bkg' not in init_params.colnames: if self.localbkg_estimator is None: local_bkg = np.zeros(len(init_params)) else: local_bkg = self.localbkg_estimator( data, init_params[xcolname], init_params[ycolname], mask=mask) if self.data_unit is not None: local_bkg <<= self.data_unit init_params['local_bkg'] = local_bkg if fluxcolname not in init_params.colnames: flux = self._get_aper_fluxes(data, mask, init_params) if self.data_unit is not None: flux <<= self.data_unit flux -= init_params['local_bkg'] init_params[fluxcolname] = flux if 'group_id' in init_params.colnames: # grouper is ignored if group_id is input in init_params self.grouper = None if self.grouper is not None: group_id = self.grouper(init_params[xcolname], init_params[ycolname]) else: group_id = init_params['id'].copy() init_params['group_id'] = group_id # add columns for any additional parameters that are fit for param_name, colname in self._param_maps['init'].items(): if colname not in init_params.colnames: init_params[colname] = getattr(self.psf_model, param_name) extra_param_cols = [] for colname in self._param_maps['init_cols'].values(): if colname in (xcolname, ycolname, fluxcolname): continue extra_param_cols.append(colname) # order init_params columns colname_order = ['id', 'group_id', 'local_bkg', xcolname, ycolname, fluxcolname] colname_order.extend(extra_param_cols) return init_params[colname_order] def _get_invalid_positions(self, init_params, shape): """ Get a mask of sources with no overlap with the data. This code is based on astropy.nddata.overlap_slices. """ x = init_params[self._param_maps['init_cols']['x']] y = init_params[self._param_maps['init_cols']['y']] positions = np.column_stack((y, x)) delta = self.fit_shape / 2 min_idx = np.ceil(positions - delta) max_idx = np.ceil(positions + delta) return np.any(max_idx <= 0, axis=1) | np.any(min_idx >= shape, axis=1) def _check_init_positions(self, init_params, shape): """ Check the initial source positions to ensure they are within the data shape. """ if np.any(self._get_invalid_positions(init_params, shape)): raise ValueError('Some of the sources have no overlap with the ' 'data. Check the initial source positions or ' 'increase the fit_shape.') def _make_psf_model(self, sources): """ Make a PSF model to fit a single source or several sources within a group. """ init_param_map = self._param_maps['init'] for index, source in enumerate(sources): model = self.psf_model.copy() for model_param, init_col in init_param_map.items(): value = source[init_col] if isinstance(value, u.Quantity): value = value.value # psf model cannot be fit with units setattr(model, model_param, value) model.name = source['id'] if self.xy_bounds is not None: if self.xy_bounds[0] is not None: x_param = getattr(model, self._param_maps['model']['x']) x_param.bounds = (x_param.value - self.xy_bounds[0], x_param.value + self.xy_bounds[0]) if self.xy_bounds[1] is not None: y_param = getattr(model, self._param_maps['model']['y']) y_param.bounds = (y_param.value - self.xy_bounds[1], y_param.value + self.xy_bounds[1]) if index == 0: psf_model = model else: psf_model += model return psf_model @staticmethod def _move_column(table, colname, colname_after): """ Move a column to a new position in a table. The table is modified in place. Parameters ---------- table : `~astropy.table.Table` The input table. colname : str The column name to move. colname_after : str The column name after which to place the moved column. Returns ------- table : `~astropy.table.Table` The input table with the column moved. """ colnames = table.colnames if colname not in colnames or colname_after not in colnames: return table if colname == colname_after: return table old_index = colnames.index(colname) new_index = colnames.index(colname_after) if old_index > new_index: new_index += 1 colnames.insert(new_index, colnames.pop(old_index)) return table[colnames] def _model_params_to_table(self, models): """ Convert a list of PSF models to a table of model parameters. The inputs ``models`` are assumed to be instances of the same model class (i.e., the parameters names are the same for all models). """ param_names = list(models[0].param_names) params = defaultdict(list) for model in models: for name in param_names: param = getattr(model, name) value = param.value if (self.data_unit is not None and name == self._param_maps['model']['flux']): value <<= self.data_unit # add the flux units params[name].append(value) params[f'{name}_fixed'].append(param.fixed) params[f'{name}_bounds'].append(param.bounds) table = QTable(params) ids = np.arange(len(table)) + 1 table.add_column(ids, index=0, name='id') return table def _param_errors_to_table(self): param_err = self.fit_info.pop('fit_param_errs') err_param_map = self._param_maps['err'] table = QTable() for index, name in enumerate(self._param_maps['fit_params'].values()): colname = err_param_map[name] value = param_err[:, index] if (self.data_unit is not None and name == self._param_maps['model']['flux']): value <<= self.data_unit # add the flux units table[colname] = value colnames = list(err_param_map.values()) # add error columns for fixed params; errors are set to NaN nsources = len(self.init_params) for colname in colnames: if colname not in table.colnames: table[colname] = [np.nan] * nsources # sort column names return table[colnames] def _prepare_fit_results(self, fit_params): """ Prepare the output table of fit results. """ # remove parameters that are not fit out_params = fit_params.copy() for column in out_params.colnames: if column == 'id': continue if column not in self._param_maps['fit']: out_params.remove_column(column) # rename columns to have the "fit" suffix for key, val in self._param_maps['fit'].items(): out_params.rename_column(key, val) # reorder columns to have "flux" come immediately after "y" ymodelparam = self._param_maps['model']['y'] fluxmodelparam = self._param_maps['model']['flux'] y_col = self._param_maps['fit'][ymodelparam] flux_col = self._param_maps['fit'][fluxmodelparam] out_params = self._move_column(out_params, flux_col, y_col) # add parameter error columns param_errs = self._param_errors_to_table() return hstack([out_params, param_errs]) def _define_fit_data(self, sources, data, mask): yi = [] xi = [] cutout = [] npixfit = [] cen_index = [] for row in sources: xcen = row[self._param_maps['init_cols']['x']] ycen = row[self._param_maps['init_cols']['y']] try: slc_lg, _ = overlap_slices(data.shape, self.fit_shape, (ycen, xcen), mode='trim') except NoOverlapError as exc: # pragma: no cover # this should never happen because the initial positions # are checked in _prepare_fit_inputs msg = (f'Initial source at ({xcen}, {ycen}) does not ' 'overlap with the input data.') raise ValueError(msg) from exc yy, xx = np.mgrid[slc_lg] if mask is not None: inv_mask = ~mask[yy, xx] if np.count_nonzero(inv_mask) == 0: msg = (f'Source at ({xcen}, {ycen}) is completely masked. ' 'Remove the source from init_params or correct ' 'the input mask.') raise ValueError(msg) yy = yy[inv_mask] xx = xx[inv_mask] else: xx = xx.ravel() yy = yy.ravel() xi.append(xx) yi.append(yy) local_bkg = row['local_bkg'] if isinstance(local_bkg, u.Quantity): local_bkg = local_bkg.value cutout.append(data[yy, xx] - local_bkg) npixfit.append(len(xx)) # this is overlap_slices center pixel index (before any trimming) xcen = np.ceil(xcen - 0.5).astype(int) ycen = np.ceil(ycen - 0.5).astype(int) idx = np.where((xx == xcen) & (yy == ycen))[0] if len(idx) == 0: idx = [np.nan] cen_index.append(idx[0]) # flatten the lists, which may contain arrays of different lengths # due to masking xi = _flatten(xi) yi = _flatten(yi) cutout = _flatten(cutout) self._group_results['npixfit'].append(npixfit) self._group_results['psfcenter_indices'].append(cen_index) return yi, xi, cutout @staticmethod def _split_compound_model(model, chunk_size): for i in range(0, model.n_submodels, chunk_size): yield model[i:i + chunk_size] @staticmethod def _split_param_errs(param_err, nparam): for i in range(0, len(param_err), nparam): yield param_err[i:i + nparam] def _order_by_id(self, iterable): """ Reorder the list from group-id to source-id order. """ return [iterable[i] for i in self._group_results['ungroup_indices']] def _ungroup(self, iterable): """ Expand a list of lists (groups) and reorder in source-id order. """ iterable = _flatten(iterable) return self._order_by_id(iterable) def _get_fit_error_indices(self): indices = [] for index, fit_info in enumerate(self.fit_info['fit_infos']): ierr = fit_info.get('ierr', None) # check if in good flags defined by scipy if ierr is not None: # scipy.optimize.leastsq if ierr not in (1, 2, 3, 4): indices.append(index) else: # scipy.optimize.least_squares status = fit_info.get('status', None) if status is not None and status in (-1, 0): indices.append(index) return np.array(indices, dtype=int) def _parse_fit_results(self, group_models, group_fit_infos): """ Parse the fit results for each source or group of sources. """ psf_nsub = self.psf_model.n_submodels fit_models = [] fit_infos = [] fit_param_errs = [] nfitparam = len(self._param_maps['fit_params'].keys()) for model, fit_info in zip(group_models, group_fit_infos, strict=True): model_nsub = model.n_submodels npsf_models = model_nsub // psf_nsub # NOTE: param_cov/param_err are returned in the same order # as the model parameters param_cov = fit_info.get('param_cov', None) if param_cov is None: if nfitparam == 0: # model params are all fixed nfitparam = 3 # x_err, y_err, and flux_err are np.nan param_err = np.array([np.nan] * nfitparam * npsf_models) else: param_err = np.sqrt(np.diag(param_cov)) # model is for a single source (which may be compound) if npsf_models == 1: fit_models.append(model) fit_infos.append(fit_info) fit_param_errs.append(param_err) continue # model is a grouped model for multiple sources fit_models.extend(self._split_compound_model(model, psf_nsub)) fit_infos.extend([fit_info] * npsf_models) # views fit_param_errs.extend(self._split_param_errs(param_err, nfitparam)) if len(fit_models) != len(fit_infos): # pragma: no cover raise ValueError('fit_models and fit_infos have different lengths') # change the sorting from group_id to source id order fit_models = self._order_by_id(fit_models) fit_infos = self._order_by_id(fit_infos) fit_param_errs = np.array(self._order_by_id(fit_param_errs)) self.fit_info['fit_infos'] = fit_infos self.fit_info['fit_error_indices'] = self._get_fit_error_indices() self.fit_info['fit_param_errs'] = fit_param_errs return fit_models def _fit_sources(self, data, init_params, *, error=None, mask=None): if self.fitter_maxiters is not None: kwargs = {'maxiter': self.fitter_maxiters} else: kwargs = {} sources = init_params.group_by('group_id') ungroup_idx = np.argsort(sources['id'].value) self._group_results['ungroup_indices'] = ungroup_idx sources = sources.groups if self.progress_bar: # pragma: no cover desc = 'Fit source/group' sources = add_progress_bar(sources, desc=desc) # Save the fit_info results for these keys if they are present. # Some of these keys are returned only by some fitters. These # keys contain the fit residuals (fvec or fun), the parameter # covariance matrix (param_cov), and the fit status (ierr, # message) or (status). fit_info_keys = ('fvec', 'fun', 'param_cov', 'ierr', 'message', 'status') fit_models = [] fit_infos = [] nmodels = [] for sources_ in sources: # fit in group_id order nsources = len(sources_) nmodels.append([nsources] * nsources) psf_model = self._make_psf_model(sources_) yi, xi, cutout = self._define_fit_data(sources_, data, mask) weights = 1.0 / error[yi, xi] if error is not None else None with warnings.catch_warnings(): warnings.simplefilter('ignore', AstropyUserWarning) fit_model = self.fitter(psf_model, xi, yi, cutout, weights=weights, **kwargs) with contextlib.suppress(AttributeError): fit_model.clear_cache() fit_info = {} for key in fit_info_keys: value = self.fitter.fit_info.get(key, None) if value is not None: fit_info[key] = value fit_models.append(fit_model) fit_infos.append(fit_info) self._group_results['fit_infos'] = fit_infos self._group_results['nmodels'] = nmodels # split the groups and return objects in source-id order fit_models = self._parse_fit_results(fit_models, fit_infos) _fit_model_params = self._model_params_to_table(fit_models) fit_params = self._prepare_fit_results(_fit_model_params) self._fit_model_params = _fit_model_params # ungrouped self.fit_params = fit_params return fit_params def _calc_fit_metrics(self, results_tbl): # Keep cen_idx as a list because it can have NaNs with the ints. # If NaNs are present, turning it into an array will convert the # ints to floats, which cannot be used as slices. cen_idx = self._ungroup(self._group_results['psfcenter_indices']) split_index = [np.cumsum(npixfit)[:-1] for npixfit in self._group_results['npixfit']] # find the key with the fit residual (fitter dependent) finfo_keys = self._group_results['fit_infos'][0].keys() keys = ('fvec', 'fun') key = None for key_ in keys: if key_ in finfo_keys: key = key_ # SimplexLSQFitter if key is None: qfit = cfit = np.array([[np.nan]] * len(results_tbl)) return qfit, cfit fit_residuals = [] for idx, fit_info in zip(split_index, self._group_results['fit_infos'], strict=True): fit_residuals.extend(np.split(fit_info[key], idx)) fit_residuals = self._order_by_id(fit_residuals) with warnings.catch_warnings(): # ignore divide-by-zero if flux = 0 warnings.simplefilter('ignore', RuntimeWarning) flux_name = self._param_maps['model']['flux'] fluxcolname = self._param_maps['fit'][flux_name] qfit = [] cfit = [] for index, (residual, cen_idx_) in enumerate( zip(fit_residuals, cen_idx, strict=True)): flux_fit = results_tbl[fluxcolname][index] if isinstance(flux_fit, u.Quantity): flux_fit = flux_fit.value qfit.append(np.sum(np.abs(residual)) / flux_fit) if np.isnan(cen_idx_): # masked central pixel cen_residual = np.nan else: # find residual at center pixel; # astropy fitters compute residuals as # (model - data), thus need to negate the residual cen_residual = -residual[cen_idx_] cfit.append(cen_residual / flux_fit) return qfit, cfit def _define_flags(self, results_tbl, shape): flags = np.zeros(len(results_tbl), dtype=int) model_names = self._param_maps['model'] param_map = self._param_maps['fit'] xcolname = param_map[model_names['x']] ycolname = param_map[model_names['y']] fluxcolname = param_map[model_names['flux']] for index, row in enumerate(results_tbl): if row['npixfit'] < np.prod(self.fit_shape): flags[index] += 1 if (row[xcolname] < 0 or row[ycolname] < 0 or row[xcolname] > shape[1] or row[ycolname] > shape[0]): flags[index] += 2 if row[fluxcolname] <= 0: flags[index] += 4 flags[self.fit_info['fit_error_indices']] += 8 try: for index, fit_info in enumerate(self.fit_info['fit_infos']): if fit_info['param_cov'] is None: flags[index] += 16 except KeyError: pass # add flag = 32 if x or y fitted value is at the bounds if self.xy_bounds is not None: xcolname = self._param_maps['model']['x'] ycolname = self._param_maps['model']['y'] for index, row in enumerate(self._fit_model_params): x_bounds = row[f'{xcolname}_bounds'] x_bounds = np.array([i for i in x_bounds if i is not None]) y_bounds = row[f'{ycolname}_bounds'] y_bounds = np.array([i for i in y_bounds if i is not None]) dx = x_bounds - row[xcolname] dy = y_bounds - row[ycolname] if np.any(dx == 0) or np.any(dy == 0): flags[index] += 32 return flags def _prepare_fit_inputs(self, data, *, mask=None, error=None, init_params=None): """ Prepare inputs for PSF fitting. Tasks: * Checks array input shapes and units. * Calculates a total mask * Validates inputs for init_params and aperture_radius * Prepares initial parameters table - Runs source finder if needed - Runs aperture photometry if needed - Runs local background estimation if needed - Groups sources if needed """ (data, error), unit = process_quantities((data, error), ('data', 'error')) self.data_unit = unit data = self._validate_array(data, 'data') error = self._validate_array(error, 'error', data_shape=data.shape) mask = self._validate_array(mask, 'mask', data_shape=data.shape) mask = self._make_mask(data, mask) init_params = self._validate_init_params(init_params) # copies fluxcol = self._param_maps['init_cols']['flux'] if (self.aperture_radius is None and (init_params is None or fluxcol not in init_params.colnames)): raise ValueError('aperture_radius must be defined if init_params ' 'is not input or if a flux column is not in ' 'init_params') init_params = self._prepare_init_params(data, mask, init_params) if init_params is None: # no sources detected by finder return None self._check_init_positions(init_params, data.shape) self.init_params = init_params _, counts = np.unique(init_params['group_id'], return_counts=True) if max(counts) > 25: warnings.warn('Some groups have more than 25 sources. Fitting ' 'such groups may take a long time and be ' 'error-prone. You may want to consider using ' 'different `SourceGrouper` parameters or ' 'changing the "group_id" column in "init_params".', AstropyUserWarning) return data, mask, error, init_params
[docs] def __call__(self, data, *, mask=None, error=None, init_params=None): """ Perform PSF photometry. Parameters ---------- data : 2D `~numpy.ndarray` The 2D array on which to perform photometry. Invalid data values (i.e., NaN or inf) are automatically masked. mask : 2D bool `~numpy.ndarray`, optional A boolean mask with the same shape as ``data``, where a `True` value indicates the corresponding element of ``data`` is masked. error : 2D `~numpy.ndarray`, optional The pixel-wise 1-sigma errors of the input ``data``. ``error`` is assumed to include *all* sources of error, including the Poisson error of the sources (see `~photutils.utils.calc_total_error`) . ``error`` must have the same shape as the input ``data``. If ``data`` is a `~astropy.units.Quantity` array, then ``error`` must also be a `~astropy.units.Quantity` array with the same units. init_params : `~astropy.table.Table` or `None`, optional A table containing the initial guesses of the model parameters (e.g., x, y, flux) for each source. If the x and y values are not input, then the ``finder`` keyword must be defined. If the flux values are not input, then the initial fluxes will be measured using the ``aperture_radius`` keyword, which must be defined. Note that the initial flux values refer to the model flux parameters and are not corrected for local background values (computed using ``localbkg_estimator`` or input in a ``local_bkg`` column) The allowed column names are: * ``x_init``, ``xinit``, ``x``, ``x_0``, ``x0``, ``xcentroid``, ``x_centroid``, ``x_peak``, ``xcen``, ``x_cen``, ``xpos``, ``x_pos``, ``x_fit``, and ``xfit``. * ``y_init``, ``yinit``, ``y``, ``y_0``, ``y0``, ``ycentroid``, ``y_centroid``, ``y_peak``, ``ycen``, ``y_cen``, ``ypos``, ``y_pos``, ``y_fit``, and ``yfit``. * ``flux_init``, ``fluxinit``, ``flux``, ``flux_0``, ``flux0``, ``flux_fit``, ``fluxfit``, ``source_sum``, ``segment_flux``, and ``kron_flux``. * If the PSF model has additional free parameters that are fit, they can be included in the table. The column names must match the parameter names in the PSF model. They can also be suffixed with either the "_init" or "_fit" suffix. The suffix search order is "_init", "" (no suffix), and "_fit". For example, if the PSF model has an additional parameter named "sigma", then the allowed column names are: "sigma_init", "sigma", and "sigma_fit". If the column name is not found in the table, then the default value from the PSF model will be used. The parameter names are searched in the input table in the above order, stopping at the first match. If ``data`` is a `~astropy.units.Quantity` array, then the initial flux values in this table must also must also have compatible units. The table can also have ``group_id`` and ``local_bkg`` columns. If ``group_id`` is input, the values will be used and ``grouper`` keyword will be ignored. If ``local_bkg`` is input, those values will be used and the ``localbkg_estimator`` will be ignored. If ``data`` has units, then the ``local_bkg`` values must have the same units. Returns ------- table : `~astropy.table.QTable` An astropy table with the PSF-fitting results. The table will contain the following columns: * ``id`` : unique identification number for the source * ``group_id`` : unique identification number for the source group * ``group_size`` : the total number of sources that were simultaneously fit along with the given source * ``x_init``, ``x_fit``, ``x_err`` : the initial, fit, and error of the source x center * ``y_init``, ``y_fit``, ``y_err`` : the initial, fit, and error of the source y center * ``flux_init``, ``flux_fit``, ``flux_err`` : the initial, fit, and error of the source flux * ``npixfit`` : the number of unmasked pixels used to fit the source * ``qfit`` : a quality-of-fit metric defined as the absolute value of the sum of the fit residuals divided by the fit flux * ``cfit`` : a quality-of-fit metric defined as the fit residual in the initial central pixel value divided by the fit flux. NaN values indicate that the central pixel was masked. * ``flags`` : bitwise flag values - 1 : one or more pixels in the ``fit_shape`` region were masked - 2 : the fit x and/or y position lies outside of the input data - 4 : the fit flux is less than or equal to zero - 8 : the fitter may not have converged. In this case, you can try increasing the maximum number of fit iterations using the ``fitter_maxiters`` keyword. - 16 : the fitter parameter covariance matrix was not returned - 32 : the fit x or y position is at the bounded value """ if isinstance(data, NDData): data_ = data.data if data.unit is not None: data_ <<= data.unit mask = data.mask unc = data.uncertainty if unc is not None: error = unc.represent_as(StdDevUncertainty).quantity if error.unit is u.dimensionless_unscaled: error = error.value else: error = error.to(data.unit) return self.__call__(data_, mask=mask, error=error, init_params=init_params) # reset results from previous runs self._reset_results() # Prepare fit inputs, including defining the initial source # parameters. This also runs the source finder, aperture # photometry, local background estimator, and source grouper, if # needed. fit_inputs = self._prepare_fit_inputs(data, mask=mask, error=error, init_params=init_params) if fit_inputs is None: return None # fit the sources data, mask, error, init_params = fit_inputs fit_params = self._fit_sources(data, init_params, error=error, mask=mask) # stack initial and fit params to create output table results_tbl = join(init_params, fit_params) npixfit = np.array(self._ungroup(self._group_results['npixfit'])) results_tbl['npixfit'] = npixfit nmodels = np.array(self._ungroup(self._group_results['nmodels'])) index = results_tbl.index_column('group_id') + 1 results_tbl.add_column(nmodels, name='group_size', index=index) qfit, cfit = self._calc_fit_metrics(results_tbl) results_tbl['qfit'] = qfit results_tbl['cfit'] = cfit results_tbl['flags'] = self._define_flags(results_tbl, data.shape) meta = _get_meta() attrs = ('fit_shape', 'fitter_maxiters', 'aperture_radius', 'progress_bar') for attr in attrs: meta[attr] = getattr(self, attr) results_tbl.meta = meta if len(self.fit_info['fit_error_indices']) > 0: warnings.warn('One or more fit(s) may not have converged. Please ' 'check the "flags" column in the output table.', AstropyUserWarning) # convert results from defaultdict to dict self.fit_info = dict(self.fit_info) self.results = results_tbl return results_tbl
[docs] def make_model_image(self, shape, *, psf_shape=None, include_localbkg=False): return ModelImageMixin.make_model_image( self, shape, psf_shape=psf_shape, include_localbkg=include_localbkg)
[docs] def make_residual_image(self, data, *, psf_shape=None, include_localbkg=False): return ModelImageMixin.make_residual_image( self, data, psf_shape=psf_shape, include_localbkg=include_localbkg)
[docs] class IterativePSFPhotometry(ModelImageMixin): """ Class to iteratively perform PSF photometry. This is a convenience class that iteratively calls the `PSFPhotometry` class to perform PSF photometry on an input image. It can be useful for crowded fields where faint stars are very close to bright stars and are not detected in the first pass of PSF photometry. For complex cases, one may need to manually run `PSFPhotometry` in an iterative manner and inspect the residual image after each iteration. Parameters ---------- psf_model : 2D `astropy.modeling.Model` The PSF model to fit to the data. The model must have parameters named ``x_0``, ``y_0``, and ``flux``, corresponding to the center (x, y) position and flux, or it must have 'x_name', 'y_name', and 'flux_name' attributes that map to the x, y, and flux parameters (i.e., a model output from `make_psf_model`). The model must be two-dimensional such that it accepts 2 inputs (e.g., x and y) and provides 1 output. fit_shape : int or length-2 array_like The rectangular shape around the center of a star that will be used to define the PSF-fitting data. If ``fit_shape`` is a scalar then a square shape of size ``fit_shape`` will be used. If ``fit_shape`` has two elements, they must be in ``(ny, nx)`` order. Each element of ``fit_shape`` must be an odd number. In general, ``fit_shape`` should be set to a small size (e.g., ``(5, 5)``) that covers the region with the highest flux signal-to-noise. finder : callable or `~photutils.detection.StarFinderBase` A callable used to identify stars in an image. The ``finder`` must accept a 2D image as input and return a `~astropy.table.Table` containing the x and y centroid positions. These positions are used as the starting points for the PSF fitting. The allowed ``x`` column names are (same suffix for ``y``): ``'x_init'``, ``'xinit'``, ``'x'``, ``'x_0'``, ``'x0'``, ``'xcentroid'``, ``'x_centroid'``, ``'x_peak'``, ``'xcen'``, ``'x_cen'``, ``'xpos'``, ``'x_pos'``, ``'x_fit'``, and ``'xfit'``. If `None`, then the initial (x, y) model positions must be input using the ``init_params`` keyword when calling the class. The (x, y) values in ``init_params`` override this keyword *only for the first iteration*. If this class is run on an image that has units (i.e., a `~astropy.units.Quantity` array), then certain ``finder`` keywords (e.g., ``threshold``) must have the same units. Please see the documentation for the specific ``finder`` class for more information. grouper : `~photutils.psf.SourceGrouper` or callable or `None`, optional A callable used to group stars. Typically, grouped stars are those that overlap with their neighbors. Stars that are grouped are fit simultaneously. The ``grouper`` must accept the x and y coordinates of the sources and return an integer array of the group id numbers (starting from 1) indicating the group in which a given source belongs. If `None`, then no grouping is performed, i.e. each source is fit independently. The ``group_id`` values in ``init_params`` override this keyword *only for the first iteration*. A warning is raised if any group size is larger than 25 sources. fitter : `~astropy.modeling.fitting.Fitter`, optional The fitter object used to perform the fit of the model to the data. fitter_maxiters : int, optional The maximum number of iterations in which the ``fitter`` is called for each source. xy_bounds : `None`, float, or 2-tuple of float, optional The maximum distance in pixels that a fitted source can be from the initial (x, y) position. If a single float, then the same maximum distance is used for both x and y. If a 2-tuple of floats, then the distances are in ``(x, y)`` order. If `None`, then no bounds are applied. Either value can also be `None` to indicate no bound in that direction. maxiters : int, optional The maximum number of PSF-fitting/subtraction iterations to perform. mode : {'new', 'all'}, optional For the 'new' mode, `PSFPhotometry` is run in each iteration only on the new sources detected in the residual image. For the 'all' mode, `PSFPhotometry` is run in each iteration on all the detected sources (from all previous iterations) on the original, unsubtracted, data. For the 'all' mode, a source ``grouper`` must be input. See the Notes section for more details. localbkg_estimator : `~photutils.background.LocalBackground` or `None`, \ optional The object used to estimate the local background around each source. If `None`, then no local background is subtracted. The ``local_bkg`` values in ``init_params`` override this keyword. This option should be used with care, especially in crowded fields where the ``fit_shape`` of sources overlap (see Notes below). aperture_radius : float, optional The radius of the circular aperture used to estimate the initial flux of each source. If initial flux values are present in the ``init_params`` table, they will override this keyword *only for the first iteration*. sub_shape : `None`, int, or length-2 array_like The rectangular shape around the center of a star that will be used when subtracting the fitted PSF models. If ``sub_shape`` is a scalar then a square shape of size ``sub_shape`` will be used. If ``sub_shape`` has two elements, they must be in ``(ny, nx)`` order. Each element of ``sub_shape`` must be an odd number. If `None`, then ``sub_shape`` will be defined by the model bounding box. This keyword must be specified if the model does not have a ``bounding_box`` attribute. progress_bar : bool, optional Whether to display a progress bar when fitting the sources (or groups). The progress bar requires that the `tqdm <https://tqdm.github.io/>`_ optional dependency be installed. Note that the progress bar does not currently work in the Jupyter console due to limitations in ``tqdm``. Notes ----- The data that will be fit for each source is defined by the ``fit_shape`` parameter. A cutout will be made around the initial center of each source with a shape defined by ``fit_shape``. The PSF model will be fit to the data in this region. The cutout region that is fit does not shift if the source center shifts during the fit iterations. Therefore, the initial source positions should be close to the true source positions. One way to ensure this is to use a ``finder`` to identify sources in the data. If the fitted positions are significantly different from the initial positions, one can rerun the `PSFPhotometry` class using the fit results as the input ``init_params``, which will change the fitted cutout region for each source. After calling `PSFPhotometry` on the data, it will have a ``fit_params`` attribute containing the fitted model parameters. This table can be used as the ``init_params`` input in a subsequent call to `PSFPhotometry`. If the returned model parameter errors are NaN, then either the fit did not converge, the model parameter was fixed, or the input ``fitter`` did not return parameter errors. For the later case, one can try a different fitter that may return parameter errors (e.g., `astropy.modeling.fitting.DogBoxLSQFitter` or `astropy.modeling.fitting.LMLSQFitter`). The local background value around each source is optionally estimated using the ``localbkg_estimator`` or obtained from the ``local_bkg`` column in the input ``init_params`` table. This local background is then subtracted from the data over the ``fit_shape`` region for each source before fitting the PSF model. For sources where their ``fit_shape`` regions overlap, the local background will effectively be subtracted twice in the overlapping ``fit_shape`` regions, even if the source ``grouper`` is input. This is not an issue if the sources are well-separated. However, for crowded fields, please use the ``localbkg_estimator`` (or ``local_bkg`` column in ``init_params``) with care. This class has two modes of operation: 'new' and 'all'. For both modes, `PSFPhotometry` is first run on the data, a residual image is created, and the source finder is run on the residual image to detect any new sources. In the 'new' mode, `PSFPhotometry` is then run on the residual image to fit the PSF model to the new sources. The process is repeated until no new sources are detected or a maximum number of iterations is reached. In the 'all' mode, a new source list combining the sources from first `PSFPhotometry` run and the new sources detected in the residual image is created. `PSFPhotometry` is then run on the original, unsubtracted, data with this combined source list. This allows the source ``grouper`` (which is required for the 'all' mode) to combine close sources to be fit simultaneously, improving the fit. Again, the process is repeated until no new sources are detected or a maximum number of iterations is reached. Care should be taken in defining the star groups. Simultaneously fitting very large star groups is computationally expensive and error-prone. Internally, source grouping requires the creation of a compound Astropy model. Due to the way compound Astropy models are currently constructed, large groups also require excessively large amounts of memory; this will hopefully be fixed in a future Astropy version. A warning will be raised if the number of sources in a group exceeds 25. """ def __init__(self, psf_model, fit_shape, finder, *, grouper=None, fitter=TRFLSQFitter(), fitter_maxiters=100, xy_bounds=None, maxiters=3, mode='new', localbkg_estimator=None, aperture_radius=None, sub_shape=None, progress_bar=False): if finder is None: raise ValueError('finder cannot be None for ' 'IterativePSFPhotometry.') if aperture_radius is None: raise ValueError('aperture_radius cannot be None for ' 'IterativePSFPhotometry.') self._psfphot = PSFPhotometry(psf_model, fit_shape, finder=finder, grouper=grouper, fitter=fitter, fitter_maxiters=fitter_maxiters, xy_bounds=xy_bounds, localbkg_estimator=localbkg_estimator, aperture_radius=aperture_radius, progress_bar=progress_bar) self.maxiters = self._validate_maxiters(maxiters) if mode not in ['new', 'all']: raise ValueError('mode must be "new" or "all".') if mode == 'all' and grouper is None: raise ValueError('grouper must be input for the "all" mode.') self.mode = mode self.sub_shape = sub_shape self.fit_results = [] def _reset_results(self): """ Reset these attributes for each __call__. """ self.fit_results = [] @staticmethod def _validate_maxiters(maxiters): if (not np.isscalar(maxiters) or maxiters <= 0 or ~np.isfinite(maxiters)): raise ValueError('maxiters must be a strictly-positive scalar') if maxiters != int(maxiters): raise ValueError('maxiters must be an integer') return maxiters @staticmethod def _emit_warnings(recorded_warnings): """ Emit unique warnings from a list of recorded warnings. Parameters ---------- recorded_warnings : list A list of recorded warnings. """ msgs = [] emit_warnings = [] for warning in recorded_warnings: if str(warning.message) not in msgs: msgs.append(str(warning.message)) emit_warnings.append(warning) for warning in emit_warnings: warnings.warn_explicit(warning.message, warning.category, warning.filename, warning.lineno) def _convert_finder_to_init(self, sources): """ Convert the output of the finder to a table with initial (x, y) position column names. """ xcol = self._psfphot._param_maps['init_cols']['x'] ycol = self._psfphot._param_maps['init_cols']['y'] sources = sources[('xcentroid', 'ycentroid')] sources.rename_column('xcentroid', xcol) sources.rename_column('ycentroid', ycol) return sources def _measure_init_fluxes(self, data, mask, sources): """ Measure initial fluxes for the new sources from the residual data. The fluxes are added in place to the input ``sources`` table. The fluxes are measured using aperture photometry with a circular aperture of radius ``aperture_radius``. Parameters ---------- data : 2D `~numpy.ndarray` The 2D array on which to perform photometry. mask : 2D bool `~numpy.ndarray` A boolean mask with the same shape as ``data``, where a `True` value indicates the corresponding element of ``data`` is masked. sources : `~astropy.table.Table` A table containing the initial (x, y) positions of the sources. Returns ------- sources : `~astropy.table.Table` The input ``sources`` table with the new flux column added. """ flux = self._psfphot._get_aper_fluxes(data, mask, sources) unit = getattr(data, 'unit', None) if unit is not None: flux <<= unit fluxcol = self._psfphot._param_maps['init_cols']['flux'] sources[fluxcol] = flux return sources def _create_init_params(self, data, mask, new_sources, orig_sources): """ Create the initial parameters table by combining the original and new sources. """ # rename the columns from the fit results init_params = self._psfphot._rename_init_columns( orig_sources, self._psfphot._param_maps, self._psfphot._find_column_name) for colname in init_params.colnames: if '_init' not in colname: init_params.remove_column(colname) # add initial fluxes for the new sources from the residual data new_sources = self._measure_init_fluxes(data, mask, new_sources) # add columns for any additional parameters that are fit for param_name, colname in self._psfphot._param_maps['init'].items(): if colname not in new_sources.colnames: new_sources[colname] = getattr(self._psfphot.psf_model, param_name) # combine original and new source tables new_sources.meta.pop('date', None) # prevent merge conflicts return vstack([orig_sources, new_sources])
[docs] def __call__(self, data, *, mask=None, error=None, init_params=None): """ Perform PSF photometry. Parameters ---------- data : 2D `~numpy.ndarray` The 2D array on which to perform photometry. Invalid data values (i.e., NaN or inf) are automatically masked. mask : 2D bool `~numpy.ndarray`, optional A boolean mask with the same shape as ``data``, where a `True` value indicates the corresponding element of ``data`` is masked. error : 2D `~numpy.ndarray`, optional The pixel-wise 1-sigma errors of the input ``data``. ``error`` is assumed to include *all* sources of error, including the Poisson error of the sources (see `~photutils.utils.calc_total_error`) . ``error`` must have the same shape as the input ``data``. If ``data`` is a `~astropy.units.Quantity` array, then ``error`` must also be a `~astropy.units.Quantity` array with the same units. init_params : `~astropy.table.Table` or `None`, optional A table containing the initial guesses of the model parameters (e.g., x, y, flux) for each source *only for the first iteration*. If the x and y values are not input, then the ``finder`` will be used for all iterations. If the flux values are not input, then the initial fluxes will be measured using the ``aperture_radius`` keyword. Note that the initial flux values refer to the model flux parameters and are not corrected for local background values (computed using ``localbkg_estimator`` or input in a ``local_bkg`` column) The allowed column names are: * ``x_init``, ``xinit``, ``x``, ``x_0``, ``x0``, ``xcentroid``, ``x_centroid``, ``x_peak``, ``xcen``, ``x_cen``, ``xpos``, ``x_pos``, ``x_fit``, and ``xfit``. * ``y_init``, ``yinit``, ``y``, ``y_0``, ``y0``, ``ycentroid``, ``y_centroid``, ``y_peak``, ``ycen``, ``y_cen``, ``ypos``, ``y_pos``, ``y_fit``, and ``yfit``. * ``flux_init``, ``fluxinit``, ``flux``, ``flux_0``, ``flux0``, ``flux_fit``, ``fluxfit``, ``source_sum``, ``segment_flux``, and ``kron_flux``. * If the PSF model has additional free parameters that are fit, they can be included in the table. The column names must match the parameter names in the PSF model. They can also be suffixed with either the "_init" or "_fit" suffix. The suffix search order is "_init", "" (no suffix), and "_fit". For example, if the PSF model has an additional parameter named "sigma", then the allowed column names are: "sigma_init", "sigma", and "sigma_fit". If the column name is not found in the table, then the default value from the PSF model will be used. The default values from the PSF model will also be used for all iterations after the first. The parameter names are searched in the input table in the above order, stopping at the first match. If ``data`` is a `~astropy.units.Quantity` array, then the initial flux values in this table must also must also have compatible units. Returns ------- table : `~astropy.table.QTable` An astropy table with the PSF-fitting results. The table will contain the following columns: * ``id`` : unique identification number for the source * ``group_id`` : unique identification number for the source group * ``group_size`` : the total number of sources that were simultaneously fit along with the given source * ``iter_detected`` : the iteration number in which the source was detected * ``x_init``, ``x_fit``, ``x_err`` : the initial, fit, and error of the source x center * ``y_init``, ``y_fit``, ``y_err`` : the initial, fit, and error of the source y center * ``flux_init``, ``flux_fit``, ``flux_err`` : the initial, fit, and error of the source flux * ``npixfit`` : the number of unmasked pixels used to fit the source * ``qfit`` : a quality-of-fit metric defined as the absolute value of the sum of the fit residuals divided by the fit flux * ``cfit`` : a quality-of-fit metric defined as the fit residual in the initial central pixel value divided by the fit flux. NaN values indicate that the central pixel was masked. * ``flags`` : bitwise flag values - 1 : one or more pixels in the ``fit_shape`` region were masked - 2 : the fit x and/or y position lies outside of the input data - 4 : the fit flux is less than or equal to zero - 8 : the fitter may not have converged. In this case, you can try increasing the maximum number of fit iterations using the ``fitter_maxiters`` keyword. - 16 : the fitter parameter covariance matrix was not returned - 32 : the fit x or y position is at the bounded value """ if isinstance(data, NDData): data_ = data.data if data.unit is not None: data_ <<= data.unit mask = data.mask unc = data.uncertainty if unc is not None: error = unc.represent_as(StdDevUncertainty).quantity if error.unit is u.dimensionless_unscaled: error = error.value else: error = error.to(data.unit) return self.__call__(data_, mask=mask, error=error, init_params=init_params) # reset results from previous runs self._reset_results() with warnings.catch_warnings(record=True) as rwarn0: phot_tbl = self._psfphot(data, mask=mask, error=error, init_params=init_params) self.fit_results.append(deepcopy(self._psfphot)) # this needs to be run outside of the context manager to be able # to reemit any warnings if phot_tbl is None: self._emit_warnings(rwarn0) return None residual_data = data with warnings.catch_warnings(record=True) as rwarn1: phot_tbl['iter_detected'] = 1 if self.mode == 'all': iter_detected = np.ones(len(phot_tbl), dtype=int) iter_num = 2 while iter_num <= self.maxiters and phot_tbl is not None: residual_data = self._psfphot.make_residual_image( residual_data, psf_shape=self.sub_shape) # do not warn if no sources are found beyond the first # iteration with warnings.catch_warnings(): warnings.simplefilter('ignore', NoDetectionsWarning) new_sources = self._psfphot.finder(residual_data, mask=mask) if new_sources is None: # no new sources detected break finder_results = new_sources.copy() new_sources = self._convert_finder_to_init(new_sources) if self.mode == 'all': init_params = self._create_init_params( residual_data, mask, new_sources, self._psfphot.fit_params) residual_data = data # keep track of the iteration number in which the source # was detected current_iter = (np.ones(len(new_sources), dtype=int) * iter_num) iter_detected = np.concatenate((iter_detected, current_iter)) elif self.mode == 'new': # fit new sources on the residual data init_params = new_sources # remove any sources that do not overlap the data imask = self._psfphot._get_invalid_positions(init_params, data.shape) init_params = init_params[~imask] if self.mode == 'all': iter_detected = iter_detected[~imask] new_tbl = self._psfphot(residual_data, mask=mask, error=error, init_params=init_params) self._psfphot.finder_results = finder_results self.fit_results.append(deepcopy(self._psfphot)) if self.mode == 'all': new_tbl['iter_detected'] = iter_detected phot_tbl = new_tbl elif self.mode == 'new': # combine tables new_tbl['id'] += np.max(phot_tbl['id']) new_tbl['group_id'] += np.max(phot_tbl['group_id']) new_tbl['iter_detected'] = iter_num new_tbl.meta = {} # prevent merge conflicts on date phot_tbl = vstack([phot_tbl, new_tbl]) iter_num += 1 # move 'iter_detected' column phot_tbl = self._psfphot._move_column(phot_tbl, 'iter_detected', 'group_size') # emit unique warnings recorded_warnings = rwarn0 + rwarn1 self._emit_warnings(recorded_warnings) return phot_tbl
[docs] def make_model_image(self, shape, *, psf_shape=None, include_localbkg=False): return ModelImageMixin.make_model_image( self, shape, psf_shape=psf_shape, include_localbkg=include_localbkg)
[docs] def make_residual_image(self, data, *, psf_shape=None, include_localbkg=False): return ModelImageMixin.make_residual_image( self, data, psf_shape=psf_shape, include_localbkg=include_localbkg)
def _flatten(iterable): """ Flatten a list of lists. """ return list(chain.from_iterable(iterable))