Skip to content
This repository was archived by the owner on May 2, 2025. It is now read-only.

Commit 17aeff3

Browse files
committed
Merge branch 'tickets/DM-15104'
2 parents 19f2ef8 + 77d2039 commit 17aeff3

File tree

1 file changed

+82
-26
lines changed

1 file changed

+82
-26
lines changed

python/lsst/pipe/drivers/multiBandDriver.py

Lines changed: 82 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from lsst.pipe.base import ArgumentParser, TaskRunner
99
from lsst.pipe.tasks.multiBand import (DetectCoaddSourcesTask,
1010
MergeDetectionsTask,
11+
DeblendCoaddSourcesTask,
1112
MeasureMergedCoaddSourcesTask,
1213
MergeMeasurementsTask,)
1314
from lsst.ctrl.pool.parallel import BatchPoolTask
@@ -76,8 +77,9 @@ class MultiBandDriverConfig(Config):
7677
doc="Detect sources on coadd")
7778
mergeCoaddDetections = ConfigurableField(
7879
target=MergeDetectionsTask, doc="Merge detections")
80+
deblendCoaddSources = ConfigurableField(target=DeblendCoaddSourcesTask, doc="Deblend merged detections")
7981
measureCoaddSources = ConfigurableField(target=MeasureMergedCoaddSourcesTask,
80-
doc="Measure merged detections")
82+
doc="Measure merged and (optionally) deblended detections")
8183
mergeCoaddMeasurements = ConfigurableField(
8284
target=MergeMeasurementsTask, doc="Merge measurements")
8385
forcedPhotCoadd = ConfigurableField(target=ForcedPhotCoaddTask,
@@ -96,7 +98,7 @@ def setDefaults(self):
9698
self.forcedPhotCoadd.references.retarget(MultiBandReferencesTask)
9799

98100
def validate(self):
99-
for subtask in ("mergeCoaddDetections", "measureCoaddSources",
101+
for subtask in ("mergeCoaddDetections", "deblendCoaddSources", "measureCoaddSources",
100102
"mergeCoaddMeasurements", "forcedPhotCoadd"):
101103
coaddName = getattr(self, subtask).coaddName
102104
if coaddName != self.coaddName:
@@ -160,7 +162,29 @@ def __init__(self, butler=None, schema=None, refObjLoader=None, reuse=tuple(), *
160162
self.reuse = tuple(reuse)
161163
self.makeSubtask("detectCoaddSources")
162164
self.makeSubtask("mergeCoaddDetections", schema=schema)
163-
self.makeSubtask("measureCoaddSources", schema=afwTable.Schema(self.mergeCoaddDetections.schema),
165+
if self.config.measureCoaddSources.inputCatalog.startswith("deblended"):
166+
# Ensure that the output from deblendCoaddSources matches the input to measureCoaddSources
167+
self.measurementInput = self.config.measureCoaddSources.inputCatalog
168+
self.deblenderOutput = []
169+
if self.config.deblendCoaddSources.simultaneous:
170+
if self.config.deblendCoaddSources.multiBandDeblend.conserveFlux:
171+
self.deblenderOutput.append("deblendedFlux")
172+
if self.config.deblendCoaddSources.multiBandDeblend.saveTemplates:
173+
self.deblenderOutput.append("deblendedModel")
174+
else:
175+
self.deblenderOutput.append("deblendedFlux")
176+
if self.measurementInput not in self.deblenderOutput:
177+
err = "Measurement input '{0}' is not in the list of deblender output catalogs '{1}'"
178+
raise ValueError(err.format(self.measurementInput, self.deblenderOutput))
179+
180+
self.makeSubtask("deblendCoaddSources",
181+
schema=afwTable.Schema(self.mergeCoaddDetections.schema),
182+
peakSchema=afwTable.Schema(self.mergeCoaddDetections.merged.getPeakSchema()),
183+
butler=butler)
184+
measureInputSchema = afwTable.Schema(self.deblendCoaddSources.schema)
185+
else:
186+
measureInputSchema = afwTable.Schema(self.mergeCoaddDetections.schema)
187+
self.makeSubtask("measureCoaddSources", schema=measureInputSchema,
164188
peakSchema=afwTable.Schema(
165189
self.mergeCoaddDetections.merged.getPeakSchema()),
166190
refObjLoader=refObjLoader, butler=butler)
@@ -270,30 +294,30 @@ def runDataRef(self, patchRefList):
270294

271295
pool.map(self.runMergeDetections, patches.values())
272296

273-
# Measure merged detections, and test for reprocessing
297+
# Deblend merged detections, and test for reprocessing
274298
#
275299
# The reprocessing allows us to have multiple attempts at deblending large footprints. Large
276300
# footprints can suck up a lot of memory in the deblender, which means that when we process on a
277301
# cluster, we want to refuse to deblend them (they're flagged "deblend.parent-too-big"). But since
278302
# they may have astronomically interesting data, we want the ability to go back and reprocess them
279303
# with a more permissive configuration when we have more memory or processing time.
280304
#
281-
# self.runMeasureMerged will return whether there are any footprints in that image that required
305+
# self.runDeblendMerged will return whether there are any footprints in that image that required
282306
# reprocessing. We need to convert that list of booleans into a dict mapping the patchId (x,y) to
283307
# a boolean. That tells us whether the measurement, merge and forced photometry need to be re-run on
284308
# a particular patch.
285309
#
286310
# This determination of which patches need to be reprocessed exists only in memory (the measurements
287311
# have been written, clobbering the old ones), so if there was an exception we would lose this
288-
# information, leaving things in an inconsistent state (measurements new, but merged measurements and
312+
# information, leaving things in an inconsistent state (measurements, merged measurements and
289313
# forced photometry old). To attempt to preserve this status, we touch a file (dataset named
290-
# "deepCoadd_multibandReprocessing") --- if this file exists, we need to re-run the merge and
291-
# forced photometry.
314+
# "deepCoadd_multibandReprocessing") --- if this file exists, we need to re-run the measurements,
315+
# merge and forced photometry.
292316
#
293317
# This is, hopefully, a temporary workaround until we can improve the
294318
# deblender.
295319
try:
296-
reprocessed = pool.map(self.runMeasureMerged, dataIdList)
320+
reprocessed = pool.map(self.runDeblendMerged, patches.values())
297321
finally:
298322
if self.config.reprocessing:
299323
patchReprocessing = {}
@@ -317,10 +341,12 @@ def runDataRef(self, patchRefList):
317341
patchReprocessing[patchId] = True
318342

319343
# Only process patches that have been identified as needing it
344+
pool.map(self.runMeasurements, [dataId1 for dataId1 in dataIdList if not self.config.reprocessing or
345+
patchReprocessing[dataId1["patch"]]])
320346
pool.map(self.runMergeMeasurements, [idList for patchId, idList in patches.items() if
321347
not self.config.reprocessing or patchReprocessing[patchId]])
322348
pool.map(self.runForcedPhot, [dataId1 for dataId1 in dataIdList if not self.config.reprocessing or
323-
patchReprocessing[dataId["patch"]]])
349+
patchReprocessing[dataId1["patch"]]])
324350

325351
# Remove persisted reprocessing determination
326352
if self.config.reprocessing:
@@ -368,45 +394,75 @@ def runMergeDetections(self, cache, dataIdList):
368394
return
369395
self.mergeCoaddDetections.runDataRef(dataRefList)
370396

371-
def runDeblendMerged(self, cache, dataIdList):
    """Run the deblender on a list of dataId's

    Only slave nodes execute this method.

    Parameters
    ----------
    cache: Pool cache
        Pool cache with butler.
    dataIdList: list
        Data identifier for patch in each band.

    Returns
    -------
    result: bool
        whether the patch requires reprocessing.
    """
    with self.logOperation("deblending %s" % (dataIdList,)):
        calexpName = self.config.coaddName + "Coadd_calexp"
        dataRefList = [getDataRef(cache.butler, dataId, calexpName) for dataId in dataIdList]
        reprocessing = False  # Does this patch require reprocessing?

        # Deblender output catalog that measurement will read. Dataset names
        # in this file follow "<coaddName>Coadd_<suffix>" (e.g. "Coadd_calexp",
        # "Coadd_meas"), so the "Coadd_" infix is required here as well.
        catalogName = self.config.coaddName + "Coadd_" + self.measurementInput

        # Reuse is only valid if the output exists for every band in the patch.
        # (The previous code referenced an undefined name ``dataRef`` here.)
        if ("deblendCoaddSources" in self.reuse and dataRefList and
                all(dataRef.datasetExists(catalogName, write=True) for dataRef in dataRefList)):
            if not self.config.reprocessing:
                self.log.info("Skipping deblendCoaddSources for %s; output already exists" %
                              (dataIdList,))
                return False

            # Only the first band's catalog is inspected to decide on
            # reprocessing, as in the original implementation.
            firstRef = dataRefList[0]
            catalog = firstRef.get(catalogName)
            bigFlag = catalog["deblend.parent-too-big"]
            # Footprints that the previous run refused to deblend.
            numOldBig = bigFlag.sum()
            if numOldBig == 0:
                self.log.info("No large footprints in %s" % (firstRef.dataId,))
                return False
            # NOTE(review): assumes deblendCoaddSources exposes isLargeFootprint
            # directly -- confirm against DeblendCoaddSourcesTask.
            numNewBig = sum(self.deblendCoaddSources.isLargeFootprint(src.getFootprint())
                            for src in catalog[bigFlag])
            if numNewBig == numOldBig:
                self.log.info("All %d formerly large footprints continue to be large in %s" %
                              (numOldBig, firstRef.dataId,))
                return False
            self.log.info("Found %d large footprints to be reprocessed in %s" %
                          (numOldBig - numNewBig, [dataRef.dataId for dataRef in dataRefList]))
            reprocessing = True

        self.deblendCoaddSources.runDataRef(dataRefList)
        return reprocessing
409443

444+
def runMeasurements(self, cache, dataId):
    """Run measurement on a patch for a single filter

    Only slave nodes execute this method.

    Parameters
    ----------
    cache: Pool cache
        Pool cache, with butler
    dataId: data identifier
        Data identifier for patch; it is resolved to a data reference
        with ``getDataRef`` before use.
    """
    with self.logOperation("measurements on %s" % (dataId,)):
        dataRef = getDataRef(cache.butler, dataId,
                             self.config.coaddName + "Coadd_calexp")
        # Existing output is only reused when not reprocessing; when
        # reprocessing is enabled the measurement is always re-run.
        if ("measureCoaddSources" in self.reuse and
                not self.config.reprocessing and
                dataRef.datasetExists(self.config.coaddName + "Coadd_meas", write=True)):
            self.log.info("Skipping measureCoaddSources for %s; output already exists" % (dataId,))
            return
        self.measureCoaddSources.runDataRef(dataRef)
465+
410466
def runMergeMeasurements(self, cache, dataIdList):
411467
"""!Run measurement merging on a patch
412468

0 commit comments

Comments
 (0)