# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
Tools for finding local peaks in an astronomical image.
"""
import warnings
import numpy as np
from astropy.table import QTable
from scipy.ndimage import maximum_filter
from photutils.utils._deprecation import deprecated_renamed_argument
from photutils.utils._misc import _get_meta
from photutils.utils._parameters import as_pair
from photutils.utils._quantity_helpers import process_quantities
from photutils.utils._stats import nanmin
from photutils.utils.exceptions import NoDetectionsWarning
__all__ = ['find_peaks']
def _verify_ring_candidates(data, peak_mask, needs_verify, footprint_bool,
half, footprint_size):
"""
Verify ring candidates against the exact circular footprint.
Ring candidates are pixels that are the local maximum within the
inscribed box but not in the circumscribed box. These need per-pixel
verification against the actual circular footprint.
Parameters
----------
data : 2D `~numpy.ndarray`
The 2D image array.
peak_mask : 2D bool `~numpy.ndarray`
Boolean mask to update in place. `True` indicates a confirmed
local maximum.
needs_verify : 2D bool `~numpy.ndarray`
Boolean mask of candidate pixels that require verification.
footprint_bool : 2D bool `~numpy.ndarray`
The circular footprint boolean mask.
half : int
Half the footprint size (``footprint_size // 2``), used to
center the footprint on each candidate pixel.
footprint_size : int
The size of the circular footprint array along each axis.
"""
y_maybe, x_maybe = needs_verify.nonzero()
if len(y_maybe) == 0:
return
ny, nx = data.shape
for y, x in zip(y_maybe, x_maybe, strict=True):
# Map footprint onto data, clipping to image boundaries
y0 = y - half
y1 = y0 + footprint_size
x0 = x - half
x1 = x0 + footprint_size
dy0, dy1 = max(0, y0), min(ny, y1)
dx0, dx1 = max(0, x0), min(nx, x1)
fy0 = dy0 - y0
fy1 = footprint_size - (y1 - dy1)
fx0 = dx0 - x0
fx1 = footprint_size - (x1 - dx1)
local = data[dy0:dy1, dx0:dx1]
fp_local = footprint_bool[fy0:fy1, fx0:fx1]
local_max = local[fp_local].max()
# Footprint extends beyond image: include cval=0.0
if (fy0 > 0 or fy1 < footprint_size or fx0 > 0
or fx1 < footprint_size):
local_max = max(local_max, 0.0)
# peak_mask is updated in place
if data[y, x] == local_max:
peak_mask[y, x] = True
def _fast_circular_peaks(data, radius):
"""
Find pixels that are local maxima within circular regions.
This is equivalent to::
idx = np.arange(-radius, radius + 1)
xx, yy = np.meshgrid(idx, idx)
footprint = np.array((xx**2 + yy**2) <= radius**2, dtype=int)
data_max = maximum_filter(data, footprint=footprint,
mode='constant', cval=0.0)
peaks = (data == data_max)
but uses fast separable box filters with targeted circular
verification, which is typically ~10-400x faster (depending on the
radius).
Parameters
----------
data : 2D `~numpy.ndarray`
The 2D image array. Must be NaN-free because
`~scipy.ndimage.maximum_filter` propagates NaNs, which would
corrupt the local-maximum comparisons.
radius : float
The radius of the circular region in pixels.
Returns
-------
peak_mask : 2D bool `~numpy.ndarray`
Boolean mask where `True` indicates a local maximum within the
circular region.
"""
# Build the circular footprint
idx = np.arange(-radius, radius + 1)
radius_sq = radius ** 2
footprint_size = len(idx)
xx, yy = np.meshgrid(idx, idx)
footprint_bool = (xx ** 2 + yy ** 2) <= radius_sq
# For even-sized footprints (non-integer radius), scipy's
# maximum_filter places the center at index ``footprint_size // 2``
# (i.e., the origin is biased by +0.5 pixel). The same convention is
# used here so that the fast path is bit-identical to the reference
# maximum_filter(footprint=...) result.
half = footprint_size // 2
# Circumscribed box (size = footprint_size): contains the footprint.
# Any pixel that is the max in this box is definitely the max in the
# circular footprint, since circle <= box.
data_max_box = maximum_filter(data, size=footprint_size, mode='constant',
cval=0.0)
definite = (data == data_max_box)
# Inscribed box: fits inside the circle. For even-sized footprints,
# the circle center is shifted by 0.5 from the pixel center. We
# account for this so the inscribed box stays inside the circle.
if footprint_size % 2 == 0:
half_side = int(np.floor(radius / np.sqrt(2) - 0.5))
else:
half_side = int(np.floor(radius / np.sqrt(2)))
side_insc = max(2 * half_side + 1, 3)
data_max_insc = maximum_filter(data, size=side_insc, mode='constant',
cval=0.0)
# Candidates from inscribed box are a superset of true peaks
candidates = (data == data_max_insc)
# Ring candidates: max in inscribed box but not in circumscribed
# box. These need per-pixel verification against the actual circular
# footprint.
needs_verify = candidates & ~definite
peak_mask = definite.copy()
# peak_mask is updated in place
_verify_ring_candidates(data, peak_mask, needs_verify, footprint_bool,
half, footprint_size)
return peak_mask
[docs]
@deprecated_renamed_argument('npeaks', 'n_peaks', '3.0', until='4.0')
def find_peaks(data, threshold, *, box_size=3, footprint=None, mask=None,
border_width=None, n_peaks=np.inf, min_separation=None,
centroid_func=None, error=None, wcs=None):
"""
Find local peaks in an image that are above a specified threshold
value.
Peaks are the maxima above the ``threshold`` within a local
region. The local regions are defined by either the ``box_size``
or ``footprint`` parameters. ``box_size`` defines the local region
around each pixel as a square box. ``footprint`` is a boolean array
where `True` values specify the region shape.
If multiple pixels within a local region have identical intensities,
then the coordinates of all such pixels are returned. Otherwise,
there will be only one peak pixel per local region. Thus, the
defined region effectively imposes a minimum separation between
peaks unless there are identical peaks within the region.
When ``min_separation`` is set, a fast algorithm is used that
produces results equivalent to using a circular ``footprint`` of the
given radius for `~scipy.ndimage.maximum_filter`, but is typically
~10-400x faster (depending on the radius). When set, ``box_size``
and ``footprint`` are not used for peak detection.
If ``centroid_func`` is input, then it will be used to calculate a
centroid within the defined local region centered on each detected
peak pixel. In this case, the centroid will also be returned in the
output table.
Parameters
----------
data : array_like
The 2D array of the image.
threshold : float, scalar `~astropy.units.Quantity` or array_like
The data value or pixel-wise data values to be used
for the detection threshold. A peak is detected only
if it is strictly greater than the ``threshold``. If
``data`` is a `~astropy.units.Quantity` array, then
``threshold`` must have the same units as ``data``. A 2D
``threshold`` must have the same shape as ``data``. See
`~photutils.segmentation.detect_threshold` for one way to create
a ``threshold`` image.
box_size : scalar or tuple, optional
The size of the local region to search for peaks at every point
in ``data``. If ``box_size`` is a scalar, then the region
shape will be ``(box_size, box_size)``. Either ``box_size`` or
``footprint`` must be defined. If they are both defined, then
``footprint`` overrides ``box_size``.
footprint : `~numpy.ndarray` of bools, optional
A boolean array where `True` values describe the local
footprint region within which to search for peaks at every
point in ``data``. ``box_size=(n, m)`` is equivalent to
``footprint=np.ones((n, m))``. Either ``box_size`` or
``footprint`` must be defined. If they are both defined, then
``footprint`` overrides ``box_size``.
mask : array_like, bool, optional
A boolean mask with the same shape as ``data``, where a `True`
value indicates the corresponding element of ``data`` is masked.
border_width : int, array_like of int, or None, optional
The width in pixels to exclude around the border of the
``data``. If ``border_width`` is a scalar then ``border_width``
will be applied to all sides. If ``border_width`` has two
elements, they must be in ``(ny, nx)`` order. If `None`, then no
border is excluded. The border width values must be non-negative
integers.
n_peaks : int, optional
The maximum number of peaks to return. When the number of
detected peaks exceeds ``n_peaks``, the peaks with the highest
peak intensities will be returned.
min_separation : float or None, optional
The minimum allowed separation (in pixels) between detected
peaks, enforced using a circular region of this radius. Each
detected peak must be the maximum value (or tied for the
maximum) within a circle of this radius. This is equivalent to
using a circular ``footprint`` of the given radius but uses a
fast algorithm that is typically ~10-400x faster (depending on
the radius). When set, ``box_size`` and ``footprint`` are not
used for peak detection. If `None` (default), the peak detection
uses ``box_size`` or ``footprint`` as specified.
centroid_func : callable, optional
A callable object (e.g., function or class) that is used to
calculate the centroid of a 2D array. The ``centroid_func``
must accept a 2D `~numpy.ndarray`, have a ``mask`` keyword, and
optionally an ``error`` keyword. The callable object must return
a tuple of two 1D `~numpy.ndarray` objects, representing the x
and y centroids, respectively.
error : array_like, optional
The 2D array of the 1-sigma errors of the input ``data``.
``error`` is used only if ``centroid_func`` is input (the
``error`` array is passed directly to the ``centroid_func``). If
``data`` is a `~astropy.units.Quantity` array, then ``error``
must have the same units as ``data``.
wcs : `None` or WCS object, optional
A world coordinate system (WCS) transformation that
supports the `astropy shared interface for WCS
<https://docs.astropy.org/en/stable/wcs/wcsapi.html>`_
(e.g., `astropy.wcs.WCS`, `gwcs.wcs.WCS`). If `None`, then
the sky coordinates will not be returned in the output
`~astropy.table.Table`.
Returns
-------
output : `~astropy.table.QTable` or `None`
A table containing the x and y pixel location of the peaks and
their values. If ``centroid_func`` is input, then the table will
also contain the centroid position. If no peaks are found then
`None` is returned.
Notes
-----
By default, the returned pixel coordinates are the integer indices
of the maximum pixel value within the input ``box_size`` or
``footprint`` (i.e., only the peak pixel is identified).
When ``min_separation`` is given, peaks are detected
using a fast algorithm that is mathematically equivalent
to a circular ``footprint`` of the given radius for
`~scipy.ndimage.maximum_filter`. The algorithm uses two fast O(N)
separable box filters (inscribed and circumscribed squares of
the circle) to classify most candidates, then verifies only the
remaining few against the exact circular region.
A centroiding function can be input via the ``centroid_func``
keyword to compute centroid coordinates with subpixel precision
within the input ``box_size`` or ``footprint``. Note that when
``min_separation`` is used, the centroid region size is determined
by ``box_size`` (default 3), not by ``min_separation``.
The peak detection uses ``mode='constant'`` with ``cval=0.0`` for
`~scipy.ndimage.maximum_filter`, which means pixels outside the
image boundary are treated as zero. For images with all-negative
values, this may suppress legitimate peaks near the borders.
Any NaN values in the input ``data`` are replaced with the minimum
finite value before peak detection, and the corresponding pixels are
automatically excluded from the results.
The output column names (``x_peak``, ``y_peak``, ``peak_value``)
differ from the star finder classes (e.g.,
`~photutils.detection.DAOStarFinder`), which use ``x_centroid``,
``y_centroid``, and ``flux``.
"""
arrays, unit = process_quantities((data, threshold, error),
('data', 'threshold', 'error'))
data, threshold, error = arrays
data = np.asanyarray(data)
if centroid_func is not None and not callable(centroid_func):
msg = 'centroid_func must be a callable object'
raise TypeError(msg)
if min_separation is not None and min_separation < 0:
msg = 'min_separation must be >= 0'
raise ValueError(msg)
if np.all(data == data.flat[0]):
msg = 'Input data is constant. No local peaks can be found.'
warnings.warn(msg, NoDetectionsWarning)
return None
if not np.isscalar(threshold):
threshold = np.asanyarray(threshold)
if data.shape != threshold.shape:
msg = ('threshold array must have the same shape as the '
'input data')
raise ValueError(msg)
if border_width is not None:
border_width = as_pair('border_width', border_width,
lower_bound=(0, 1), upper_bound=data.shape)
# Remove NaN values to avoid runtime warnings and exclude NaN pixels
# from peak detection
nan_mask = np.isnan(data)
if np.any(nan_mask):
data = np.copy(data) # ndarray
data[nan_mask] = nanmin(data)
mask = (nan_mask if mask is None
else np.asanyarray(mask) | nan_mask)
# peak_goodmask: good pixels are True
if min_separation is not None and min_separation > 0:
peak_goodmask = _fast_circular_peaks(data, min_separation)
elif footprint is not None:
data_max = maximum_filter(data, footprint=footprint, mode='constant',
cval=0.0)
peak_goodmask = (data == data_max)
else:
data_max = maximum_filter(data, size=box_size, mode='constant',
cval=0.0)
peak_goodmask = (data == data_max)
# Exclude peaks that are masked
if mask is not None:
mask = np.asanyarray(mask, dtype=bool)
if data.shape != mask.shape:
msg = 'data and mask must have the same shape'
raise ValueError(msg)
peak_goodmask = np.logical_and(peak_goodmask, ~mask)
# Exclude peaks that are too close to the border
if border_width is not None:
ny, nx = border_width
if ny > 0:
peak_goodmask[:ny, :] = False
peak_goodmask[-ny:, :] = False
if nx > 0:
peak_goodmask[:, :nx] = False
peak_goodmask[:, -nx:] = False
# Exclude peaks below the threshold
peak_goodmask = np.logical_and(peak_goodmask, (data > threshold))
y_peaks, x_peaks = peak_goodmask.nonzero()
peak_values = data[y_peaks, x_peaks]
if unit is not None:
peak_values <<= unit
n_x_peaks = len(x_peaks)
if n_x_peaks == 0:
msg = 'No local peaks were found.'
warnings.warn(msg, NoDetectionsWarning)
return None
if n_x_peaks > n_peaks:
idx = np.argsort(peak_values)[::-1][:n_peaks]
x_peaks = x_peaks[idx]
y_peaks = y_peaks[idx]
peak_values = peak_values[idx]
# Construct the output table
ids = np.arange(len(x_peaks)) + 1
colnames = ['id', 'x_peak', 'y_peak', 'peak_value']
coldata = [ids, x_peaks, y_peaks, peak_values]
table = QTable(coldata, names=colnames)
table.meta.update(_get_meta()) # keep table.meta type
if wcs is not None:
skycoord_peaks = wcs.pixel_to_world(x_peaks, y_peaks)
idx = table.colnames.index('y_peak') + 1
table.add_column(skycoord_peaks, name='skycoord_peak', index=idx)
# Perform centroiding
if centroid_func is not None:
# Prevent circular import
from photutils.centroids import centroid_sources
# When a footprint is provided, derive the centroid box_size
# from the footprint shape so they are consistent. Ensure odd
# dimensions for centroid_sources.
if footprint is not None:
centroid_box_size = tuple(
s if s % 2 else s + 1 for s in footprint.shape)
else:
centroid_box_size = box_size
x_centroids, y_centroids = centroid_sources(
data, x_peaks, y_peaks, box_size=centroid_box_size,
footprint=footprint, error=error, mask=mask,
centroid_func=centroid_func)
table['x_centroid'] = x_centroids
table['y_centroid'] = y_centroids
if wcs is not None:
skycoord_centroids = wcs.pixel_to_world(x_centroids, y_centroids)
idx = table.colnames.index('y_centroid') + 1
table.add_column(skycoord_centroids, name='skycoord_centroid',
index=idx)
return table