Source code for photutils.psf.model_io

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
This module defines tools for reading and writing PSF models.
"""

import io
import itertools
import os
import warnings

import numpy as np
from astropy.io import fits, registry
from astropy.io.fits.verify import VerifyWarning
from astropy.nddata import NDData, reshape_as_blocks

__all__ = ['GriddedPSFModelRead', 'stdpsf_reader', 'webbpsf_reader']
__doctest_skip__ = ['GriddedPSFModelRead']


[docs] class GriddedPSFModelRead(registry.UnifiedReadWrite): """ Read and parse a FITS file into a `GriddedPSFModel` instance. This class enables the astropy unified I/O layer for `~photutils.psf.GriddedPSFModel`. This allows easily reading a file in different supported data formats using syntax such as:: >>> from photutils.psf import GriddedPSFModel >>> psf_model = GriddedPSFModel.read('filename.fits', format=format) Get help on the available readers for `~photutils.psf.GriddedPSFModel` using the ``help()`` method:: >>> # Get help reading Table and list supported formats >>> GriddedPSFModel.read.help() >>> # Get detailed help on the STSPSF FITS reader >>> GriddedPSFModel.read.help('stdpsf') >>> # Get detailed help on the WebbPSF FITS reader >>> GriddedPSFModel.read.help('webbpsf') >>> # Print list of available formats >>> GriddedPSFModel.read.list_formats() Parameters ---------- instance : object Descriptor calling instance or `None` if no instance. cls : type Descriptor calling class (either owner class or instance class). """ def __init__(self, instance, cls): # uses default global registry super().__init__(instance, cls, 'read', registry=None)
[docs] def __call__(self, *args, **kwargs): """ Read and parse a FITS file into a `GriddedPSFModel` instance using the registered "read" function. Parameters ---------- *args : tuple Positional arguments passed through to data reader. The first argument is typically the input filename. **kwargs : dict, optional Keyword arguments passed through to data reader. This includes the ``format`` keyword argument. Returns ------- out : `~photutils.psf.GriddedPSFModel` A gridded ePSF model corresponding to FITS file contents. """ return self.registry.read(self._cls, *args, **kwargs)
def _read_stdpsf(filename): """ Read a STScI standard-format ePSF (STDPSF) FITS file. Parameters ---------- filename : str The name of the STDPDF FITS file. Returns ------- data : dict A dictionary containing the ePSF data and metadata. """ with warnings.catch_warnings(): warnings.simplefilter('ignore', VerifyWarning) with fits.open(filename, ignore_missing_end=True) as hdulist: header = hdulist[0].header data = hdulist[0].data try: npsfs = header['NAXIS3'] nxpsfs = header['NXPSFS'] nypsfs = header['NYPSFS'] except KeyError as exc: raise ValueError('Invalid STDPDF FITS file.') from exc if 'IPSFX01' in header: xgrid = [header[f'IPSFX{i:02d}'] for i in range(1, nxpsfs + 1)] ygrid = [header[f'JPSFY{i:02d}'] for i in range(1, nypsfs + 1)] elif 'IPSFXA5' in header: xgrid = [] ygrid = [] xkeys = ('IPSFXA5', 'IPSFXB5', 'IPSFXC5', 'IPSFXD5') for xkey in xkeys: xgrid.extend([int(n) for n in header[xkey].split()]) ykeys = ('JPSFYA5', 'JPSFYB5') for ykey in ykeys: ygrid.extend([int(n) for n in header[ykey].split()]) else: raise ValueError('Unknown STDPSF FITS file.') # STDPDF FITS positions are 1-indexed xgrid = np.array(xgrid) - 1 ygrid = np.array(ygrid) - 1 # nypsfs, nxpsfs, detector # 6, 6 WFPC2, 4 det # 1, 1 ACS/HRC # 10, 9 ACS/WFC, 2 det # 3, 3 WFC3/IR # 8, 7 WFC3/UVIS, 2 det # 5, 5 NIRISS # 5, 5 NIRCam SW # 10, 20 NIRCam SW (NRCSW), 8 det # 5, 5 NIRCam LW # 3, 3 MIRI return {'data': data, 'npsfs': npsfs, 'nxpsfs': nxpsfs, 'nypsfs': nypsfs, 'xgrid': xgrid, 'ygrid': ygrid} def _split_detectors(grid_data, detector_data, detector_id): """ Split an ePSF array into individual detectors. Parameters ---------- grid_data : dict A dictionary containing the ePSF data and metadata. detector_data : dict A dictionary containing the detector data. detector_id : int The detector ID. Returns ------- data : `~numpy.ndarray` The ePSF data for the specified detector. xgrid : `~numpy.ndarray` The x-grid for the specified detector. ygrid : `~numpy.ndarray` The y-grid for the specified detector. Notes ----- In particular:: * HST WFPC2 STDPSF file contains 4 detectors * HST ACS/WFC STDPSF file contains 2 detectors * HST WFC3/UVIS STDPSF file contains 2 detectors * JWST NIRCam "NRCSW" STDPSF file contains 8 detectors """ data = grid_data['data'] npsfs = grid_data['npsfs'] nxpsfs = grid_data['nxpsfs'] nypsfs = grid_data['nypsfs'] xgrid = grid_data['xgrid'] ygrid = grid_data['ygrid'] nxdet = detector_data['nxdet'] nydet = detector_data['nydet'] det_map = detector_data['det_map'] det_size = detector_data['det_size'] ii = np.arange(npsfs).reshape((nypsfs, nxpsfs)) nxpsfs //= nxdet nypsfs //= nydet ndet = nxdet * nydet ii = reshape_as_blocks(ii, (nypsfs, nxpsfs)) ii = ii.reshape(ndet, npsfs // ndet) # detector_id -> index det_idx = det_map[detector_id] idx = ii[det_idx] data = data[idx] xp = det_idx % nxdet i0 = xp * nxpsfs i1 = i0 + nxpsfs xgrid = xgrid[i0:i1] - xp * det_size ygrid = ygrid[:nypsfs] if det_idx < nxdet else ygrid[nypsfs:] - det_size return data, xgrid, ygrid def _split_wfc_uvis(grid_data, detector_id): """ Split an ePSF array into individual WFC/UVIS detectors. Parameters ---------- grid_data : dict A dictionary containing the ePSF data and metadata. detector_id : int The detector ID. Returns ------- data : `~numpy.ndarray` The ePSF data for the specified detector. xgrid : `~numpy.ndarray` The x-grid for the specified detector. ygrid : `~numpy.ndarray` The y-grid for the specified detector. """ if detector_id is None: raise ValueError('detector_id must be specified for ACS/WFC and ' 'WFC3/UVIS ePSFs.') if detector_id not in (1, 2): raise ValueError('detector_id must be 1 or 2.') # ACS/WFC1 and WFC3/UVIS1 chip1 (sci, 2) are above chip2 (sci, 1) # in y-pixel coordinates xgrid = grid_data['xgrid'] ygrid = grid_data['ygrid'] ygrid = ygrid.reshape((2, ygrid.shape[0] // 2))[detector_id - 1] if detector_id == 2: ygrid -= 2048 npsfs = grid_data['npsfs'] data = grid_data['data'] data_ny, data_nx = data.shape[1:] data = data.reshape((2, npsfs // 2, data_ny, data_nx))[detector_id - 1] return data, xgrid, ygrid def _split_wfpc2(grid_data, detector_id): """ Split an ePSF array into individual WFPC2 detectors. Parameters ---------- grid_data : dict A dictionary containing the ePSF data and metadata. detector_id : int The detector ID. Returns ------- data : `~numpy.ndarray` The ePSF data for the specified detector. xgrid : `~numpy.ndarray` The x-grid for the specified detector. ygrid : `~numpy.ndarray` The y-grid for the specified detector. """ if detector_id is None: raise ValueError('detector_id must be specified for WFPC2 ePSFs') if detector_id not in range(1, 5): raise ValueError('detector_id must be between 1 and 4, inclusive') nxdet = 2 nydet = 2 det_size = 800 # det (exten:idx) # WF2 (2:2) PC (1:3) # WF3 (3:0) WF4 (4:1) det_map = {1: 3, 2: 2, 3: 0, 4: 1} detector_data = {'nxdet': nxdet, 'nydet': nydet, 'det_size': det_size, 'det_map': det_map} return _split_detectors(grid_data, detector_data, detector_id) def _split_nrcsw(grid_data, detector_id): """ Split an ePSF array into individual NIRCam SW detectors. Parameters ---------- grid_data : dict A dictionary containing the ePSF data and metadata. detector_id : int The detector ID. Returns ------- data : `~numpy.ndarray` The ePSF data for the specified detector. xgrid : `~numpy.ndarray` The x-grid for the specified detector. ygrid : `~numpy.ndarray` The y-grid for the specified detector. """ if detector_id is None: raise ValueError('detector_id must be specified for NRCSW ePSFs') if detector_id not in range(1, 9): raise ValueError('detector_id must be between 1 and 8, inclusive') nxdet = 4 nydet = 2 det_size = 2048 # det (ext:idx) # A2 (2:4) A4 (4:5) B3 (7:6) B1 (5:7) # A1 (1:0) A3 (3:1) B4 (8:2) B2 (6:3) det_map = {1: 0, 3: 1, 8: 2, 6: 3, 2: 4, 4: 5, 7: 6, 5: 7} detector_data = {'nxdet': nxdet, 'nydet': nydet, 'det_size': det_size, 'det_map': det_map} return _split_detectors(grid_data, detector_data, detector_id) def _get_metadata(filename, detector_id): """ Get metadata from the filename and ``detector_id``. Parameters ---------- filename : str The name of the STDPDF FITS file. detector_id : int The detector ID. Returns ------- meta : dict or `None` A dictionary containing the metadata. """ if isinstance(filename, io.FileIO): filename = filename.name parts = os.path.basename(filename).strip('.fits').split('_') if len(parts) not in (3, 4): return None # filename from astropy download_file detector, filter_name = parts[1:3] meta = {'STDPSF': filename, 'detector': detector, 'filter': filter_name} if detector_id is not None: detector_map = {'WFPC2': ['HST/WFPC2', 'WFPC2'], 'ACSHRC': ['HST/ACS', 'HRC'], 'ACSWFC': ['HST/ACS', 'WFC'], 'WFC3UV': ['HST/WFC3', 'UVIS'], 'WFC3IR': ['HST/WFC3', 'IR'], 'NRCSW': ['JWST/NIRCam', 'NRCSW'], 'NRCA1': ['JWST/NIRCam', 'A1'], 'NRCA2': ['JWST/NIRCam', 'A2'], 'NRCA3': ['JWST/NIRCam', 'A3'], 'NRCA4': ['JWST/NIRCam', 'A4'], 'NRCB1': ['JWST/NIRCam', 'B1'], 'NRCB2': ['JWST/NIRCam', 'B2'], 'NRCB3': ['JWST/NIRCam', 'B3'], 'NRCB4': ['JWST/NIRCam', 'B4'], 'NRCAL': ['JWST/NIRCam', 'A5'], 'NRCBL': ['JWST/NIRCam', 'B5'], 'NIRISS': ['JWST/NIRISS', 'NIRISS'], 'MIRI': ['JWST/MIRI', 'MIRIM']} try: inst_det = detector_map[detector] except KeyError as exc: raise ValueError(f'Unknown detector {detector}.') from exc if inst_det[1] == 'WFPC2': wfpc2_map = {1: 'PC', 2: 'WF2', 3: 'WF3', 4: 'WF4'} inst_det[1] = wfpc2_map[detector_id] if inst_det[1] in ('WFC', 'UVIS'): chip = 2 if detector_id == 1 else 1 inst_det[1] = f'{inst_det[1]}{chip}' if inst_det[1] == 'NRCSW': sw_map = {1: 'A1', 2: 'A2', 3: 'A3', 4: 'A4', 5: 'B1', 6: 'B2', 7: 'B3', 8: 'B4'} inst_det[1] = sw_map[detector_id] meta['instrument'] = inst_det[0] meta['detector'] = inst_det[1] return meta
[docs] def stdpsf_reader(filename, detector_id=None): """ Generate a `~photutils.psf.GriddedPSFModel` from a STScI standard- format ePSF (STDPSF) FITS file. .. note:: Instead of being used directly, this function is intended to be used via the `~photutils.psf.GriddedPSFModel` ``read`` method, e.g., ``model = GriddedPSFModel.read(filename, format='stdpsf')``. STDPSF files are FITS files that contain a 3D array of ePSFs with the header detailing where the fiducial ePSFs are located in the detector coordinate frame. The oversampling factor for STDPSF FITS files is assumed to be 4. Parameters ---------- filename : str The name of the STDPDF FITS file. A URL can also be used. detector_id : `None` or int, optional For STDPSF files that contain ePSF grids for multiple detectors, one will need to identify the detector for which to extract the ePSF grid. This keyword is ignored for STDPSF files that do not contain ePSF grids for multiple detectors. For WFPC2, the detector value (int) should be: * 1: PC, 2: WF2, 3: WF3, 4: WF4 For ACS/WFC and WFC3/UVIS, the detector value should be: * 1: WFC2, UVIS2 (sci, 1) * 2: WFC1, UVIS1 (sci, 2) Note that for these two instruments, detector 1 is above detector 2 in the y direction. However, in the FLT FITS files, the (sci, 1) extension corresponds to detector 2 (WFC2, UVIS2) and the (sci, 2) extension corresponds to detector 1 (WFC1, UVIS1). For NIRCam NRCSW files that contain ePSF grids for all 8 SW detectors, the detector value should be: * 1: A1, 2: A2, 3: A3, 4: A4 * 5: B1, 6: B2, 7: B3, 8: B4 Returns ------- model : `~photutils.psf.GriddedPSFModel` The gridded ePSF model. """ from photutils.psf import GriddedPSFModel # prevent circular import grid_data = _read_stdpsf(filename) npsfs = grid_data['npsfs'] if npsfs in (90, 56, 36, 200): if npsfs in (90, 56): # ACS/WFC or WFC3/UVIS data (2 chips) data, xgrid, ygrid = _split_wfc_uvis(grid_data, detector_id) elif npsfs == 36: # WFPC2 data (4 chips) data, xgrid, ygrid = _split_wfpc2(grid_data, detector_id) elif npsfs == 200: # NIRCam SW data (8 chips) data, xgrid, ygrid = _split_nrcsw(grid_data, detector_id) else: raise ValueError('Unknown detector or STDPSF format') else: data = grid_data['data'] xgrid = grid_data['xgrid'] ygrid = grid_data['ygrid'] # itertools.product iterates over the last input first xy_grid = [yx[::-1] for yx in itertools.product(ygrid, xgrid)] oversampling = 4 # assumption for STDPSF files nxpsfs = xgrid.shape[0] nypsfs = ygrid.shape[0] meta = {'grid_xypos': xy_grid, 'oversampling': oversampling, 'nxpsfs': nxpsfs, 'nypsfs': nypsfs} # try to get additional metadata from the filename because this # information is not currently available in the FITS headers file_meta = _get_metadata(filename, detector_id) if file_meta is not None: meta.update(file_meta) return GriddedPSFModel(NDData(data, meta=meta))
[docs] def webbpsf_reader(filename): """ Generate a `~photutils.psf.GriddedPSFModel` from a WebbPSF FITS file containing a PSF grid. .. note:: Instead of being used directly, this function is intended to be used via the `~photutils.psf.GriddedPSFModel` ``read`` method, e.g., ``model = GriddedPSFModel.read(filename, format='webbpsf')``. The WebbPSF FITS file contain a 3D array of ePSFs with the header detailing where the fiducial ePSFs are located in the detector coordinate frame. Parameters ---------- filename : str The name of the WebbPSF FITS file. A URL can also be used. Returns ------- model : `~photutils.psf.GriddedPSFModel` The gridded ePSF model. """ from photutils.psf import GriddedPSFModel # prevent circular import with warnings.catch_warnings(): warnings.simplefilter('ignore', VerifyWarning) with fits.open(filename, ignore_missing_end=True) as hdulist: header = hdulist[0].header data = hdulist[0].data # handle the case of only one 2D PSF data = np.atleast_3d(data) if not any('DET_YX' in key for key in header): raise ValueError('Invalid WebbPSF FITS file; missing "DET_YX{}" ' 'header keys.') if 'OVERSAMP' not in header: raise ValueError('Invalid WebbPSF FITS file; missing "OVERSAMP" ' 'header key.') # convert header to meta dict header = header.copy(strip=True) header.pop('HISTORY', None) header.pop('COMMENT', None) header.pop('', None) meta = dict(header) meta = {key.lower(): meta[key] for key in meta} # user lower-case keys # define grid_xypos from DET_YX{} FITS header keywords xypos = [] for key in meta: if 'det_yx' in key: vals = header[key].lstrip('(').rstrip(')').split(',') xypos.append((float(vals[0]), float(vals[1]))) meta['grid_xypos'] = xypos if 'oversampling' not in meta: meta['oversampling'] = meta['oversamp'] ndd = NDData(data, meta=meta) return GriddedPSFModel(ndd)
def is_stdpsf(origin, filepath, fileobj, *args, **kwargs): """ Determine whether a file is a STDPSF FITS file. Parameters ---------- origin : {'read', 'write'} A string indicating whether the file is to be opened for reading or writing. filepath : str The file path of the FITS file. fileobj : file-like object An open file object to read the file's contents, or `None` if the file could not be opened. *args, **kwargs Any additional positional or keyword arguments for the read or write function. Returns ------- result : bool Returns `True` if the given file is a STDPSF FITS file. """ if filepath is not None: extens = ('.fits', '.fits.gz', '.fit', '.fit.gz', '.fts', '.fts.gz') isfits = filepath.lower().endswith(extens) if isfits: with warnings.catch_warnings(): warnings.simplefilter('ignore', VerifyWarning) header = fits.getheader(filepath) keys = ('NAXIS3', 'NXPSFS', 'NYPSFS') return all(key in header for key in keys) return False def is_webbpsf(origin, filepath, fileobj, *args, **kwargs): """ Determine whether a file is a WebbPSF FITS file. Parameters ---------- origin : {'read', 'write'} A string indicating whether the file is to be opened for reading or writing. filepath : str The file path of the FITS file. fileobj : file-like object An open file object to read the file's contents, or `None` if the file could not be opened. *args, **kwargs Any additional positional or keyword arguments for the read or write function. Returns ------- result : bool Returns `True` if the given file is a WebbPSF FITS file. """ if filepath is not None: extens = ('.fits', '.fits.gz', '.fit', '.fit.gz', '.fts', '.fts.gz') isfits = filepath.lower().endswith(extens) if isfits: with warnings.catch_warnings(): warnings.simplefilter('ignore', VerifyWarning) header = fits.getheader(filepath) keys = ('NAXIS3', 'OVERSAMP', 'DET_YX0') return all(key in header for key in keys) return False