From 891704d89bd22f009c085f04b7413cebe9ec6563 Mon Sep 17 00:00:00 2001 From: Lee Kelvin Date: Tue, 19 Mar 2024 12:55:10 -0700 Subject: [PATCH] Rewrite brightStarStamp.py --- .../lsst/meas/algorithms/brightStarStamps.py | 841 ++++++------------ 1 file changed, 248 insertions(+), 593 deletions(-) diff --git a/python/lsst/meas/algorithms/brightStarStamps.py b/python/lsst/meas/algorithms/brightStarStamps.py index b21e01f0d..777effee5 100644 --- a/python/lsst/meas/algorithms/brightStarStamps.py +++ b/python/lsst/meas/algorithms/brightStarStamps.py @@ -19,455 +19,163 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -"""Collection of small images (stamps), each centered on a bright star.""" +"""Collection of small images (postage stamps).""" + +from __future__ import annotations __all__ = ["BrightStarStamp", "BrightStarStamps"] -import logging -from collections.abc import Collection +from collections.abc import Mapping, Sequence from dataclasses import dataclass -from functools import reduce -from operator import ior import numpy as np -from lsst.afw.geom import SpanSet, Stencil -from lsst.afw.image import MaskedImageF -from lsst.afw.math import Property, StatisticsControl, makeStatistics, stringToStatisticsProperty -from lsst.afw.table.io import Persistable -from lsst.geom import Point2I - -from .stamps import AbstractStamp, Stamps, readFitsWithOptions -logger = logging.getLogger(__name__) +from lsst.afw.detection import Psf +from lsst.afw.fits import Fits, readMetadata +from lsst.afw.geom import SkyWcs +from lsst.afw.image import ImageFitsReader, MaskedImageF, MaskFitsReader +from lsst.afw.table.io import InputArchive, OutputArchive, Persistable +from lsst.daf.base import PropertyList +from lsst.geom import Point2D +from lsst.utils.introspection import get_full_type_name @dataclass -class BrightStarStamp(AbstractStamp): - """Single stamp centered on a bright star, normalized by its annularFlux. +class BrightStarStamp: + """Single stamp centered on a bright star.""" + + maskedImage: MaskedImageF + psf: Psf + wcs: SkyWcs + visit: int + detector: int + refId: int + refMag: float + position: Point2D + scale: float | None + scaleErr: float | None + pedestal: float | None + pedestalErr: float | None + pedestalScaleCov: float | None + xGradient: float | None + yGradient: float | None + globalReducedChiSquared: float | None + globalDegreesOfFreedom: int | None + psfReducedChiSquared: float | None + psfDegreesOfFreedom: int | None + psfMaskedFluxFrac: float | None - Parameters - ---------- - stamp_im : `~lsst.afw.image.MaskedImage` - Pixel data for this postage stamp - gaiaGMag : `float` - Gaia G magnitude for the object in this stamp - gaiaId : `int` - Gaia object identifier - position : `~lsst.geom.Point2I` - Origin of the stamps in its origin exposure (pixels) - archive_element : `~lsst.afw.table.io.Persistable` or None, optional - Archive element (e.g. Transform or WCS) associated with this stamp. - annularFlux : `float` or None, optional - Flux in an annulus around the object - """ + @classmethod + def _getMaskedImageClass(cls) -> type[MaskedImageF]: + return MaskedImageF - stamp_im: MaskedImageF - gaiaGMag: float - gaiaId: int - position: Point2I - archive_element: Persistable | None = None - annularFlux: float | None = None - minValidAnnulusFraction: float = 0.0 - validAnnulusFraction: float | None = None - optimalInnerRadius: int | None = None - optimalOuterRadius: int | None = None + @classmethod + def _getArchiveElementNames(cls) -> list[str]: + return ["PSF", "WCS"] @classmethod - def factory(cls, stamp_im, metadata, idx, archive_element=None, minValidAnnulusFraction=0.0): - """This method is needed to service the FITS reader. We need a standard - interface to construct objects like this. Parameters needed to - construct this object are passed in via a metadata dictionary and then - passed to the constructor of this class. This particular factory - method requires keys: G_MAGS, GAIA_IDS, and ANNULAR_FLUXES. They should - each point to lists of values. + def factory( + cls, + maskedImage: MaskedImageF, + metadata: PropertyList, + archive_elements: Mapping[str, Persistable], + ): + """This method is needed to service the FITS reader. + We need a standard interface to construct objects like this. + Parameters needed to construct this object are passed in via a metadata + dictionary and then passed to the constructor of this class. Parameters ---------- - stamp_im : `~lsst.afw.image.MaskedImage` + maskedImage : `~lsst.afw.image.MaskedImageF` Pixel data to pass to the constructor - metadata : `dict` - Dictionary containing the information - needed by the constructor. - idx : `int` - Index into the lists in ``metadata`` - archive_element : `~lsst.afw.table.io.Persistable` or None, optional - Archive element (e.g. Transform or WCS) associated with this stamp. - minValidAnnulusFraction : `float`, optional - The fraction of valid pixels within the normalization annulus of a - star. + metadata : `PropertyList` + Dictionary containing the information needed by the constructor. + archiveElements : `~collections.abc.Mapping`[ `str` , \ + `~lsst.afw.table.io.Persistable`] + Archive elements (e.g. Transform / WCS) associated with this stamp. Returns ------- - brightstarstamp : `BrightStarStamp` + stamp : `StampBase` An instance of this class """ - if "X0S" in metadata and "Y0S" in metadata: - x0 = metadata.getArray("X0S")[idx] - y0 = metadata.getArray("Y0S")[idx] - position = Point2I(x0, y0) - else: - position = None + assert archive_elements is not None return cls( - stamp_im=stamp_im, - gaiaGMag=metadata.getArray("G_MAGS")[idx], - gaiaId=metadata.getArray("GAIA_IDS")[idx], - position=position, - archive_element=archive_element, - annularFlux=metadata.getArray("ANNULAR_FLUXES")[idx], - minValidAnnulusFraction=minValidAnnulusFraction, - validAnnulusFraction=metadata.getArray("VALID_PIXELS_FRACTION")[idx], + maskedImage=maskedImage, + psf=archive_elements["PSF"], + wcs=archive_elements["WCS"], + visit=metadata["VISIT"], + detector=metadata["DETECTOR"], + refId=metadata["REFID"], + refMag=metadata["REFMAG"], + position=Point2D(metadata["X_FA"], metadata["Y_FA"]), + scale=metadata["SCALE"], + scaleErr=metadata["SCALE_ERR"], + pedestal=metadata["PEDESTAL"], + pedestalErr=metadata["PEDESTAL_ERR"], + pedestalScaleCov=metadata["PEDESTAL_SCALE_COV"], + xGradient=metadata["X_GRADIENT"], + yGradient=metadata["Y_GRADIENT"], + globalReducedChiSquared=metadata["GLOBAL_REDUCED_CHI_SQUARED"], + globalDegreesOfFreedom=metadata["GLOBAL_DEGREES_OF_FREEDOM"], + psfReducedChiSquared=metadata["PSF_REDUCED_CHI_SQUARED"], + psfDegreesOfFreedom=metadata["PSF_DEGREES_OF_FREEDOM"], + psfMaskedFluxFrac=metadata["PSF_MASKED_FLUX_FRAC"], ) - def measureAndNormalize( - self, - annulus: SpanSet, - statsControl: StatisticsControl = StatisticsControl(), - statsFlag: Property = stringToStatisticsProperty("MEAN"), - badMaskPlanes: Collection[str] = ("BAD", "SAT", "NO_DATA"), - ): - """Compute "annularFlux", the integrated flux within an annulus - around an object's center, and normalize it. - - Since the center of bright stars are saturated and/or heavily affected - by ghosts, we measure their flux in an annulus with a large enough - inner radius to avoid the most severe ghosts and contain enough - non-saturated pixels. - - Parameters - ---------- - annulus : `~lsst.afw.geom.spanSet.SpanSet` - SpanSet containing the annulus to use for normalization. - statsControl : `~lsst.afw.math.statistics.StatisticsControl`, optional - StatisticsControl to be used when computing flux over all pixels - within the annulus. - statsFlag : `~lsst.afw.math.statistics.Property`, optional - statsFlag to be passed on to ``afwMath.makeStatistics`` to compute - annularFlux. Defaults to a simple MEAN. - badMaskPlanes : `collections.abc.Collection` [`str`] - Collection of mask planes to ignore when computing annularFlux. - """ - stampSize = self.stamp_im.getDimensions() - # Create image: science pixel values within annulus, NO_DATA elsewhere - maskPlaneDict = self.stamp_im.mask.getMaskPlaneDict() - annulusImage = MaskedImageF(stampSize, planeDict=maskPlaneDict) - annulusMask = annulusImage.mask - annulusMask.array[:] = 2 ** maskPlaneDict["NO_DATA"] - annulus.copyMaskedImage(self.stamp_im, annulusImage) - # Set mask planes to be ignored. - andMask = reduce(ior, (annulusMask.getPlaneBitMask(bm) for bm in badMaskPlanes)) - statsControl.setAndMask(andMask) - - annulusStat = makeStatistics(annulusImage, statsFlag, statsControl) - # Determine the number of valid (unmasked) pixels within the annulus. - unMasked = annulusMask.array.size - np.count_nonzero(annulusMask.array) - self.validAnnulusFraction = unMasked / annulus.getArea() - logger.info( - "The Star's annulus contains %s valid pixels and the annulus itself contains %s pixels.", - unMasked, - annulus.getArea(), - ) - if unMasked > (annulus.getArea() * self.minValidAnnulusFraction): - # Compute annularFlux. - self.annularFlux = annulusStat.getValue() - logger.info("Annular flux is: %s", self.annularFlux) - else: - raise RuntimeError( - f"Less than {self.minValidAnnulusFraction * 100}% of pixels within the annulus are valid." - ) - if np.isnan(self.annularFlux): - raise RuntimeError("Annular flux computation failed, likely because there are no valid pixels.") - if self.annularFlux < 0: - raise RuntimeError("The annular flux is negative. The stamp can not be normalized!") - # Normalize stamps. - self.stamp_im.image.array /= self.annularFlux - return None - - -class BrightStarStamps(Stamps): - """Collection of bright star stamps and associated metadata. + def _getMaskedImage(self): + return self.maskedImage + + def _getArchiveElements(self): + return {"PSF": self.psf, "WCS": self.wcs} + + def _getMetadata(self) -> PropertyList | None: + md = PropertyList() + md["VISIT"] = self.visit + md["DETECTOR"] = self.detector + md["REFID"] = self.refId + md["REFMAG"] = self.refMag + md["X_FA"] = self.position.x + md["Y_FA"] = self.position.y + md["SCALE"] = self.scale + md["SCALE_ERR"] = self.scaleErr + md["PEDESTAL"] = self.pedestal + md["PEDESTAL_ERR"] = self.pedestalErr + md["PEDESTAL_SCALE_COV"] = self.pedestalScaleCov + md["X_GRADIENT"] = self.xGradient + md["Y_GRADIENT"] = self.yGradient + md["GLOBAL_REDUCED_CHI_SQUARED"] = self.globalReducedChiSquared + md["GLOBAL_DEGREES_OF_FREEDOM"] = self.globalDegreesOfFreedom + md["PSF_REDUCED_CHI_SQUARED"] = self.psfReducedChiSquared + md["PSF_DEGREES_OF_FREEDOM"] = self.psfDegreesOfFreedom + md["PSF_MASKED_FLUX_FRAC"] = self.psfMaskedFluxFrac + return md + + +class BrightStarStamps(Sequence[BrightStarStamp]): + """Collection of stamps and associated metadata. Parameters ---------- - starStamps : `collections.abc.Sequence` [`BrightStarStamp`] - Sequence of star stamps. Cannot contain both normalized and - unnormalized stamps. - innerRadius : `int`, optional - Inner radius value, in pixels. This and ``outerRadius`` define the - annulus used to compute the ``"annularFlux"`` values within each - ``starStamp``. Must be provided if ``normalize`` is True. - outerRadius : `int`, optional - Outer radius value, in pixels. This and ``innerRadius`` define the - annulus used to compute the ``"annularFlux"`` values within each - ``starStamp``. Must be provided if ``normalize`` is True. - nb90Rots : `int`, optional - Number of 90 degree rotations required to compensate for detector - orientation. + starStamps : iterable + This should be an iterable of dataclass objects + a la ``~lsst.meas.algorithms.BrightStarStamp``. metadata : `~lsst.daf.base.PropertyList`, optional - Metadata associated with the bright stars. - use_mask : `bool` - If `True` read and write mask data. Default `True`. - use_variance : `bool` - If ``True`` read and write variance data. Default ``False``. - use_archive : `bool` - If ``True`` read and write an Archive that contains a Persistable - associated with each stamp. In the case of bright stars, this is - usually a ``TransformPoint2ToPoint2``, used to warp each stamp - to the same pixel grid before stacking. - - Raises - ------ - ValueError - Raised if one of the star stamps provided does not contain the - required keys. - AttributeError - Raised if there is a mix-and-match of normalized and unnormalized - stamps, stamps normalized with different annulus definitions, or if - stamps are to be normalized but annular radii were not provided. - - Notes - ----- - A butler can be used to read only a part of the stamps, specified by a - bbox: - - >>> starSubregions = butler.get( - "brightStarStamps", - dataId, - parameters={"bbox": bbox} - ) + Metadata associated with the objects within the stamps. """ def __init__( self, starStamps, - innerRadius=None, - outerRadius=None, - nb90Rots=None, metadata=None, - use_mask=True, - use_variance=False, - use_archive=False, ): - super().__init__(starStamps, metadata, use_mask, use_variance, use_archive) - # Ensure stamps contain a flux measure if expected to be normalized. - self._checkNormalization(False, innerRadius, outerRadius) - self._innerRadius, self._outerRadius = innerRadius, outerRadius - if innerRadius is not None and outerRadius is not None: - self.normalized = True - else: - self.normalized = False - self.nb90Rots = nb90Rots - - @classmethod - def initAndNormalize( - cls, - starStamps, - innerRadius, - outerRadius, - nb90Rots=None, - metadata=None, - use_mask=True, - use_variance=False, - use_archive=False, - imCenter=None, - discardNanFluxObjects=True, - forceFindFlux=False, - statsControl=StatisticsControl(), - statsFlag=stringToStatisticsProperty("MEAN"), - badMaskPlanes=("BAD", "SAT", "NO_DATA"), - ): - """Normalize a set of bright star stamps and initialize a - BrightStarStamps instance. - - Since the center of bright stars are saturated and/or heavily affected - by ghosts, we measure their flux in an annulus with a large enough - inner radius to avoid the most severe ghosts and contain enough - non-saturated pixels. - - Parameters - ---------- - starStamps : `collections.abc.Sequence` [`BrightStarStamp`] - Sequence of star stamps. Cannot contain both normalized and - unnormalized stamps. - innerRadius : `int` - Inner radius value, in pixels. This and ``outerRadius`` define the - annulus used to compute the ``"annularFlux"`` values within each - ``starStamp``. - outerRadius : `int` - Outer radius value, in pixels. This and ``innerRadius`` define the - annulus used to compute the ``"annularFlux"`` values within each - ``starStamp``. - nb90Rots : `int`, optional - Number of 90 degree rotations required to compensate for detector - orientation. - metadata : `~lsst.daf.base.PropertyList`, optional - Metadata associated with the bright stars. - use_mask : `bool` - If `True` read and write mask data. Default `True`. - use_variance : `bool` - If ``True`` read and write variance data. Default ``False``. - use_archive : `bool` - If ``True`` read and write an Archive that contains a Persistable - associated with each stamp. In the case of bright stars, this is - usually a ``TransformPoint2ToPoint2``, used to warp each stamp - to the same pixel grid before stacking. - imCenter : `collections.abc.Sequence`, optional - Center of the object, in pixels. If not provided, the center of the - first stamp's pixel grid will be used. - discardNanFluxObjects : `bool` - Whether objects with NaN annular flux should be discarded. - If False, these objects will not be normalized. - forceFindFlux : `bool` - Whether to try to find the flux of objects with NaN annular flux - at a different annulus. - statsControl : `~lsst.afw.math.statistics.StatisticsControl`, optional - StatisticsControl to be used when computing flux over all pixels - within the annulus. - statsFlag : `~lsst.afw.math.statistics.Property`, optional - statsFlag to be passed on to ``~lsst.afw.math.makeStatistics`` to - compute annularFlux. Defaults to a simple MEAN. - badMaskPlanes : `collections.abc.Collection` [`str`] - Collection of mask planes to ignore when computing annularFlux. - - Raises - ------ - ValueError - Raised if one of the star stamps provided does not contain the - required keys. - AttributeError - Raised if there is a mix-and-match of normalized and unnormalized - stamps, stamps normalized with different annulus definitions, or if - stamps are to be normalized but annular radii were not provided. - """ - stampSize = starStamps[0].stamp_im.getDimensions() - if imCenter is None: - imCenter = stampSize[0] // 2, stampSize[1] // 2 - - # Create SpanSet of annulus. - outerCircle = SpanSet.fromShape(outerRadius, Stencil.CIRCLE, offset=imCenter) - innerCircle = SpanSet.fromShape(innerRadius, Stencil.CIRCLE, offset=imCenter) - annulusWidth = outerRadius - innerRadius - if annulusWidth < 1: - raise ValueError("The annulus width must be greater than 1 pixel.") - annulus = outerCircle.intersectNot(innerCircle) - - # Initialize (unnormalized) brightStarStamps instance. - bss = cls( - starStamps, - innerRadius=None, - outerRadius=None, - nb90Rots=nb90Rots, - metadata=metadata, - use_mask=use_mask, - use_variance=use_variance, - use_archive=use_archive, - ) - - # Ensure that no stamps have already been normalized. - bss._checkNormalization(True, innerRadius, outerRadius) - bss._innerRadius, bss._outerRadius = innerRadius, outerRadius - - # Apply normalization. - rejects = [] - badStamps = [] - for stamp in bss._stamps: - try: - stamp.measureAndNormalize( - annulus, statsControl=statsControl, statsFlag=statsFlag, badMaskPlanes=badMaskPlanes - ) - # Stars that are missing from input bright star stamps may - # still have a flux within the normalization annulus. The - # following two lines make sure that these stars are included - # in the subtraction process. Failing to assign the optimal - # radii values may result in an error in the `createAnnulus` - # method of the `SubtractBrightStarsTask` class. An alternative - # to handle this is to create two types of stamps that are - # missing from the input brightStarStamps object. One for those - # that have flux within the normalization annulus and another - # for those that do not have a flux within the normalization - # annulus. - stamp.optimalOuterRadius = outerRadius - stamp.optimalInnerRadius = innerRadius - except RuntimeError as err: - logger.error(err) - # Optionally keep NaN flux objects, for bookkeeping purposes, - # and to avoid having to re-find and redo the preprocessing - # steps needed before bright stars can be subtracted. - if discardNanFluxObjects: - rejects.append(stamp) - elif forceFindFlux: - newInnerRadius = innerRadius - newOuterRadius = outerRadius - while True: - newOuterRadius += annulusWidth - newInnerRadius += annulusWidth - if newOuterRadius > min(imCenter): - logger.info("No flux found for the star with Gaia ID of %s", stamp.gaiaId) - stamp.annularFlux = None - badStamps.append(stamp) - break - newOuterCircle = SpanSet.fromShape(newOuterRadius, Stencil.CIRCLE, offset=imCenter) - newInnerCircle = SpanSet.fromShape(newInnerRadius, Stencil.CIRCLE, offset=imCenter) - newAnnulus = newOuterCircle.intersectNot(newInnerCircle) - try: - stamp.measureAndNormalize( - newAnnulus, - statsControl=statsControl, - statsFlag=statsFlag, - badMaskPlanes=badMaskPlanes, - ) - - except RuntimeError: - stamp.annularFlux = np.nan - logger.error( - "The annular flux was not found for radii %d and %d", - newInnerRadius, - newOuterRadius, - ) - if stamp.annularFlux and stamp.annularFlux > 0: - logger.info("The flux is found within an optimized annulus.") - logger.info( - "The optimized annulus radii are %d and %d and the flux is %f", - newInnerRadius, - newOuterRadius, - stamp.annularFlux, - ) - stamp.optimalOuterRadius = newOuterRadius - stamp.optimalInnerRadius = newInnerRadius - break - else: - stamp.annularFlux = np.nan - - # Remove rejected stamps. - bss.normalized = True - if discardNanFluxObjects: - for reject in rejects: - bss._stamps.remove(reject) - elif forceFindFlux: - for badStamp in badStamps: - bss._stamps.remove(badStamp) - bss._innerRadius, bss._outerRadius = None, None - return bss, badStamps - return bss - - def _refresh_metadata(self): - """Refresh metadata. Should be called before writing the object out. - - This method adds full lists of positions, Gaia magnitudes, IDs and - annular fluxes to the shared metadata. - """ - self._metadata["G_MAGS"] = self.getMagnitudes() - self._metadata["GAIA_IDS"] = self.getGaiaIds() - positions = self.getPositions() - self._metadata["X0S"] = [xy0[0] for xy0 in positions] - self._metadata["Y0S"] = [xy0[1] for xy0 in positions] - self._metadata["ANNULAR_FLUXES"] = self.getAnnularFluxes() - self._metadata["VALID_PIXELS_FRACTION"] = self.getValidPixelsFraction() - self._metadata["NORMALIZED"] = self.normalized - self._metadata["INNER_RADIUS"] = self._innerRadius - self._metadata["OUTER_RADIUS"] = self._outerRadius - if self.nb90Rots is not None: - self._metadata["NB_90_ROTS"] = self.nb90Rots - return None + self._stamps = list(starStamps) + self._metadata = PropertyList() if metadata is None else metadata.deepCopy() + self.useMask = True + self.useVariance = True + self.useArchive = True + self.byRefId = {stamp.refId: stamp for stamp in self} @classmethod def readFits(cls, filename): @@ -491,195 +199,142 @@ def readFitsWithOptions(cls, filename, options): options : `PropertyList` Collection of metadata parameters. """ - stamps, metadata = readFitsWithOptions(filename, BrightStarStamp.factory, options) - nb90Rots = metadata["NB_90_ROTS"] if "NB_90_ROTS" in metadata else None - if metadata["NORMALIZED"]: - return cls( - stamps, - innerRadius=metadata["INNER_RADIUS"], - outerRadius=metadata["OUTER_RADIUS"], - nb90Rots=nb90Rots, - metadata=metadata, - use_mask=metadata["HAS_MASK"], - use_variance=metadata["HAS_VARIANCE"], - use_archive=metadata["HAS_ARCHIVE"], - ) - else: - return cls( - stamps, - nb90Rots=nb90Rots, - metadata=metadata, - use_mask=metadata["HAS_MASK"], - use_variance=metadata["HAS_VARIANCE"], - use_archive=metadata["HAS_ARCHIVE"], - ) - - def append(self, item, innerRadius=None, outerRadius=None): - """Add an additional bright star stamp. + # Extract necessary info from metadata + metadata = readMetadata(filename, hdu=0) + nStamps = metadata["N_STAMPS"] + stamps_archiveElementNames = ["PSF", "WCS"] + with Fits(filename, "r") as fitsFile: + nExtensions = fitsFile.countHdus() + stampParts = {} + + default_dtype = np.dtype(MaskedImageF.dtype) + variance_dtype = np.dtype(np.float32) # Variance is always the same type + + # We need to be careful as nExtensions includes the primary HDU + stampMetadata = {} + stamps_archiveElementIds = {} + for idx in range(nExtensions - 1): + dtype = None + hduNum = idx + 1 + md = readMetadata(filename, hdu=hduNum) + # Skip binary tables that aren't images or archives. + if md["XTENSION"] == "BINTABLE" and not ("ZIMAGE" in md and md["ZIMAGE"]): + if md["EXTNAME"] != "ARCHIVE_INDEX": + continue + if md["EXTNAME"] in ("IMAGE", "VARIANCE"): + stampId = md["EXTVER"] + reader = ImageFitsReader(filename, hdu=hduNum) + if md["EXTNAME"] == "VARIANCE": + dtype = variance_dtype + else: + dtype = default_dtype + if stamps_archiveElementNames is not None: + stamps_archiveElementIds[stampId] = { + name: archiveId + for name in stamps_archiveElementNames + if (archiveId := md.pop(name, None)) + } + # md.remove("EXTNAME") + # md.remove("EXTVER") + stampMetadata[stampId] = md + elif md["EXTNAME"] == "MASK": + stampId = md["EXTVER"] + reader = MaskFitsReader(filename, hdu=hduNum) + elif md["EXTNAME"] == "ARCHIVE_INDEX": + fitsFile.setHdu(hduNum) + archive = InputArchive.readFits(fitsFile) + continue + elif md["EXTTYPE"] == "ARCHIVE_DATA": + continue + else: + raise ValueError(f"Unknown extension type: {md['EXTNAME']}") + stampParts.setdefault(stampId, {})[md["EXTNAME"].lower()] = reader.read(dtype=dtype) - Parameters - ---------- - item : `BrightStarStamp` - Bright star stamp to append. - innerRadius : `int`, optional - Inner radius value, in pixels. This and ``outerRadius`` define the - annulus used to compute the ``"annularFlux"`` values within each - ``BrightStarStamp``. - outerRadius : `int`, optional - Outer radius value, in pixels. This and ``innerRadius`` define the - annulus used to compute the ``"annularFlux"`` values within each - ``BrightStarStamp``. - """ - if not isinstance(item, BrightStarStamp): - raise ValueError(f"Can only add instances of BrightStarStamp, got {type(item)}.") - if (item.annularFlux is None) == self.normalized: - raise AttributeError( - "Trying to append an unnormalized stamp to a normalized BrightStarStamps " - "instance, or vice-versa." + if len(stampParts) != nStamps: + raise ValueError( + f"Number of stamps read ({len(stampParts)}) does not agree with the " + f"number of stamps recorded in the metadata ({nStamps})." ) - else: - self._checkRadius(innerRadius, outerRadius) - self._stamps.append(item) - return None - - def extend(self, bss): - """Extend BrightStarStamps instance by appending elements from another - instance. - - Parameters - ---------- - bss : `BrightStarStamps` - Other instance to concatenate. - """ - if not isinstance(bss, BrightStarStamps): - raise ValueError(f"Can only extend with a BrightStarStamps object. Got {type(bss)}.") - self._checkRadius(bss._innerRadius, bss._outerRadius) - self._stamps += bss._stamps - - def getMagnitudes(self): - """Retrieve Gaia G-band magnitudes for each star. - - Returns - ------- - gaiaGMags : `list` [`float`] - Gaia G-band magnitudes for each star. - """ - return [stamp.gaiaGMag for stamp in self._stamps] - - def getGaiaIds(self): - """Retrieve Gaia IDs for each star. - - Returns - ------- - gaiaIds : `list` [`int`] - Gaia IDs for each star. - """ - return [stamp.gaiaId for stamp in self._stamps] + # Construct the stamps themselves + stamps = [] + for k in range(nStamps): + # Need to increment by one since EXTVER starts at 1 + maskedImage = MaskedImageF(**stampParts[k + 1]) - def getAnnularFluxes(self): - """Retrieve normalization factor for each star. + stamp_archiveElementIds = stamps_archiveElementIds.get(k + 1, {}) + stamp_archiveElements = {name: archive.get(id) for name, id in stamp_archiveElementIds.items()} - These are computed by integrating the flux in annulus centered on the - bright star, far enough from center to be beyond most severe ghosts and - saturation. - The inner and outer radii that define the annulus can be recovered from - the metadata. + stamps.append(BrightStarStamp.factory(maskedImage, stampMetadata[k + 1], stamp_archiveElements)) - Returns - ------- - annularFluxes : `list` [`float`] - Annular fluxes which give the normalization factor for each star. - """ - return [stamp.annularFlux for stamp in self._stamps] - - def getValidPixelsFraction(self): - """Retrieve the fraction of valid pixels within the normalization - annulus for each star. - - Returns - ------- - validPixelsFractions : `list` [`float`] - Fractions of valid pixels within the normalization annulus for each - star. - """ - return [stamp.validAnnulusFraction for stamp in self._stamps] + return cls(stamps, metadata=metadata) - def selectByMag(self, magMin=None, magMax=None): - """Return the subset of bright star stamps for objects with specified - magnitude cuts (in Gaia G). + def writeFits(self, filename: str): + """Write this object to a FITS file. Parameters ---------- - magMin : `float`, optional - Keep only stars fainter than this value. - magMax : `float`, optional - Keep only stars brighter than this value. - """ - subset = [ - stamp - for stamp in self._stamps - if (magMin is None or stamp.gaiaGMag > magMin) and (magMax is None or stamp.gaiaGMag < magMax) - ] - # This saves looping over init when guaranteed to be the correct type. - instance = BrightStarStamps( - (), innerRadius=self._innerRadius, outerRadius=self._outerRadius, metadata=self._metadata - ) - instance._stamps = subset - return instance - - def _checkRadius(self, innerRadius, outerRadius): - """Ensure provided annulus radius is consistent with that already - present in the instance, or with arguments passed on at initialization. - """ - if innerRadius != self._innerRadius or outerRadius != self._outerRadius: - raise AttributeError( - f"Trying to mix stamps normalized with annulus radii {innerRadius, outerRadius} with those " - "of BrightStarStamp instance\n" - f"(computed with annular radii {self._innerRadius, self._outerRadius})." - ) - - def _checkNormalization(self, normalize, innerRadius, outerRadius): - """Ensure there is no mixing of normalized and unnormalized stars, and - that, if requested, normalization can be performed. + filename : `str` + Name of the FITS file to write. """ - noneFluxCount = self.getAnnularFluxes().count(None) - nStamps = len(self) - nFluxVals = nStamps - noneFluxCount - if noneFluxCount and noneFluxCount < nStamps: - # At least one stamp contains an annularFlux value (i.e. has been - # normalized), but not all of them do. - raise AttributeError( - f"Only {nFluxVals} stamps contain an annularFlux value.\nAll stamps in a BrightStarStamps " - "instance must either be normalized with the same annulus definition, or none of them can " - "contain an annularFlux value." + typeName = get_full_type_name(self) + metadata = self._metadata.copy() + + # Store metadata in the primary HDU + metadata["HAS_MASK"] = True + metadata["HAS_VARIANCE"] = True + metadata["HAS_ARCHIVE"] = True + metadata["N_STAMPS"] = len(self._stamps) + metadata["STAMPCLS"] = typeName + metadata["VERSION"] = 2 # Record version number in case of future code changes + + # Create the primary HDU with global metadata + fitsFile = Fits(filename, "w") + fitsFile.createEmpty() + + # Store Persistables in an OutputArchive and write to the primary HDU + stamps_archiveElementNames = set() + stamps_archiveElementIds = [] + oa = OutputArchive() + for stamp in self._stamps: + stamp_archiveElements = stamp._getArchiveElements() + stamps_archiveElementNames.update(stamp_archiveElements.keys()) + stamps_archiveElementIds.append( + {name: oa.put(persistable) for name, persistable in stamp_archiveElements.items()} ) - elif normalize: - # Stamps are to be normalized; ensure annular radii are specified - # and they have no annularFlux. - if innerRadius is None or outerRadius is None: - raise AttributeError( - "For stamps to be normalized (normalize=True), please provide a valid value (in pixels) " - "for both innerRadius and outerRadius." - ) - elif noneFluxCount < nStamps: - raise AttributeError( - f"{nFluxVals} stamps already contain an annularFlux value. For stamps to be normalized, " - "all their annularFlux must be None." - ) - elif innerRadius is not None and outerRadius is not None: - # Radii provided, but normalize=False; check that stamps already - # contain annularFluxes. - if noneFluxCount: - raise AttributeError( - f"{noneFluxCount} stamps contain no annularFlux, but annular radius values were provided " - "and normalize=False.\nTo normalize stamps, set normalize to True." - ) - else: - # At least one radius value is missing; ensure no stamps have - # already been normalized. - if nFluxVals: - raise AttributeError( - f"{nFluxVals} stamps contain an annularFlux value. If stamps have been normalized, the " - "innerRadius and outerRadius values used must be provided." - ) - return None + fitsFile.writeMetadata(metadata) + del metadata + oa.writeFits(fitsFile) + + fitsFile.closeFile() + + # Add all pixel data to extension HDUs; opt. write mask/variance info + for i, (stamp, stamp_archiveElementIds) in enumerate(zip(self._stamps, stamps_archiveElementIds)): + metadata = PropertyList() + extVer = i + 1 # EXTVER should be 1-based; the index from enumerate is 0-based + metadata.update({"EXTVER": extVer, "EXTNAME": "IMAGE"}) + if stampMetadata := stamp._getMetadata(): + metadata.update(stampMetadata) + if stamp_archiveElementIds: + metadata.update(stamp_archiveElementIds) + for stamps_archiveElementName in sorted(stamps_archiveElementNames): + metadata.add("ARCHIVE_ELEMENT", stamps_archiveElementName) + stamp.maskedImage.getImage().writeFits(filename, metadata=metadata, mode="a") + metadata = PropertyList() + metadata.update({"EXTVER": extVer, "EXTNAME": "MASK"}) + stamp.maskedImage.getMask().writeFits(filename, metadata=metadata, mode="a") + metadata = PropertyList() + metadata.update({"EXTVER": extVer, "EXTNAME": "VARIANCE"}) + stamp.maskedImage.getVariance().writeFits(filename, metadata=metadata, mode="a") + + def __len__(self): + return len(self._stamps) + + def __getitem__(self, index): + return self._stamps[index] + + def __iter__(self): + return iter(self._stamps) + + @property + def metadata(self): + return self._metadata