# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
This module provides classes to perform PSF-fitting photometry.
"""
import inspect
import warnings
from collections import defaultdict
from copy import deepcopy
from itertools import chain
import astropy.units as u
import numpy as np
from astropy.modeling import Model
from astropy.modeling.fitting import LevMarLSQFitter
from astropy.nddata import (NDData, NoOverlapError, StdDevUncertainty,
overlap_slices)
from astropy.table import QTable, Table, hstack, vstack
from astropy.utils import lazyproperty
from astropy.utils.exceptions import AstropyUserWarning
from photutils.aperture import CircularAperture
from photutils.background import LocalBackground
from photutils.psf.groupstars import GroupStarsBase
from photutils.utils._misc import _get_meta
from photutils.utils._parameters import as_pair
from photutils.utils._progress_bars import add_progress_bar
from photutils.utils._quantity_helpers import process_quantities
from photutils.utils._round import py2intround
from photutils.utils.exceptions import NoDetectionsWarning
__all__ = ['PSFPhotometry', 'IterativePSFPhotometry']
[docs]
class PSFPhotometry:
"""
Class to perform PSF photometry.
This class implements a flexible PSF photometry algorithm that can
find sources in an image, group overlapping sources, fit the PSF
model to the sources, and subtract the fit PSF models from the
image.
Parameters
----------
psf_model : `astropy.modeling.Fittable2DModel`
The PSF model to fit to the data. The model must have parameters
named ``x_0``, ``y_0``, and ``flux``, corresponding to the
center (x, y) position and flux, or it must have 'x_name',
'y_name', and 'flux_name' attributes that map to the x, y, and
flux parameters (i.e., a model output from `prepare_psf_model`).
fit_shape : int or length-2 array_like
The rectangular shape around the center of a star that will
be used to define the PSF-fitting data. If ``fit_shape`` is a
scalar then a square shape of size ``fit_shape`` will be used.
If ``fit_shape`` has two elements, they must be in ``(ny,
nx)`` order. Each element of ``fit_shape`` must be an odd
number. In general, ``fit_shape`` should be set to a small size
(e.g., ``(5, 5)``) that covers the region with the highest flux
signal-to-noise.
finder : callable or `~photutils.detection.StarFinderBase` or `None`, optional
A callable used to identify stars in an image. The
``finder`` must accept a 2D image as input and return a
`~astropy.table.Table` containing the x and y centroid
positions. These positions are used as the starting points
for the PSF fitting. The allowed ``x`` column names are (same
suffix for ``y``): ``'x_init'``, ``'xinit'``, ``'xcentroid'``,
``'x_centroid'``, ``'x_peak'``, ``'x'``, ``'xcen'``,
``'x_cen'``, ``'xpos'``, ``'x_pos'``, ``'x_0'``, and ``'x0'``.
If `None`, then the initial (x, y) model positions must be input
using the ``init_params`` keyword when calling the class. The
(x, y) values in ``init_params`` override this keyword.
grouper : `~photutils.psf.SourceGrouper` or callable or `None`, optional
A callable used to group stars. Typically, grouped stars are
those that overlap with their neighbors. Stars that are grouped
are fit simultaneously. The ``grouper`` must accept the x and
y coordinates of the sources and return an integer array of
the group id numbers (starting from 1) indicating the group
in which a given source belongs. If `None`, then no grouping
is performed, i.e. each source is fit independently. The
``group_id`` values in ``init_params`` override this keyword
*only for the first iteration*. A warning is raised if any group
size is larger than 25 sources.
fitter : `~astropy.modeling.fitting.Fitter`, optional
The fitter object used to perform the fit of the model to the
data.
fitter_maxiters : int, optional
The maximum number of iterations in which the ``fitter`` is
called for each source.
localbkg_estimator : `~photutils.background.LocalBackground` or `None`, optional
The object used to estimate the local background around each
source. If `None`, then no local background is subtracted. The
``local_bkg`` values in ``init_params`` override this keyword.
aperture_radius : float, optional
The radius of the circular aperture used to estimate the initial
flux of each source. The ``flux_init`` values in ``init_params``
override this keyword.
progress_bar : bool, optional
Whether to display a progress bar when fitting the sources
(or groups). The progress bar requires that the `tqdm
<https://tqdm.github.io/>`_ optional dependency be installed.
Note that the progress bar does not currently work in the
Jupyter console due to limitations in ``tqdm``.
"""
def __init__(self, psf_model, fit_shape, *, finder=None, grouper=None,
fitter=LevMarLSQFitter(), fitter_maxiters=100,
localbkg_estimator=None, aperture_radius=None,
progress_bar=False):
self.psf_model = psf_model
self._validate_psf_model()
self.fit_shape = as_pair('fit_shape', fit_shape, lower_bound=(0, 1),
check_odd=True)
self.grouper = self._validate_grouper(grouper, 'grouper')
self.finder = self._validate_callable(finder, 'finder')
self.fitter = self._validate_callable(fitter, 'fitter')
self.localbkg_estimator = self._validate_localbkg(
localbkg_estimator, 'localbkg_estimator')
self.fitter_maxiters = self._validate_maxiters(fitter_maxiters)
self.aperture_radius = self._validate_radius(aperture_radius)
self.progress_bar = progress_bar
# reset these attributes for each __call__ (see _reset_results)
self.finder_results = None
self.fit_results = defaultdict(list)
self._group_results = defaultdict(list)
self._fit_models = None
def _reset_results(self):
self.finder_results = None
self.fit_results = defaultdict(list)
self._group_results = defaultdict(list)
self._fit_models = None
def _validate_grouper(self, grouper, name):
# remove this check when GroupStarsBase subclasses are removed
if isinstance(grouper, GroupStarsBase):
raise ValueError('Invalid grouper class. Please use '
'SourceGrouper.')
return self._validate_callable(grouper, name)
@lazyproperty
def _psf_param_names(self):
"""
The PSF model parameters corresponding to x, y, and flux.
"""
params1 = ('x_0', 'y_0', 'flux')
params2 = ('x_name', 'y_name', 'flux_name')
params3 = ('xname', 'yname', 'fluxname') # deprecated
if all(name in self.psf_model.param_names for name in params1):
model_params = params1
elif all(hasattr(self.psf_model, name) for name in params2):
model_params = []
for name in params2:
model_params.append(getattr(self.psf_model, name))
model_params = tuple(model_params)
elif all(hasattr(self.psf_model, name) for name in params3):
model_params = []
for name in params3:
model_params.append(getattr(self.psf_model, name))
model_params = tuple(model_params)
else:
msg = 'Invalid PSF model - could not find PSF parameter names.'
raise ValueError(msg)
return model_params
@lazyproperty
def _fitted_psf_param_names(self):
"""
All PSF model parameters that are fit.
"""
fitted_params = []
for key, val in self.psf_model.fixed.items():
if not val:
fitted_params.append(key)
return fitted_params
@lazyproperty
def _extra_psf_param_names(self):
"""
PSF model parameters that are fit, but do not correspond to x,
y, or flux.
"""
extra_params = []
for key in self._fitted_psf_param_names:
if key not in self._psf_param_names:
extra_params.append(key)
return extra_params
def _validate_psf_model(self):
"""
Validate the input PSF model.
The PSF model must have parameters called 'x_0', 'y_0', and
'flux' or it must have 'x_name', 'y_name', and 'flux_name'
attributes (i.e., output from `prepare_psf_model`). Otherwise, a
`ValueError` is raised.
"""
if not isinstance(self.psf_model, Model):
raise TypeError('psf_model must be an Astropy Model subclass.')
_ = self._psf_param_names
@staticmethod
def _validate_callable(obj, name):
if obj is not None and not callable(obj):
raise TypeError(f'{name!r} must be a callable object')
return obj
def _validate_localbkg(self, value, name):
if value is not None and not isinstance(value, LocalBackground):
raise ValueError('localbkg_estimator must be a '
'LocalBackground instance.')
return self._validate_callable(value, name)
def _validate_maxiters(self, maxiters):
spec = inspect.signature(self.fitter.__call__)
if 'maxiter' not in spec.parameters:
warnings.warn('"maxiters" will be ignored because it is not '
'accepted by the input fitter __call__ method',
AstropyUserWarning)
maxiters = None
return maxiters
@staticmethod
def _validate_radius(radius):
if radius is not None and (not np.isscalar(radius)
or radius <= 0 or ~np.isfinite(radius)):
raise ValueError('aperture_radius must be a strictly-positive '
'scalar')
return radius
def _validate_array(self, array, name, data_shape=None):
if name == 'mask' and array is np.ma.nomask:
array = None
if array is not None:
array = np.asanyarray(array)
if array.ndim != 2:
raise ValueError(f'{name} must be a 2D array.')
if data_shape is not None and array.shape != data_shape:
raise ValueError(f'data and {name} must have the same shape.')
return array
@lazyproperty
def _init_colnames(self):
"""
A dictionary of column names for the initial x, y, and flux values
reported in the output table.
"""
init_colnames = {}
init_colnames['x'] = 'x_init'
init_colnames['y'] = 'y_init'
init_colnames['flux'] = 'flux_init'
init_colnames['suffix'] = '_init'
return init_colnames
@lazyproperty
def _valid_colnames(self):
"""
A dictionary of valid column names for the input ``init_params``
table.
These lists are searched in order.
"""
xy_suffixes = ('_init', 'init', 'centroid', '_centroid', '_peak', '',
'cen', '_cen', 'pos', '_pos', '_0', '0')
x_valid = ['x' + i for i in xy_suffixes]
y_valid = ['y' + i for i in xy_suffixes]
valid_colnames = {}
valid_colnames['x'] = x_valid
valid_colnames['y'] = y_valid
valid_colnames['flux'] = ('flux_init', 'flux_0', 'flux0', 'flux',
'source_sum', 'segment_flux', 'kron_flux')
return valid_colnames
def _find_column_name(self, key, colnames):
name = ''
valid_names = self._valid_colnames[key]
for valid_name in valid_names:
if valid_name in colnames:
name = valid_name
return name
def _validate_init_params(self, init_params, flux_unit):
if init_params is None:
return init_params
if not isinstance(init_params, Table):
raise TypeError('init_params must be an astropy Table')
xcolname = self._find_column_name('x', init_params.colnames)
ycolname = self._find_column_name('y', init_params.colnames)
if not xcolname or not ycolname:
raise ValueError('init_param must contain valid column names '
'for the x and y source positions')
init_params = init_params.copy() # preserve input init_params
xinit_name = self._init_colnames['x']
yinit_name = self._init_colnames['y']
if xcolname != xinit_name:
init_params.rename_column(xcolname, xinit_name)
if ycolname != yinit_name:
init_params.rename_column(ycolname, yinit_name)
fluxcolname = self._find_column_name('flux', init_params.colnames)
if fluxcolname:
fluxinit_name = self._init_colnames['flux']
if fluxcolname != fluxinit_name:
init_params.rename_column(fluxcolname, fluxinit_name)
init_flux = init_params[fluxinit_name]
if isinstance(init_flux, u.Quantity):
if flux_unit is None:
raise ValueError('init_params flux column has '
'units, but the input data does not '
'have units.')
try:
init_params[fluxinit_name] = init_flux.to(flux_unit)
except u.UnitConversionError as exc:
raise ValueError('init_params flux column has '
'units that are incompatible with '
'the input data units.') from exc
else:
if flux_unit is not None:
raise ValueError('The input data has units, but the '
'init_params flux column does not have '
'units.')
return init_params
@staticmethod
def _make_mask(image, mask):
def warn_nonfinite():
warnings.warn('Input data contains unmasked non-finite values '
'(NaN or inf), which were automatically ignored.',
AstropyUserWarning)
# if NaNs are in the data, no actual fitting takes place
# https://github.com/astropy/astropy/pull/12811
finite_mask = ~np.isfinite(image)
if mask is not None:
finite_mask |= mask
if np.any(finite_mask & ~mask):
warn_nonfinite()
else:
mask = finite_mask
if np.any(finite_mask):
warn_nonfinite()
else:
mask = None
return mask
def _get_aper_fluxes(self, data, mask, init_params):
xpos = init_params[self._init_colnames['x']]
ypos = init_params[self._init_colnames['y']]
apertures = CircularAperture(zip(xpos, ypos), r=self.aperture_radius)
flux, _ = apertures.do_photometry(data, mask=mask)
return flux
def _prepare_init_params(self, data, unit, mask, init_params):
if init_params is None:
if self.finder is None:
raise ValueError('finder must be defined if init_params '
'is not input')
sources = self.finder(data, mask=mask)
self.finder_results = sources
if sources is None:
return None
init_params = QTable()
init_params['id'] = np.arange(len(sources)) + 1
init_params[self._init_colnames['x']] = sources['xcentroid']
init_params[self._init_colnames['y']] = sources['ycentroid']
else:
colnames = init_params.colnames
if 'id' not in colnames:
init_params['id'] = np.arange(len(init_params)) + 1
if 'group_id' in colnames:
# grouper is ignored if group_id is input in init_params
self.grouper = None
if 'local_bkg' not in init_params.colnames:
if self.localbkg_estimator is None:
local_bkg = np.zeros(len(init_params))
else:
local_bkg = self.localbkg_estimator(
data, init_params[self._init_colnames['x']],
init_params[self._init_colnames['y']], mask=mask)
init_params['local_bkg'] = local_bkg
self.fit_results['local_bkg'] = init_params['local_bkg'].value
if self._init_colnames['flux'] not in init_params.colnames:
flux = self._get_aper_fluxes(data, mask, init_params)
flux -= init_params['local_bkg']
if unit is not None:
flux <<= unit
init_params[self._init_colnames['flux']] = flux
if self.grouper is not None:
init_params['group_id'] = self.grouper(
init_params['x_init'], init_params['y_init'])
# no grouping
if 'group_id' not in init_params.colnames:
init_params['group_id'] = init_params['id']
extra_param_cols = []
init_param_map = self._param_maps['init']
for extra_param in self._extra_psf_param_names:
for key, val in init_param_map.items():
if val == extra_param:
extra_param_cols.append(key)
for extra_col in extra_param_cols:
if extra_col not in init_params.colnames:
init_params[extra_col] = getattr(self.psf_model,
init_param_map[extra_col])
# order init_params columns
colname_order = ['id', 'group_id', 'local_bkg',
self._init_colnames['x'], self._init_colnames['y'],
self._init_colnames['flux']]
colname_order.extend(extra_param_cols)
init_params = init_params[colname_order]
return init_params
@lazyproperty
def _param_maps(self):
"""
Map x, y, and flux column names to the PSF model parameter names.
Also include any extra PSF model parameters that are fit, but do
not correspond to x, y, or flux.
The column names include the ``_init``, ``_fit``, and ``_err``
suffixes for each parameter.
"""
init_param_map = {}
init_param_map[self._init_colnames['x']] = self._psf_param_names[0]
init_param_map[self._init_colnames['y']] = self._psf_param_names[1]
init_param_map[self._init_colnames['flux']] = self._psf_param_names[2]
for extra_param in self._extra_psf_param_names:
init_param_map[f'{extra_param}_init'] = extra_param
init_suffix = self._init_colnames['suffix']
fit_param_map = {val: key.replace(init_suffix, '_fit')
for key, val in init_param_map.items()}
err_param_map = {val: key.replace(init_suffix, '_err')
for key, val in init_param_map.items()}
param_maps = {}
param_maps['init'] = init_param_map
param_maps['fit'] = fit_param_map
param_maps['err'] = err_param_map
return param_maps
def _make_psf_model(self, sources):
"""
Make a PSF model to fit a single source or several sources within
a group.
"""
init_param_map = self._param_maps['init']
for index, source in enumerate(sources):
model = self.psf_model.copy()
for param, model_param in init_param_map.items():
value = source[param]
if isinstance(value, u.Quantity):
value = value.value # psf model cannot be fit with units
setattr(model, model_param, value)
model.name = source['id']
if index == 0:
psf_model = model
else:
psf_model += model
return psf_model
def _define_fit_data(self, sources, data, mask):
yi = []
xi = []
cutout = []
npixfit = []
cen_index = []
for row in sources:
xcen = py2intround(row[self._init_colnames['x']])
ycen = py2intround(row[self._init_colnames['y']])
try:
slc_lg, _ = overlap_slices(data.shape, self.fit_shape,
(ycen, xcen), mode='trim')
except NoOverlapError as exc:
msg = (f'Initial source at ({xcen}, {ycen}) does not '
'overlap with the input data.')
raise ValueError(msg) from exc
yy, xx = np.mgrid[slc_lg]
if mask is not None:
inv_mask = ~mask[yy, xx]
if np.count_nonzero(inv_mask) == 0:
msg = (f'Source at ({xcen}, {ycen}) is completely masked. '
'Remove the source from init_params or correct '
'the input mask.')
raise ValueError(msg)
yy = yy[inv_mask]
xx = xx[inv_mask]
else:
xx = xx.ravel()
yy = yy.ravel()
xi.append(xx)
yi.append(yy)
cutout.append(data[yy, xx] - row['local_bkg'])
npixfit.append(len(xx))
idx = np.where((xx == xcen) & (yy == ycen))[0]
if len(idx) == 0:
idx = [np.nan]
cen_index.append(idx[0])
# flatten the lists, which may contain arrays of different lengths
# due to masking
xi = _flatten(xi)
yi = _flatten(yi)
cutout = _flatten(cutout)
self._group_results['npixfit'].append(npixfit)
self._group_results['psfcenter_indices'].append(cen_index)
return yi, xi, cutout
@staticmethod
def _split_compound_model(model, chunk_size):
for i in range(0, model.n_submodels, chunk_size):
yield model[i:i + chunk_size]
@staticmethod
def _split_param_errs(param_err, nparam):
for i in range(0, len(param_err), nparam):
yield param_err[i:i + nparam]
def _order_by_id(self, iterable):
"""
Reorder the list from group-id to source-id order.
"""
return [iterable[i] for i in self._group_results['ungroup_indices']]
def _ungroup(self, iterable):
"""
Expand a list of lists (groups) and reorder in source-id order.
"""
iterable = _flatten(iterable)
return self._order_by_id(iterable)
def _get_fit_error_indices(self):
indices = []
for index, fit_info in enumerate(self.fit_results['fit_infos']):
ierr = fit_info.get('ierr', None)
# check if in good flags defined by scipy
if ierr is not None:
# scipy.optimize.leastsq
if ierr not in (1, 2, 3, 4):
indices.append(index)
else:
# scipy.optimize.least_squares
status = fit_info.get('status', None)
if status is not None and status in (-1, 0):
indices.append(index)
return np.array(indices, dtype=int)
def _make_fit_results(self, models, infos):
psf_nsub = self.psf_model.n_submodels
fit_models = []
fit_infos = []
fit_param_errs = []
nfitparam = len(self._fitted_psf_param_names)
for model, info in zip(models, infos):
model_nsub = model.n_submodels
npsf_models = model_nsub // psf_nsub
param_cov = info.get('param_cov', None)
if param_cov is None:
if nfitparam == 0: # model params are all fixed
nfitparam = 3 # x_err, y_err, and flux_err are np.nan
param_err = np.array([np.nan] * nfitparam * npsf_models)
else:
param_err = np.sqrt(np.diag(param_cov))
# model is for a single source (which may be compound)
if npsf_models == 1:
fit_models.append(model)
fit_infos.append(info)
fit_param_errs.append(param_err)
continue
# model is a grouped model for multiple sources
fit_models.extend(self._split_compound_model(model, psf_nsub))
fit_infos.extend([info] * npsf_models) # views
fit_param_errs.extend(self._split_param_errs(param_err, nfitparam))
if len(fit_models) != len(fit_infos):
raise ValueError('fit_models and fit_infos have different lengths')
# change the sorting from group_id to source id order
fit_models = self._order_by_id(fit_models)
fit_infos = self._order_by_id(fit_infos)
fit_param_errs = np.array(self._order_by_id(fit_param_errs))
self._fit_models = fit_models
self.fit_results['fit_infos'] = fit_infos
self.fit_results['fit_param_errs'] = fit_param_errs
self.fit_results['fit_error_indices'] = self._get_fit_error_indices()
return fit_models
def _fit_sources(self, data, init_params, *, error=None, mask=None):
if self.fitter_maxiters is not None:
kwargs = {'maxiter': self.fitter_maxiters}
else:
kwargs = {}
sources = init_params.group_by('group_id')
ungroup_idx = np.argsort(sources['id'].value)
self._group_results['ungroup_indices'] = ungroup_idx
sources = sources.groups
if self.progress_bar:
desc = 'Fit source/group'
sources = add_progress_bar(sources, desc=desc) # pragma: no cover
# Save the fit_info results for these keys if they are present.
# Some of these keys are returned only by some fitters. These
# keys contain the fit residuals (fvec or fun), the parameter
# covariance matrix (param_cov), and the fit status (ierr,
# message) or (status).
fit_info_keys = ('fvec', 'fun', 'param_cov', 'ierr', 'message',
'status')
fit_models = []
fit_infos = []
nmodels = []
for sources_ in sources: # fit in group_id order
nsources = len(sources_)
nmodels.append([nsources] * nsources)
psf_model = self._make_psf_model(sources_)
yi, xi, cutout = self._define_fit_data(sources_, data, mask)
if error is not None:
weights = 1.0 / error[yi, xi]
else:
weights = None
with warnings.catch_warnings():
warnings.simplefilter('ignore', AstropyUserWarning)
try:
fit_model = self.fitter(psf_model, xi, yi, cutout,
weights=weights, **kwargs)
try:
fit_model.clear_cache()
except AttributeError:
pass
except TypeError as exc:
msg = ('The number of data points is less than the '
'number of fit parameters. This is likely due '
'to overmasked data. Please check the input '
'mask.')
raise ValueError(msg) from exc
fit_info = {}
for key in fit_info_keys:
value = self.fitter.fit_info.get(key, None)
if value is not None:
fit_info[key] = value
fit_models.append(fit_model)
fit_infos.append(fit_info)
self._group_results['fit_models'] = fit_models
self._group_results['fit_infos'] = fit_infos
self._group_results['nmodels'] = nmodels
# split the groups and return objects in source-id order
fit_models = self._make_fit_results(fit_models, fit_infos)
return fit_models
def _model_params_to_table(self, models):
fit_param_map = self._param_maps['fit']
params = []
for model in models:
mparams = []
for model_param in fit_param_map.keys():
mparams.append(getattr(model, model_param).value)
params.append(mparams)
vals = np.transpose(params)
colnames = fit_param_map.values()
table = QTable()
for index, colname in enumerate(colnames):
table[colname] = vals[index]
return table
def _param_errors_to_table(self):
err_param_map = self._param_maps['err']
table = QTable()
for index, name in enumerate(self._fitted_psf_param_names):
colname = err_param_map[name]
table[colname] = self.fit_results['fit_param_errs'][:, index]
colnames = list(err_param_map.values())
# add missing error columns
nsources = len(self._fit_models)
for colname in colnames:
if colname not in table.colnames:
table[colname] = [np.nan] * nsources
# sort column names
return table[colnames]
def _calc_fit_metrics(self, data, source_tbl):
# Keep cen_idx as a list because it can have NaNs with the ints.
# If NaNs are present, turning it into an array will convert the
# ints to floats, which cannot be used as slices.
cen_idx = self._ungroup(self._group_results['psfcenter_indices'])
self.fit_results['psfcenter_indices'] = cen_idx
split_index = []
for npixfit in self._group_results['npixfit']:
split_index.append(np.cumsum(npixfit)[:-1])
# find the key with the fit residual (fitter dependent)
finfo_keys = self._group_results['fit_infos'][0].keys()
keys = ('fvec', 'fun')
key = None
for key_ in keys:
if key_ in finfo_keys:
key = key_
# SimplexLSQFitter
if key is None:
qfit = cfit = np.array([[np.nan]] * len(source_tbl))
return qfit, cfit
fit_residuals = []
for idx, fit_info in zip(split_index,
self._group_results['fit_infos']):
fit_residuals.extend(np.split(fit_info[key], idx))
fit_residuals = self._order_by_id(fit_residuals)
self.fit_results['fit_residuals'] = fit_residuals
for npixfit, residuals in zip(self.fit_results['npixfit'],
fit_residuals):
if len(residuals) != npixfit:
raise ValueError('size of residuals does not match npixfit')
if len(fit_residuals) != len(source_tbl):
raise ValueError('fit_residuals does not match the source '
'table length')
with warnings.catch_warnings():
# ignore divide-by-zero if flux = 0
warnings.simplefilter('ignore', RuntimeWarning)
qfit = []
cfit = []
for index, (model, residual, cen_idx_) in enumerate(
zip(self._fit_models, fit_residuals, cen_idx)):
source = source_tbl[index]
xcen = py2intround(source[self._init_colnames['x']])
ycen = py2intround(source[self._init_colnames['y']])
flux_fit = source['flux_fit']
qfit.append(np.sum(np.abs(residual)) / flux_fit)
if np.isnan(cen_idx_):
# calculate residual at central pixel if the central pixel
# is within the bounds of the ``data``, otherwise mask it:
cen_in_data = (
0 <= ycen <= data.shape[0] - 1
and 0 <= xcen <= data.shape[1] - 1
)
if cen_in_data:
cen_residual = data[ycen, xcen] - model(xcen, ycen)
else:
cen_residual = np.nan
else:
# find residual at (xcen, ycen)
cen_residual = -residual[cen_idx_]
cfit.append(cen_residual / flux_fit)
return qfit, cfit
def _define_flags(self, source_tbl, shape):
flags = np.zeros(len(source_tbl), dtype=int)
for index, row in enumerate(source_tbl):
if row['npixfit'] < np.prod(self.fit_shape):
flags[index] += 1
if (row['x_fit'] < 0 or row['y_fit'] < 0
or row['x_fit'] > shape[1] or row['y_fit'] > shape[0]):
flags[index] += 2
if row['flux_fit'] <= 0:
flags[index] += 4
flags[self.fit_results['fit_error_indices']] += 8
try:
for index, fit_info in enumerate(self.fit_results['fit_infos']):
if fit_info['param_cov'] is None:
flags[index] += 16
except KeyError:
pass
return flags
def _prepare_fit_inputs(self, data, *, mask=None, error=None,
init_params=None):
"""
Prepare inputs for PSF fitting.
Tasks:
* Checks array input shapes and units.
* Calculates a total mask
* Validates inputs for init_params and aperture_radius
* Prepares initial parameters table
- Runs source finder if needed
- Runs aperture photometry if needed
- Runs local background estimation if needed
- Groups sources if needed
"""
(data, error), unit = process_quantities((data, error),
('data', 'error'))
data = self._validate_array(data, 'data')
mask = self._validate_array(mask, 'mask', data_shape=data.shape)
mask = self._make_mask(data, mask)
init_params = self._validate_init_params(init_params, unit) # copies
if (self.aperture_radius is None
and (init_params is None
or self._init_colnames['flux'] not in init_params.colnames)):
raise ValueError('aperture_radius must be defined if init_params '
'is not input or if a flux column is not in '
'init_params')
init_params = self._prepare_init_params(data, unit, mask, init_params)
self.fit_results['init_params'] = init_params
if init_params is None: # no sources detected
# TODO: raise warning
return None
_, counts = np.unique(init_params['group_id'], return_counts=True)
if max(counts) > 25:
warnings.warn('Some groups have more than 25 sources. Fitting '
'such groups may take a long time and be '
'error-prone. You may want to consider using '
'different `SourceGrouper` parameters or '
'changing the "group_id" column in "init_params".',
AstropyUserWarning)
return data, mask, error, init_params, unit
[docs]
def __call__(self, data, *, mask=None, error=None, init_params=None):
"""
Perform PSF photometry.
Parameters
----------
data : 2D `~numpy.ndarray`
The 2D array on which to perform photometry. Invalid data
values (i.e., NaN or inf) are automatically masked.
mask : 2D bool `~numpy.ndarray`, optional
A boolean mask with the same shape as ``data``, where a
`True` value indicates the corresponding element of ``data``
is masked.
error : 2D `~numpy.ndarray`, optional
The pixel-wise 1-sigma errors of the input ``data``.
``error`` is assumed to include *all* sources of
error, including the Poisson error of the sources
(see `~photutils.utils.calc_total_error`) . ``error``
must have the same shape as the input ``data``. If a
`~astropy.units.Quantity` array, then ``data`` must also be
a `~astropy.units.Quantity` array with the same units.
init_params : `~astropy.table.Table` or `None`, optional
A table containing the initial guesses of the (x, y, flux)
model parameters for each source. If the x and y values are
not input, then the ``finder`` keyword must be defined. If
the flux values are not input, then the ``aperture_radius``
keyword must be defined. Note that the initial flux
values refer to the model flux parameters and are not
corrected for local background values (computed using
``localbkg_estimator`` or input in a ``local_bkg`` column)
The allowed column names are:
* ``x_init``, ``xinit``, ``xcentroid``, ``x_centroid``,
``x_peak``, ``x``, ``xcen``, ``x_cen``, ``xpos``,
``x_pos``, ``x_0``, and ``x0``.
* ``y_init``, ``yinit``, ``ycentroid``, ``y_centroid``,
``y_peak``, ``y``, ``ycen``, ``y_cen``, ``ypos``,
``y_pos``, ``y_0``, and ``y0``.
* ``flux_init``, ``flux``, ``source_sum``,
``segment_flux``, and ``kron_flux``.
The parameter names are searched in the input table in the
above order, stopping at the first match.
The table can also have ``group_id`` and ``local_bkg``
columns. If ``group_id`` is input, the values will be used
and ``grouper`` keyword will be ignored. If ``local_bkg`` is
input, they will be used and the ``localbkg_estimator`` will
be ignored.
Returns
-------
table : `~astropy.table.QTable`
An astropy table with the PSF-fitting results. The table
will contain the following columns:
* ``id`` : unique identification number for the source
* ``group_id`` : unique identification number for the
source group
* ``x_init``, ``x_fit``, ``x_err`` : the initial, fit, and
error of the source x center
* ``y_init``, ``y_fit``, ``y_err`` : the initial, fit, and
error of the source y center
* ``flux_init``, ``flux_fit``, ``flux_err`` : the initial,
fit, and error of the source flux
* ``npixfit`` : the number of unmasked pixels used to fit
the source
* ``group_size`` : the total number of sources that were
simultaneously fit along with the given source
* ``qfit`` : a quality-of-fit metric defined as the
absolute value of the sum of the fit residuals divided by
the fit flux
* ``cfit`` : a quality-of-fit metric defined as the
fit residual in the central pixel divided by the fit flux
* ``flags`` : bitwise flag values:
* 1 : one or more pixels in the ``fit_shape`` region
were masked
* 2 : the fit x and/or y position lies outside of the
input data
* 4 : the fit flux is less than or equal to zero
* 8 : the fitter may not have converged
* 16 : the fitter parameter covariance matrix was not
returned
"""
if isinstance(data, NDData):
data_ = data.data
if data.unit is not None:
data_ <<= data.unit
mask = data.mask
unc = data.uncertainty
if unc is not None:
error = unc.represent_as(StdDevUncertainty).quantity
if error.unit is u.dimensionless_unscaled:
error = error.value
else:
error = error.to(data.unit)
return self.__call__(data_, mask=mask, error=error,
init_params=init_params)
# reset results from previous runs
self._reset_results()
# Prepare fit inputs, including defining the initial source
# parameters. This also runs the source finder, aperture
# photometry, local background estimator, and source grouper, if
# needed.
fit_inputs = self._prepare_fit_inputs(data, mask=mask, error=error,
init_params=init_params)
if fit_inputs is None:
return None
# fit the sources
data, mask, error, init_params, unit = fit_inputs
fit_models = self._fit_sources(data, init_params, error=error,
mask=mask)
# create output table
fit_sources = self._model_params_to_table(fit_models) # ungrouped
if len(init_params) != len(fit_sources):
raise ValueError('init_params and fit_sources tables have '
'different lengths')
source_tbl = hstack((init_params, fit_sources))
param_errors = self._param_errors_to_table()
if len(param_errors) > 0:
if len(param_errors) != len(source_tbl):
raise ValueError('param errors and fit sources tables have '
'different lengths')
source_tbl = hstack((source_tbl, param_errors))
npixfit = np.array(self._ungroup(self._group_results['npixfit']))
self.fit_results['npixfit'] = npixfit
source_tbl['npixfit'] = npixfit
nmodels = np.array(self._ungroup(self._group_results['nmodels']))
self.fit_results['nmodels'] = nmodels
source_tbl['group_size'] = nmodels
qfit, cfit = self._calc_fit_metrics(data, source_tbl)
source_tbl['qfit'] = qfit
source_tbl['cfit'] = cfit
source_tbl['flags'] = self._define_flags(source_tbl, data.shape)
if unit is not None:
source_tbl['local_bkg'] <<= unit
source_tbl['flux_fit'] <<= unit
source_tbl['flux_err'] <<= unit
meta = _get_meta()
attrs = ('fit_shape', 'fitter_maxiters', 'aperture_radius',
'progress_bar')
for attr in attrs:
meta[attr] = getattr(self, attr)
source_tbl.meta = meta
if len(self.fit_results['fit_error_indices']) > 0:
warnings.warn('One or more fit(s) may not have converged. Please '
'check the "flags" column in the output table.',
AstropyUserWarning)
return source_tbl
[docs]
def make_model_image(self, shape, psf_shape, *, include_localbkg=False):
"""
Create a 2D image from the fit PSF models and optional local
background.
Parameters
----------
shape : 2 tuple of int
The shape of the output array.
psf_shape : 2 tuple of int
The shape of region around the center of the fit model to
render in the output image.
include_localbkg : bool, optional
Whether to include the local background in the rendered
output image. Note that the local background level is
included around each source over the region defined by
``psf_shape``. Thus, regions where the ``psf_shape`` of
sources overlap will have the local background added
multiple times.
Returns
-------
array : 2D `~numpy.ndarray`
The rendered image from the fit PSF models.
"""
fit_models = self._fit_models
data = np.zeros(shape)
xname, yname = self._psf_param_names[0:2]
if self.progress_bar:
desc = 'Model image'
fit_models = add_progress_bar(fit_models, desc=desc) # pragma: no cover
# fit_models must be a list of individual, not grouped, PSF
# models, i.e., there should be one PSF model (which may be
# compound) for each source
for fit_model, local_bkg in zip(fit_models,
self.fit_results['local_bkg']):
x0 = getattr(fit_model, xname).value
y0 = getattr(fit_model, yname).value
try:
slc_lg, _ = overlap_slices(shape, psf_shape, (y0, x0),
mode='trim')
except NoOverlapError:
continue
yy, xx = np.mgrid[slc_lg]
data[slc_lg] += fit_model(xx, yy)
if include_localbkg:
data[slc_lg] += local_bkg
return data
[docs]
def make_residual_image(self, data, psf_shape, *, include_localbkg=False):
"""
Create a 2D residual image from the fit PSF models and local
background.
Parameters
----------
data : 2D `~numpy.ndarray`
The 2D array on which photometry was performed. This should
be the same array input when calling the PSF-photometry
class.
psf_shape : 2 tuple of int
The shape of region around the center of the fit model to
subtract.
include_localbkg : bool, optional
Whether to include the local background in the subtracted
model. Note that the local background level is subtracted
around each source over the region defined by ``psf_shape``.
Thus, regions where the ``psf_shape`` of sources overlap
will have the local background subtracted multiple times.
Returns
-------
array : 2D `~numpy.ndarray`
The residual image of the ``data`` minus the ``local_bkg``
minus the fit PSF models.
"""
if isinstance(data, NDData):
residual = deepcopy(data)
residual.data[:] = self.make_residual_image(
data.data, psf_shape, include_localbkg=include_localbkg)
else:
unit = None
if isinstance(data, u.Quantity):
unit = data.unit
data = data.value
residual = self.make_model_image(data.shape, psf_shape,
include_localbkg=include_localbkg)
np.subtract(data, residual, out=residual)
if unit is not None:
residual <<= unit
return residual
[docs]
class IterativePSFPhotometry:
"""
Class to iteratively perform PSF photometry.
This is a convenience class that iteratively calls the
`PSFPhotometry` class to perform PSF photometry on an input image.
It can be useful for crowded fields where faint stars are very
close to bright stars and are not detected in the first pass of
PSF photometry. For complex cases, one may need to manually run
`PSFPhotometry` in an iterative manner and inspect the residual
image after each iteration.
Parameters
----------
psf_model : `astropy.modeling.Fittable2DModel`
The PSF model to fit to the data. The model needs to have
three parameters named ``x_0``, ``y_0``, and ``flux``,
corresponding to the center (x, y) position and flux.
fit_shape : int or length-2 array_like
The rectangular shape around the center of a star that will
be used to define the PSF-fitting data. If ``fit_shape`` is a
scalar then a square shape of size ``fit_shape`` will be used.
If ``fit_shape`` has two elements, they must be in ``(ny,
nx)`` order. Each element of ``fit_shape`` must be an odd
number. In general, ``fit_shape`` should be set to a small size
(e.g., ``(5, 5)``) that covers the region with the highest flux
signal-to-noise.
finder : callable or `~photutils.detection.StarFinderBase`
A callable used to identify stars in an image. The
``finder`` must accept a 2D image as input and return a
`~astropy.table.Table` containing the x and y centroid
positions. These positions are used as the starting points
for the PSF fitting. The allowed ``x`` column names are (same
suffix for ``y``): ``'x_init'``, ``'xinit'``, ``'xcentroid'``,
``'x_centroid'``, ``'x_peak'``, ``'x'``, ``'xcen'``,
``'x_cen'``, ``'xpos'``, ``'x_pos'``, ``'x_0'``, and ``'x0'``.
If `None`, then the initial (x, y) model positions must be input
using the ``init_params`` keyword when calling the class. The
(x, y) values in ``init_params`` override this keyword *only for
the first iteration*.
grouper : `~photutils.psf.SourceGrouper` or callable or `None`, optional
A callable used to group stars. Typically, grouped stars are
those that overlap with their neighbors. Stars that are grouped
are fit simultaneously. The ``grouper`` must accept the x and
y coordinates of the sources and return an integer array of
the group id numbers (starting from 1) indicating the group
in which a given source belongs. If `None`, then no grouping
is performed, i.e. each source is fit independently. The
``group_id`` values in ``init_params`` override this keyword
*only for the first iteration*. A warning is raised if any group
size is larger than 25 sources.
fitter : `~astropy.modeling.fitting.Fitter`, optional
The fitter object used to perform the fit of the model to the
data.
fitter_maxiters : int, optional
The maximum number of iterations in which the ``fitter`` is
called for each source.
maxiters : int, optional
The maximum number of PSF-fitting/subtraction iterations to
perform.
mode : {'new', 'all'}, optional
For the 'new' mode, `PSFPhotometry` is run in each iteration
only on the new sources detected in the residual image. For the
'all' mode, `PSFPhotometry` is run in each iteration on all the
detected sources (from all previous iterations) on the original,
unsubtracted, data. For the 'all' mode, a source ``grouper``
must be input. See the Notes section for more details.
localbkg_estimator : `~photutils.background.LocalBackground` or `None`, optional
The object used to estimate the local background around each
source. If `None`, then no local background is subtracted. The
``local_bkg`` values in ``init_params`` override this keyword.
aperture_radius : float, optional
The radius of the circular aperture used to estimate the initial
flux of each source. The ``flux_init`` values in ``init_params``
override this keyword *only for the first iteration*.
sub_shape : `None`, int, or length-2 array_like
The rectangular shape around the center of a star that will be
used when subtracting the fitted PSF models. If ``sub_shape`` is
a scalar then a square shape of size ``sub_shape`` will be used.
If ``sub_shape`` has two elements, they must be in ``(ny, nx)``
order. Each element of ``sub_shape`` must be an odd number. If
`None`, then ``sub_shape`` is set to ``fit_shape``.
progress_bar : bool, optional
Whether to display a progress bar when fitting the sources
(or groups). The progress bar requires that the `tqdm
<https://tqdm.github.io/>`_ optional dependency be installed.
Note that the progress bar does not currently work in the
Jupyter console due to limitations in ``tqdm``.
Notes
-----
This class has two modes of operation: 'new' and 'all'. For both
modes, `PSFPhotometry` is first run on the data, a residual image
is created, and the source finder is run on the residual image to
detect any new sources.
In the 'new' mode, `PSFPhotometry` is then run on the residual image
to fit the PSF model to the new sources. The process is repeated
until no new sources are detected or a maximum number of iterations
is reached.
In the 'all' mode, a new source list combining the sources from
first `PSFPhotometry` run and the new sources detected in the
residual image is created. `PSFPhotometry` is then run on the
original, unsubtracted, data with this combined source list. This
allows the source ``grouper`` (which is required for the 'all'
mode) to combine close sources to be fit simultaneously, improving
the fit. Again, the process is repeated until no new sources are
detected or a maximum number of iterations is reached.
"""
def __init__(self, psf_model, fit_shape, finder, *, grouper=None,
fitter=LevMarLSQFitter(), fitter_maxiters=100, maxiters=3,
mode='new', localbkg_estimator=None, aperture_radius=None,
sub_shape=None, progress_bar=False):
if finder is None:
raise ValueError('finder cannot be None for '
'IterativePSFPhotometry.')
if aperture_radius is None:
raise ValueError('aperture_radius cannot be None for '
'IterativePSFPhotometry.')
self.psfphot = PSFPhotometry(psf_model, fit_shape, finder=finder,
grouper=grouper, fitter=fitter,
fitter_maxiters=fitter_maxiters,
localbkg_estimator=localbkg_estimator,
aperture_radius=aperture_radius,
progress_bar=progress_bar)
self.maxiters = self._validate_maxiters(maxiters)
if mode not in ['new', 'all']:
raise ValueError('mode must be "new" or "all".')
if mode == 'all' and grouper is None:
raise ValueError('grouper must be input for the "all" mode.')
self.mode = mode
if sub_shape is None:
sub_shape = fit_shape
self.sub_shape = as_pair('sub_shape', sub_shape, lower_bound=(0, 1),
check_odd=True)
self.fit_results = []
@staticmethod
def _validate_maxiters(maxiters):
if (not np.isscalar(maxiters) or maxiters <= 0
or ~np.isfinite(maxiters)):
raise ValueError('maxiters must be a strictly-positive scalar')
if maxiters != int(maxiters):
raise ValueError('maxiters must be an integer')
return maxiters
[docs]
def __call__(self, data, *, mask=None, error=None, init_params=None):
"""
Perform PSF photometry.
Parameters
----------
data : 2D `~numpy.ndarray`
The 2D array on which to perform photometry. Invalid data
values (i.e., NaN or inf) are automatically masked.
mask : 2D bool `~numpy.ndarray`, optional
A boolean mask with the same shape as ``data``, where a
`True` value indicates the corresponding element of ``data``
is masked.
error : 2D `~numpy.ndarray`, optional
The pixel-wise 1-sigma errors of the input ``data``.
``error`` is assumed to include *all* sources of
error, including the Poisson error of the sources
(see `~photutils.utils.calc_total_error`) . ``error``
must have the same shape as the input ``data``. If a
`~astropy.units.Quantity` array, then ``data`` must also be
a `~astropy.units.Quantity` array with the same units.
init_params : `~astropy.table.Table` or `None`, optional
A table containing the initial guesses of the (x, y, flux)
model parameters for each source *only for the first
iteration*. If the x and y values are not input, then the
``finder`` will be used for all iterations. If the flux
values are not input, then the initial fluxes will be
measured in using the ``aperture_radius`` keyword. The input
flux values will be used for the first iteration only. The
allowed column names are:
* ``x_init``, ``xinit``, ``xcentroid``, ``x_centroid``,
``x_peak``, ``x``, ``xcen``, ``x_cen``, ``xpos``,
``x_pos``, ``x_0``, and ``x0``.
* ``y_init``, ``yinit``, ``ycentroid``, ``y_centroid``,
``y_peak``, ``y``, ``ycen``, ``y_cen``, ``ypos``,
``y_pos``, and ``y_0``, and ``y0``.
* ``flux_init``, ``flux``, ``source_sum``,
``segment_flux``, and ``kron_flux``.
The parameter names are searched in the input table in the
above order, stopping at the first match.
Returns
-------
table : `~astropy.table.QTable`
An astropy table with the PSF-fitting results. The table
will contain the following columns:
* ``id`` : unique identification number for the source
* ``group_id`` : unique identification number for the
source group
* ``iter_detected`` : the iteration number in which the
source was detected
* ``x_init``, ``x_fit``, ``x_err`` : the initial, fit, and
error of the source x center
* ``y_init``, ``y_fit``, ``y_err`` : the initial, fit, and
error of the source y center
* ``flux_init``, ``flux_fit``, ``flux_err`` : the initial,
fit, and error of the source flux
* ``npixfit`` : the number of unmasked pixels used to fit
the source
* ``group_size`` : the total number of sources that were
simultaneously fit along with the given source
* ``qfit`` : a quality-of-fit metric defined as the
absolute value of the sum of the fit residuals divided by
the fit flux
* ``cfit`` : a quality-of-fit metric defined as the
fit residual in the central pixel divided by the fit flux
* ``flags`` : bitwise flag values:
* 1 : one or more pixels in the ``fit_shape`` region
were masked
* 2 : the fit x and/or y position lies outside of the
input data
* 4 : the fit flux is less than or equal to zero
* 8 : the fitter may not have converged
* 16 : the fitter parameter covariance matrix was not
returned
"""
phot_tbl = self.psfphot(data, mask=mask, error=error,
init_params=init_params)
self.fit_results.append(deepcopy(self.psfphot))
if phot_tbl is None:
return None
phot_tbl['iter_detected'] = 1
iter_detected = np.ones(len(phot_tbl), dtype=int)
iter_num = 2
while iter_num <= self.maxiters and phot_tbl is not None:
if iter_num == 2:
residual_data = self.psfphot.make_residual_image(
data, self.sub_shape)
else:
residual_data = self.psfphot.make_residual_image(
residual_data, self.sub_shape)
# do not warn if no sources are found beyond the first iteration
with warnings.catch_warnings():
warnings.simplefilter('ignore', NoDetectionsWarning)
new_sources = self.psfphot.finder(residual_data, mask=mask)
if new_sources is None: # no new sources detected
break
xcol = self.psfphot._init_colnames['x']
ycol = self.psfphot._init_colnames['y']
new_sources = new_sources['xcentroid', 'ycentroid']
new_sources.rename_column('xcentroid', xcol)
new_sources.rename_column('ycentroid', ycol)
iter_det = np.ones(len(new_sources), dtype=int) * iter_num
iter_detected = np.concatenate((iter_detected, iter_det))
if self.mode == 'all':
# measure initial fluxes for the new sources from the
# residual data
flux = self.psfphot._get_aper_fluxes(residual_data, mask,
new_sources)
unit = getattr(data, 'unit', None)
if unit is not None:
flux <<= unit
fluxcol = self.psfphot._init_colnames['flux']
new_sources[fluxcol] = flux
# combine source tables and re-fit on the original data
orig_sources = phot_tbl['x_fit', 'y_fit', 'flux_fit']
orig_sources.rename_column('x_fit', xcol)
orig_sources.rename_column('y_fit', ycol)
orig_sources.rename_column('flux_fit', fluxcol)
init_params = vstack([orig_sources, new_sources])
residual_data = data
elif self.mode == 'new':
# fit new sources on the residual data
init_params = new_sources
new_tbl = self.psfphot(residual_data, mask=mask, error=error,
init_params=init_params)
self.psfphot.finder_results = new_sources
self.fit_results.append(deepcopy(self.psfphot))
if self.mode == 'all':
new_tbl['iter_detected'] = iter_detected
phot_tbl = new_tbl
elif self.mode == 'new':
# combine tables
new_tbl['iter_detected'] = iter_num
new_tbl['id'] += np.max(phot_tbl['id'])
new_tbl['group_id'] += np.max(phot_tbl['group_id'])
new_tbl.meta = {} # prevent merge conflicts
phot_tbl = vstack([phot_tbl, new_tbl])
iter_num += 1
# re-order 'iter_detected' column
if 'iter_detected' in phot_tbl.colnames:
colnames = phot_tbl.colnames.copy()
colnames.insert(2, 'iter_detected')
colnames = colnames[:-1]
phot_tbl = phot_tbl[colnames]
return phot_tbl
[docs]
def make_model_image(self, shape, psf_shape):
"""
Create a 2D image from the fit PSF models and local background.
Parameters
----------
shape : 2 tuple of int
The shape of the output array.
psf_shape : 2 tuple of int
The shape of region around the center of the fit model to
render in the output image.
Returns
-------
array : 2D `~numpy.ndarray`
The rendered image from the fit PSF models.
"""
if self.mode == 'new':
# collect the fit models and local backgrounds from each
# iteration
fit_models = []
local_bkgs = []
for psfphot in self.fit_results:
fit_models.append(psfphot._fit_models)
local_bkgs.append(psfphot.fit_results['local_bkg'])
fit_models = _flatten(fit_models)
local_bkgs = _flatten(local_bkgs)
else:
# use only the fit models and local backgrounds from the
# final iteration, which includes all sources
fit_models = self.fit_results[-1]._fit_models
local_bkgs = self.fit_results[-1].fit_results['local_bkg']
data = np.zeros(shape)
xname, yname = self.fit_results[0]._psf_param_names[0:2]
if self.psfphot.progress_bar:
desc = 'Model image'
fit_models = add_progress_bar(fit_models, desc=desc) # pragma: no cover
# fit_models must be a list of individual, not grouped, PSF
# models, i.e., there should be one PSF model (which may be
# compound) for each source
for fit_model, local_bkg in zip(fit_models, local_bkgs):
x0 = getattr(fit_model, xname).value
y0 = getattr(fit_model, yname).value
try:
slc_lg, _ = overlap_slices(shape, psf_shape, (y0, x0),
mode='trim')
except NoOverlapError:
continue
yy, xx = np.mgrid[slc_lg]
data[slc_lg] += (fit_model(xx, yy) + local_bkg)
return data
[docs]
def make_residual_image(self, data, psf_shape):
"""
Create a 2D residual image from the fit PSF models and local
background.
Parameters
----------
data : 2D `~numpy.ndarray`
The 2D array on which photometry was performed. This should
be the same array input when calling the PSF-photometry
class.
psf_shape : 2 tuple of int
The shape of region around the center of the fit model to
subtract.
Returns
-------
array : 2D `~numpy.ndarray`
The residual image of the ``data`` minus the ``local_bkg``
minus the fit PSF models.
"""
if isinstance(data, NDData):
residual = deepcopy(data)
residual.data[:] = self.make_residual_image(data.data, psf_shape)
else:
unit = None
if isinstance(data, u.Quantity):
unit = data.unit
data = data.value
residual = -self.make_model_image(data.shape, psf_shape)
residual += data
if unit is not None:
residual <<= unit
return residual
def _flatten(iterable):
"""
Flatten a list of lists.
"""
return list(chain.from_iterable(iterable))