From b3464bb068acfaf1a39f2360fb7aa9c1298f773c Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 29 Oct 2025 17:44:43 +0000 Subject: [PATCH 01/21] Create additional fluorescent-only and bright field-only composite images --- .../workflows/clem/register_preprocessing_results.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/murfey/workflows/clem/register_preprocessing_results.py b/src/murfey/workflows/clem/register_preprocessing_results.py index 14822398..1f744dee 100644 --- a/src/murfey/workflows/clem/register_preprocessing_results.py +++ b/src/murfey/workflows/clem/register_preprocessing_results.py @@ -203,7 +203,7 @@ def run(message: dict, murfey_db: Session, demo: bool = False) -> dict[str, bool image_combos_to_process = [ list(result.output_files.values()) # Composite image of all channels ] - # Create additional job for fluorescent-only composite image if fluorescent channels are present + # Create additional fluorescent-only and bright field-only jobs if ("gray" in result.output_files.keys()) and len(result.output_files) > 1: image_combos_to_process.append( [ @@ -212,6 +212,13 @@ def run(message: dict, murfey_db: Session, demo: bool = False) -> dict[str, bool if channel != "gray" ] ) + image_combos_to_process.append( + [ + file + for channel, file in result.output_files.items() + if channel == "gray" + ] + ) # Request for image alignment and processing for the requested combinations for image_combo in image_combos_to_process: From 8641d38a099b93f670a4dbe83aced78be4cdcd6b Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 29 Oct 2025 18:56:42 +0000 Subject: [PATCH 02/21] Import Murfey database classes differently, to eventually differentiate them from ISPyB database tables --- .../clem/register_preprocessing_results.py | 31 +++++++------------ 1 file changed, 12 insertions(+), 19 deletions(-) diff --git a/src/murfey/workflows/clem/register_preprocessing_results.py 
b/src/murfey/workflows/clem/register_preprocessing_results.py index 1f744dee..a7d69784 100644 --- a/src/murfey/workflows/clem/register_preprocessing_results.py +++ b/src/murfey/workflows/clem/register_preprocessing_results.py @@ -17,15 +17,8 @@ from pydantic import BaseModel from sqlmodel import Session, select +import murfey.util.db as MurfeyDB from murfey.server import _transport_object -from murfey.util.db import ( - CLEMImageMetadata, - CLEMImageSeries, - CLEMImageStack, - CLEMLIFFile, - CLEMTIFFFile, - Session as MurfeySession, -) from murfey.util.processing_params import ( default_clem_align_and_merge_parameters as processing_params, ) @@ -84,23 +77,23 @@ def run(message: dict, murfey_db: Session, demo: bool = False) -> dict[str, bool try: # Register items in database if not already present try: - clem_img_series: CLEMImageSeries = get_db_entry( + clem_img_series: MurfeyDB.CLEMImageSeries = get_db_entry( db=murfey_db, - table=CLEMImageSeries, + table=MurfeyDB.CLEMImageSeries, session_id=session_id, series_name=result.series_name, ) - clem_metadata: CLEMImageMetadata = get_db_entry( + clem_metadata: MurfeyDB.CLEMImageMetadata = get_db_entry( db=murfey_db, - table=CLEMImageMetadata, + table=MurfeyDB.CLEMImageMetadata, session_id=session_id, file_path=result.metadata, ) # Register and link parent LIF file if present if result.parent_lif is not None: - clem_lif_file: CLEMLIFFile = get_db_entry( + clem_lif_file: MurfeyDB.CLEMLIFFile = get_db_entry( db=murfey_db, - table=CLEMLIFFile, + table=MurfeyDB.CLEMLIFFile, session_id=session_id, file_path=result.parent_lif, ) @@ -115,9 +108,9 @@ def run(message: dict, murfey_db: Session, demo: bool = False) -> dict[str, bool # Iteratively register the output image stacks for c, (channel, output_file) in enumerate(result.output_files.items()): - clem_img_stk: CLEMImageStack = get_db_entry( + clem_img_stk: MurfeyDB.CLEMImageStack = get_db_entry( db=murfey_db, - table=CLEMImageStack, + table=MurfeyDB.CLEMImageStack, 
session_id=session_id, file_path=output_file, ) @@ -154,9 +147,9 @@ def run(message: dict, murfey_db: Session, demo: bool = False) -> dict[str, bool # Register TIFF file subset clem_tiff_files = [] for file in tiff_file_subset: - clem_tiff_file: CLEMTIFFFile = get_db_entry( + clem_tiff_file: MurfeyDB.CLEMTIFFFile = get_db_entry( db=murfey_db, - table=CLEMTIFFFile, + table=MurfeyDB.CLEMTIFFFile, session_id=session_id, file_path=file, ) @@ -187,7 +180,7 @@ def run(message: dict, murfey_db: Session, demo: bool = False) -> dict[str, bool try: instrument_name = ( murfey_db.exec( - select(MurfeySession).where(MurfeySession.id == session_id) + select(MurfeyDB.Session).where(MurfeyDB.Session.id == session_id) ) .one() .instrument_name From f511242d18f13888de8a576ff6d71c4b433b517f Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Fri, 31 Oct 2025 15:14:16 +0000 Subject: [PATCH 03/21] 'slot' should be optional --- src/murfey/server/ispyb.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/murfey/server/ispyb.py b/src/murfey/server/ispyb.py index f2c0cd80..163db99f 100644 --- a/src/murfey/server/ispyb.py +++ b/src/murfey/server/ispyb.py @@ -153,7 +153,11 @@ def do_insert_atlas(self, record: Atlas): return {"success": False, "return_value": None} def do_update_atlas( - self, atlas_id: int, atlas_image: str, pixel_size: float, slot: int + self, + atlas_id: int, + atlas_image: str, + pixel_size: float, + slot: int | None, ): try: with ISPyBSession() as db: From c5aca0c44cc5747d063acc97a0c2e85fe8beb9fa Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Fri, 31 Oct 2025 16:22:08 +0000 Subject: [PATCH 04/21] Updated Murfey database to integrate CLEM workflow with ISPyB * Linked 'CLEMImageSeries' to 'DataCollectionGroup' and 'GridSquare' * Tried modern type hinting notation in 'CLEMImageSeries' table * Added new columns to 'CLEMImageSeries' to keep track of image search string (for globbing images with) and type of dataset (i.e., atlas or grid square) * 
Removed 'composite_image' column from 'CLEMImageSeries' --- src/murfey/util/db.py | 52 +++++++++++++++++++++++++++++-------------- 1 file changed, 35 insertions(+), 17 deletions(-) diff --git a/src/murfey/util/db.py b/src/murfey/util/db.py index cee30b0e..f09252e8 100644 --- a/src/murfey/util/db.py +++ b/src/murfey/util/db.py @@ -3,6 +3,8 @@ of the sessions that Murfey is overseeing, along with the relationships between them. """ +from __future__ import annotations + from datetime import datetime from typing import List, Optional @@ -230,58 +232,67 @@ class CLEMImageMetadata(SQLModel, table=True): # type: ignore class CLEMImageSeries(SQLModel, table=True): # type: ignore """ - Database recording the individual files associated with a series, which are to be + Database recording the files and metadata associated with a series, which are to be processed together as a group. These files could stem from a parent LIF file, or have been compiled together from individual TIFF files. """ - id: Optional[int] = Field(default=None, primary_key=True) + id: int | None = Field(default=None, primary_key=True) series_name: str = Field( index=True ) # Name of the series, as determined from the metadata + search_string: str | None = Field(default=None) # Path for globbing with - session: Optional["Session"] = Relationship( - back_populates="image_series" - ) # Many to one - session_id: Optional[int] = Field( - foreign_key="session.id", default=None, unique=False + session: Session | None = Relationship(back_populates="image_series") # Many to one + session_id: int | None = Field(foreign_key="session.id", default=None, unique=False) + + # Type of data (atlas/overview or grid square) + data_type: str = Field(default="") # "atlas" or "grid_square" + + # Link to data collection group + data_collection_group: DataCollectionGroup | None = Relationship( + back_populates="clem_image_series" ) + dcg_id: int | None = Field(foreign_key="datacollectiongroup.id", default=None) + + # Link to grid 
squares + grid_square: GridSquare | None = Relationship(back_populates="clem_image_series") + grid_square_id: int | None = Field(foreign_key="gridsquare.id", default=None) # The parent LIF file this series originates from, if any - parent_lif: Optional["CLEMLIFFile"] = Relationship( + parent_lif: CLEMLIFFile | None = Relationship( back_populates="child_series", ) # Many to one - parent_lif_id: Optional[int] = Field( + parent_lif_id: int | None = Field( foreign_key="clemliffile.id", default=None, ) # The parent TIFF files used to build up the image stacks in the series, if any - parent_tiffs: List["CLEMTIFFFile"] = Relationship( + parent_tiffs: list[CLEMTIFFFile] = Relationship( back_populates="child_series", sa_relationship_kwargs={"cascade": "delete"} ) # One to many # Metadata file for this series - associated_metadata: Optional["CLEMImageMetadata"] = Relationship( + associated_metadata: CLEMImageMetadata | None = Relationship( back_populates="associated_series", ) # One to one - metadata_id: Optional[int] = Field( + metadata_id: int | None = Field( foreign_key="clemimagemetadata.id", default=None, ) - # Databases of the image stacks that comprise this series - child_stacks: List["CLEMImageStack"] = Relationship( + # Image stack entries that are part of this series + child_stacks: list[CLEMImageStack] = Relationship( back_populates="parent_series", sa_relationship_kwargs={"cascade": "delete"}, ) # One to many - - # Process checklist for series number_of_members: int = ( 0 # Expected number of image stacks belonging to this series ) + + # Composite images composite_created: bool = False # Has a composite image been created? 
- composite_image: Optional[str] = None # Full path to composite image class CLEMImageStack(SQLModel, table=True): # type: ignore @@ -389,6 +400,10 @@ class DataCollectionGroup(SQLModel, table=True): # type: ignore back_populates="data_collection_group", sa_relationship_kwargs={"cascade": "delete"}, ) + clem_image_series: list["CLEMImageSeries"] = Relationship( + back_populates="data_collection_group", + sa_relationship_kwargs={"cascade": "delete"}, + ) notification_parameters: List["NotificationParameter"] = Relationship( back_populates="data_collection_group", sa_relationship_kwargs={"cascade": "delete"}, @@ -591,6 +606,9 @@ class GridSquare(SQLModel, table=True): # type: ignore pixel_size: Optional[float] = None image: str = "" session: Optional[Session] = Relationship(back_populates="grid_squares") + clem_image_series: list["CLEMImageSeries"] = Relationship( + back_populates="grid_square", sa_relationship_kwargs={"cascade": "delete"} + ) foil_holes: List["FoilHole"] = Relationship( back_populates="grid_square", sa_relationship_kwargs={"cascade": "delete"} ) From c6fea876c32bc965f0ce86f3e796ef1181540745 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Fri, 31 Oct 2025 16:27:51 +0000 Subject: [PATCH 05/21] Removed registration of composite image file path from 'register_align_and_merge_results' workflow --- src/murfey/workflows/clem/register_align_and_merge_results.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/murfey/workflows/clem/register_align_and_merge_results.py b/src/murfey/workflows/clem/register_align_and_merge_results.py index fe52058b..fb5f563a 100644 --- a/src/murfey/workflows/clem/register_align_and_merge_results.py +++ b/src/murfey/workflows/clem/register_align_and_merge_results.py @@ -87,7 +87,6 @@ def register_align_and_merge_result( session_id=session_id, series_name=result.series_name, ) - clem_img_series.composite_image = str(result.composite_image) clem_img_series.composite_created = True murfey_db.add(clem_img_series) 
murfey_db.commit() From 09190be567ced5c0585fe72681211ff42f05a5a2 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Fri, 31 Oct 2025 16:30:01 +0000 Subject: [PATCH 06/21] Stored tag/name of data collection group in CLEMImageSeries table as well --- src/murfey/util/db.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/murfey/util/db.py b/src/murfey/util/db.py index f09252e8..7464819c 100644 --- a/src/murfey/util/db.py +++ b/src/murfey/util/db.py @@ -254,6 +254,7 @@ class CLEMImageSeries(SQLModel, table=True): # type: ignore back_populates="clem_image_series" ) dcg_id: int | None = Field(foreign_key="datacollectiongroup.id", default=None) + dcg_name: str = Field(default="") # Link to grid squares grid_square: GridSquare | None = Relationship(back_populates="clem_image_series") From edba7eb25d4408a360c1c2761f876f2fd863f240 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Fri, 31 Oct 2025 16:30:37 +0000 Subject: [PATCH 07/21] Added logic to create data collection group and atlas table entries for CLEM workflow --- .../clem/register_preprocessing_results.py | 129 ++++++++++++++++-- 1 file changed, 118 insertions(+), 11 deletions(-) diff --git a/src/murfey/workflows/clem/register_preprocessing_results.py b/src/murfey/workflows/clem/register_preprocessing_results.py index a7d69784..7f6f1ad4 100644 --- a/src/murfey/workflows/clem/register_preprocessing_results.py +++ b/src/murfey/workflows/clem/register_preprocessing_results.py @@ -11,6 +11,7 @@ import logging import re import traceback +from importlib.metadata import entry_points from pathlib import Path from typing import Literal, Optional @@ -164,10 +165,17 @@ def run(message: dict, murfey_db: Session, demo: bool = False) -> dict[str, bool murfey_db.add_all(clem_tiff_files) murfey_db.commit() + # Add data type and image search string + clem_img_series.search_string = str(output_file.parent / "*tiff") + clem_img_series.data_type = ( + "atlas" if "Overview_" in result.series_name else "grid_square" + ) + 
murfey_db.add(clem_img_series) + murfey_db.commit() + logger.info( f"CLEM preprocessing results registered for {result.series_name!r} " ) - except Exception: logger.error( "Exception encountered when registering CLEM preprocessing result for " @@ -176,21 +184,120 @@ def run(message: dict, murfey_db: Session, demo: bool = False) -> dict[str, bool ) return {"success": False, "requeue": False} - # Load instrument name try: - instrument_name = ( - murfey_db.exec( - select(MurfeyDB.Session).where(MurfeyDB.Session.id == session_id) - ) - .one() - .instrument_name + # Load current session from database + murfey_session = murfey_db.exec( + select(MurfeyDB.Session).where(MurfeyDB.Session.id == session_id) + ).one() + + # Determine variables to register data collection group and atlas with + visit_name = murfey_session.visit + proposal_code = "".join( + char for char in visit_name.split("-")[0] if char.isalpha() ) + proposal_number = "".join( + char for char in visit_name.split("-")[0] if char.isdigit() + ) + visit_number = visit_name.split("-")[-1] + + # Generate name/tag for data colleciton group based on series name + dcg_name = result.series_name.split("--")[0] + if result.series_name.split("--")[1].isdigit(): + dcg_name += f"--{result.series_name.split('--')[1]}" + + # Determine values for atlas + if "Overview_" in result.series_name: # These are atlas datasets + atlas_name = str(output_file.parent / "*.tiff") + atlas_pixel_size = result.pixel_size + else: + atlas_name = "" + atlas_pixel_size = 0.0 + + registration_result: dict[str, bool] + if dcg_search := murfey_db.exec( + select(MurfeyDB.DataCollectionGroup) + .where(MurfeyDB.DataCollectionGroup.session_id == session_id) + .where(MurfeyDB.DataCollectionGroup.tag == dcg_name) + ).all(): + # Update atlas if registering atlas dataset + # and data collection group already exists + dcg_entry = dcg_search[0] + if "Overview_" in result.series_name: + atlas_message = { + "session_id": session_id, + "dcgid": dcg_entry.id, + 
"atlas_id": dcg_entry.atlas_id, + "atlas": atlas_name, + "atlas_pixel_size": atlas_pixel_size, + "sample": dcg_entry.sample, + } + if entry_point_result := entry_points( + group="murfey.workflows", name="atlas_update" + ): + (workflow,) = entry_point_result + registration_result = workflow.load()( + message=atlas_message, + murfey_db=murfey_db, + ) + else: + logger.warning("No workflow found for 'atlas_update'") + registration_result = {"success": False, "requeue": False} + else: + registration_result = {"success": True} + else: + # Register data collection group + dcg_message = { + "microscope": murfey_session.instrument_name, + "proposal_code": proposal_code, + "proposal_number": proposal_number, + "visit_number": visit_number, + "session_id": session_id, + "tag": dcg_name, + "experiment_type": "experiment", + "experiment_type_id": None, + "atlas": atlas_name, + "atlas_pixel_size": atlas_pixel_size, + "sample": None, + } + if entry_point_result := entry_points( + group="murfey.workflows", name="data_collection_group" + ): + (workflow,) = entry_point_result + # Register grid square + registration_result = workflow.load()( + message=dcg_message, + murfey_db=murfey_db, + ) + else: + logger.warning("No workflow found for 'data_collection_group'") + registration_result = {"success": False, "requeue": False} + if registration_result.get("success", False): + logger.info( + "Successfully registered data collection group for CLEM workflow " + f"using{result.series_name!r}" + ) + else: + logger.warning( + "Failed to register data collection group for CLEM workflow " + f"using {result.series_name!r}" + ) + + # Store data collection group id in CLEM image series table + dcg_entry = murfey_db.exec( + select(MurfeyDB.DataCollectionGroup) + .where(MurfeyDB.DataCollectionGroup.session_id == session_id) + .where(MurfeyDB.DataCollectionGroup.tag == dcg_name) + ).one() + clem_img_series.dcg_id = dcg_entry.id + clem_img_series.dcg_name = dcg_entry.tag + 
murfey_db.add(clem_img_series) + murfey_db.commit() except Exception: logger.error( - f"Error requesting data from database for {result.series_name!r} series: \n" + "Exception encountered when registering data collection group for CLEM workflow " + f"using {result.series_name!r}: \n" f"{traceback.format_exc()}" ) - return {"success": False, "requeue": False} # Construct list of files to use for image alignment and merging steps image_combos_to_process = [ @@ -218,7 +325,7 @@ def run(message: dict, murfey_db: Session, demo: bool = False) -> dict[str, bool try: submit_cluster_request( session_id=session_id, - instrument_name=instrument_name, + instrument_name=murfey_session.instrument_name, series_name=result.series_name, images=image_combo, metadata=result.metadata, From 6fa10e3286662094e543596cc96434caa1c2eeb9 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Fri, 31 Oct 2025 17:07:00 +0000 Subject: [PATCH 08/21] Revert to using older annotated types --- src/murfey/util/db.py | 44 +++++++++++++++++++++++-------------------- 1 file changed, 24 insertions(+), 20 deletions(-) diff --git a/src/murfey/util/db.py b/src/murfey/util/db.py index 7464819c..07a5ba1e 100644 --- a/src/murfey/util/db.py +++ b/src/murfey/util/db.py @@ -3,8 +3,6 @@ of the sessions that Murfey is overseeing, along with the relationships between them. """ -from __future__ import annotations - from datetime import datetime from typing import List, Optional @@ -237,54 +235,60 @@ class CLEMImageSeries(SQLModel, table=True): # type: ignore have been compiled together from individual TIFF files. 
""" - id: int | None = Field(default=None, primary_key=True) + id: Optional[int] = Field(default=None, primary_key=True) series_name: str = Field( index=True ) # Name of the series, as determined from the metadata - search_string: str | None = Field(default=None) # Path for globbing with + search_string: Optional[str] = Field(default=None) # Path for globbing with - session: Session | None = Relationship(back_populates="image_series") # Many to one - session_id: int | None = Field(foreign_key="session.id", default=None, unique=False) + session: Optional["Session"] = Relationship( + back_populates="image_series" + ) # Many to one + session_id: Optional[int] = Field( + foreign_key="session.id", default=None, unique=False + ) # Type of data (atlas/overview or grid square) - data_type: str = Field(default="") # "atlas" or "grid_square" + data_type: Optional[str] = Field(default=None) # "atlas" or "grid_square" # Link to data collection group - data_collection_group: DataCollectionGroup | None = Relationship( + data_collection_group: Optional["DataCollectionGroup"] = Relationship( back_populates="clem_image_series" ) - dcg_id: int | None = Field(foreign_key="datacollectiongroup.id", default=None) - dcg_name: str = Field(default="") + dcg_id: Optional[int] = Field(foreign_key="datacollectiongroup.id", default=None) + dcg_name: Optional[str] = Field(default=None) # Link to grid squares - grid_square: GridSquare | None = Relationship(back_populates="clem_image_series") - grid_square_id: int | None = Field(foreign_key="gridsquare.id", default=None) + grid_square: Optional["GridSquare"] = Relationship( + back_populates="clem_image_series" + ) + grid_square_id: Optional[int] = Field(foreign_key="gridsquare.id", default=None) # The parent LIF file this series originates from, if any - parent_lif: CLEMLIFFile | None = Relationship( + parent_lif: Optional["CLEMLIFFile"] = Relationship( back_populates="child_series", ) # Many to one - parent_lif_id: int | None = Field( + 
parent_lif_id: Optional[int] = Field( foreign_key="clemliffile.id", default=None, ) # The parent TIFF files used to build up the image stacks in the series, if any - parent_tiffs: list[CLEMTIFFFile] = Relationship( + parent_tiffs: List["CLEMTIFFFile"] = Relationship( back_populates="child_series", sa_relationship_kwargs={"cascade": "delete"} ) # One to many # Metadata file for this series - associated_metadata: CLEMImageMetadata | None = Relationship( + associated_metadata: Optional["CLEMImageMetadata"] = Relationship( back_populates="associated_series", ) # One to one - metadata_id: int | None = Field( + metadata_id: Optional[int] = Field( foreign_key="clemimagemetadata.id", default=None, ) # Image stack entries that are part of this series - child_stacks: list[CLEMImageStack] = Relationship( + child_stacks: List["CLEMImageStack"] = Relationship( back_populates="parent_series", sa_relationship_kwargs={"cascade": "delete"}, ) # One to many @@ -401,7 +405,7 @@ class DataCollectionGroup(SQLModel, table=True): # type: ignore back_populates="data_collection_group", sa_relationship_kwargs={"cascade": "delete"}, ) - clem_image_series: list["CLEMImageSeries"] = Relationship( + clem_image_series: List["CLEMImageSeries"] = Relationship( back_populates="data_collection_group", sa_relationship_kwargs={"cascade": "delete"}, ) @@ -607,7 +611,7 @@ class GridSquare(SQLModel, table=True): # type: ignore pixel_size: Optional[float] = None image: str = "" session: Optional[Session] = Relationship(back_populates="grid_squares") - clem_image_series: list["CLEMImageSeries"] = Relationship( + clem_image_series: List["CLEMImageSeries"] = Relationship( back_populates="grid_square", sa_relationship_kwargs={"cascade": "delete"} ) foil_holes: List["FoilHole"] = Relationship( From 1b007fa21d58740932da036fa0a571c99e725496 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Tue, 4 Nov 2025 13:00:10 +0000 Subject: [PATCH 09/21] Added more database columns to 'CLEMImageSeries' table to keep track of 
image shape, pixel size, and spatial location on grid --- src/murfey/util/db.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/murfey/util/db.py b/src/murfey/util/db.py index 07a5ba1e..801ac5e4 100644 --- a/src/murfey/util/db.py +++ b/src/murfey/util/db.py @@ -292,9 +292,19 @@ class CLEMImageSeries(SQLModel, table=True): # type: ignore back_populates="parent_series", sa_relationship_kwargs={"cascade": "delete"}, ) # One to many - number_of_members: int = ( - 0 # Expected number of image stacks belonging to this series - ) + number_of_members: Optional[int] = Field(default=None) + + # Shape and resolution information + pixels_x: Optional[int] = Field(default=None) + pixels_y: Optional[int] = Field(default=None) + pixel_size: Optional[float] = Field(default=None) + units: Optional[str] = Field(default=None) + + # Extent of the imaged area in real space + x0: Optional[float] = Field(default=None) + x1: Optional[float] = Field(default=None) + y0: Optional[float] = Field(default=None) + y1: Optional[float] = Field(default=None) # Composite images composite_created: bool = False # Has a composite image been created? 
From 2387cddf84227ae3afcff7dee2392e2671d05e84 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Tue, 4 Nov 2025 16:22:03 +0000 Subject: [PATCH 10/21] Broke 'register_preprocessing_results' workflow down into smaller functions for the sake of readability --- .../clem/register_preprocessing_results.py | 441 ++++++++++-------- 1 file changed, 237 insertions(+), 204 deletions(-) diff --git a/src/murfey/workflows/clem/register_preprocessing_results.py b/src/murfey/workflows/clem/register_preprocessing_results.py index 7f6f1ad4..422d215b 100644 --- a/src/murfey/workflows/clem/register_preprocessing_results.py +++ b/src/murfey/workflows/clem/register_preprocessing_results.py @@ -50,6 +50,221 @@ class CLEMPreprocessingResult(BaseModel): extent: list[float] +def _register_results_in_murfey( + session_id: int, result: CLEMPreprocessingResult, murfey_db: Session +): + clem_img_series: MurfeyDB.CLEMImageSeries = get_db_entry( + db=murfey_db, + table=MurfeyDB.CLEMImageSeries, + session_id=session_id, + series_name=result.series_name, + ) + clem_metadata: MurfeyDB.CLEMImageMetadata = get_db_entry( + db=murfey_db, + table=MurfeyDB.CLEMImageMetadata, + session_id=session_id, + file_path=result.metadata, + ) + # Register and link parent LIF file if present + if result.parent_lif is not None: + clem_lif_file: MurfeyDB.CLEMLIFFile = get_db_entry( + db=murfey_db, + table=MurfeyDB.CLEMLIFFile, + session_id=session_id, + file_path=result.parent_lif, + ) + clem_img_series.parent_lif = clem_lif_file + clem_metadata.parent_lif = clem_lif_file + + # Link and commit series and metadata tables first + clem_img_series.associated_metadata = clem_metadata + clem_img_series.number_of_members = result.number_of_members + murfey_db.add_all([clem_img_series, clem_metadata]) + murfey_db.commit() + + # Iteratively register the output image stacks + for c, (channel, output_file) in enumerate(result.output_files.items()): + clem_img_stk: MurfeyDB.CLEMImageStack = get_db_entry( + db=murfey_db, + 
table=MurfeyDB.CLEMImageStack, + session_id=session_id, + file_path=output_file, + ) + + # Link associated metadata + clem_img_stk.associated_metadata = clem_metadata + clem_img_stk.parent_series = clem_img_series + clem_img_stk.channel_name = channel + if result.parent_lif is not None: + clem_img_stk.parent_lif = clem_lif_file + murfey_db.add(clem_img_stk) + murfey_db.commit() + + # Register and link parent TIFF files if present + if result.parent_tiffs: + seed_file = result.parent_tiffs[channel][0] + if c == 0: + # Load list of files to register from seed file + series_identifier = seed_file.stem.split("--")[0] + "--" + tiff_list = list(seed_file.parent.glob(f"{series_identifier}--")) + + # Load TIFF files by colour channel if "--C" in file stem + match = re.search(r"--C[\d]{2,3}", seed_file.stem) + tiff_file_subset = [ + file + for file in tiff_list + if file.stem.startswith(series_identifier) + and (match.group(0) in file.stem if match else True) + ] + tiff_file_subset.sort() + + # Register TIFF file subset + clem_tiff_files = [] + for file in tiff_file_subset: + clem_tiff_file: MurfeyDB.CLEMTIFFFile = get_db_entry( + db=murfey_db, + table=MurfeyDB.CLEMTIFFFile, + session_id=session_id, + file_path=file, + ) + + # Link associated metadata + clem_tiff_file.associated_metadata = clem_metadata + clem_tiff_file.child_series = clem_img_series + clem_tiff_file.child_stack = clem_img_stk + + clem_tiff_files.append(clem_tiff_file) + + murfey_db.add_all(clem_tiff_files) + murfey_db.commit() + + # Add data type and image search string + clem_img_series.search_string = str(output_file.parent / "*tiff") + clem_img_series.data_type = ( + "atlas" if "Overview_" in result.series_name else "grid_square" + ) + murfey_db.add(clem_img_series) + murfey_db.commit() + + logger.info(f"CLEM preprocessing results registered for {result.series_name!r} ") + + +def _register_results_in_ispyb( + session_id: int, + instrument_name: str, + visit_name: str, + result: CLEMPreprocessingResult, 
+ murfey_db: Session, +): + # Determine variables to register data collection group and atlas with + proposal_code = "".join(char for char in visit_name.split("-")[0] if char.isalpha()) + proposal_number = "".join( + char for char in visit_name.split("-")[0] if char.isdigit() + ) + visit_number = visit_name.split("-")[-1] + + # Generate name/tag for data colleciton group based on series name + dcg_name = result.series_name.split("--")[0] + if result.series_name.split("--")[1].isdigit(): + dcg_name += f"--{result.series_name.split('--')[1]}" + + # Determine values for atlas + if "Overview_" in result.series_name: # These are atlas datasets + output_file = list(result.output_files.values())[0] + atlas_name = str(output_file.parent / "*.tiff") + atlas_pixel_size = result.pixel_size + else: + atlas_name = "" + atlas_pixel_size = 0.0 + + registration_result: dict[str, bool] + if dcg_search := murfey_db.exec( + select(MurfeyDB.DataCollectionGroup) + .where(MurfeyDB.DataCollectionGroup.session_id == session_id) + .where(MurfeyDB.DataCollectionGroup.tag == dcg_name) + ).all(): + # Update atlas if registering atlas dataset + # and data collection group already exists + dcg_entry = dcg_search[0] + if "Overview_" in result.series_name: + atlas_message = { + "session_id": session_id, + "dcgid": dcg_entry.id, + "atlas_id": dcg_entry.atlas_id, + "atlas": atlas_name, + "atlas_pixel_size": atlas_pixel_size, + "sample": dcg_entry.sample, + } + if entry_point_result := entry_points( + group="murfey.workflows", name="atlas_update" + ): + (workflow,) = entry_point_result + registration_result = workflow.load()( + message=atlas_message, + murfey_db=murfey_db, + ) + else: + logger.warning("No workflow found for 'atlas_update'") + registration_result = {"success": False, "requeue": False} + else: + registration_result = {"success": True} + else: + # Register data collection group and placeholder for the atlas + dcg_message = { + "microscope": instrument_name, + "proposal_code": 
proposal_code, + "proposal_number": proposal_number, + "visit_number": visit_number, + "session_id": session_id, + "tag": dcg_name, + "experiment_type": "experiment", + "experiment_type_id": None, + "atlas": atlas_name, + "atlas_pixel_size": atlas_pixel_size, + "sample": None, + } + if entry_point_result := entry_points( + group="murfey.workflows", name="data_collection_group" + ): + (workflow,) = entry_point_result + # Register grid square + registration_result = workflow.load()( + message=dcg_message, + murfey_db=murfey_db, + ) + else: + logger.warning("No workflow found for 'data_collection_group'") + registration_result = {"success": False, "requeue": False} + if registration_result.get("success", False): + logger.info( + "Successfully registered data collection group for CLEM workflow " + f"using{result.series_name!r}" + ) + else: + logger.warning( + "Failed to register data collection group for CLEM workflow " + f"using {result.series_name!r}" + ) + + # Store data collection group id in CLEM image series table + dcg_entry = murfey_db.exec( + select(MurfeyDB.DataCollectionGroup) + .where(MurfeyDB.DataCollectionGroup.session_id == session_id) + .where(MurfeyDB.DataCollectionGroup.tag == dcg_name) + ).one() + + clem_img_series: MurfeyDB.CLEMImageSeries = get_db_entry( + db=murfey_db, + table=MurfeyDB.CLEMImageSeries, + session_id=session_id, + series_name=result.series_name, + ) + clem_img_series.dcg_id = dcg_entry.id + clem_img_series.dcg_name = dcg_entry.tag + murfey_db.add(clem_img_series) + murfey_db.commit() + + def run(message: dict, murfey_db: Session, demo: bool = False) -> dict[str, bool]: session_id: int = ( int(message["session_id"]) @@ -76,105 +291,22 @@ def run(message: dict, murfey_db: Session, demo: bool = False) -> dict[str, bool # Outer try-finally block for tidying up database-related section of function try: - # Register items in database if not already present try: - clem_img_series: MurfeyDB.CLEMImageSeries = get_db_entry( - db=murfey_db, - 
table=MurfeyDB.CLEMImageSeries, - session_id=session_id, - series_name=result.series_name, + # Load current session from database + murfey_session = murfey_db.exec( + select(MurfeyDB.Session).where(MurfeyDB.Session.id == session_id) + ).one() + except Exception: + logger.error( + "Exception encountered when loading Murfey session information: \n", + f"{traceback.format_exc()}", ) - clem_metadata: MurfeyDB.CLEMImageMetadata = get_db_entry( - db=murfey_db, - table=MurfeyDB.CLEMImageMetadata, + try: + # Register items in Murfey database + _register_results_in_murfey( session_id=session_id, - file_path=result.metadata, - ) - # Register and link parent LIF file if present - if result.parent_lif is not None: - clem_lif_file: MurfeyDB.CLEMLIFFile = get_db_entry( - db=murfey_db, - table=MurfeyDB.CLEMLIFFile, - session_id=session_id, - file_path=result.parent_lif, - ) - clem_img_series.parent_lif = clem_lif_file - clem_metadata.parent_lif = clem_lif_file - - # Link and commit series and metadata tables first - clem_img_series.associated_metadata = clem_metadata - clem_img_series.number_of_members = result.number_of_members - murfey_db.add_all([clem_img_series, clem_metadata]) - murfey_db.commit() - - # Iteratively register the output image stacks - for c, (channel, output_file) in enumerate(result.output_files.items()): - clem_img_stk: MurfeyDB.CLEMImageStack = get_db_entry( - db=murfey_db, - table=MurfeyDB.CLEMImageStack, - session_id=session_id, - file_path=output_file, - ) - - # Link associated metadata - clem_img_stk.associated_metadata = clem_metadata - clem_img_stk.parent_series = clem_img_series - clem_img_stk.channel_name = channel - if result.parent_lif is not None: - clem_img_stk.parent_lif = clem_lif_file - murfey_db.add(clem_img_stk) - murfey_db.commit() - - # Register and link parent TIFF files if present - if result.parent_tiffs: - seed_file = result.parent_tiffs[channel][0] - if c == 0: - # Load list of files to register from seed file - series_identifier = 
seed_file.stem.split("--")[0] + "--" - tiff_list = list( - seed_file.parent.glob(f"{series_identifier}--") - ) - - # Load TIFF files by colour channel if "--C" in file stem - match = re.search(r"--C[\d]{2,3}", seed_file.stem) - tiff_file_subset = [ - file - for file in tiff_list - if file.stem.startswith(series_identifier) - and (match.group(0) in file.stem if match else True) - ] - tiff_file_subset.sort() - - # Register TIFF file subset - clem_tiff_files = [] - for file in tiff_file_subset: - clem_tiff_file: MurfeyDB.CLEMTIFFFile = get_db_entry( - db=murfey_db, - table=MurfeyDB.CLEMTIFFFile, - session_id=session_id, - file_path=file, - ) - - # Link associated metadata - clem_tiff_file.associated_metadata = clem_metadata - clem_tiff_file.child_series = clem_img_series - clem_tiff_file.child_stack = clem_img_stk - - clem_tiff_files.append(clem_tiff_file) - - murfey_db.add_all(clem_tiff_files) - murfey_db.commit() - - # Add data type and image search string - clem_img_series.search_string = str(output_file.parent / "*tiff") - clem_img_series.data_type = ( - "atlas" if "Overview_" in result.series_name else "grid_square" - ) - murfey_db.add(clem_img_series) - murfey_db.commit() - - logger.info( - f"CLEM preprocessing results registered for {result.series_name!r} " + result=result, + murfey_db=murfey_db, ) except Exception: logger.error( @@ -183,115 +315,15 @@ def run(message: dict, murfey_db: Session, demo: bool = False) -> dict[str, bool f"{traceback.format_exc()}" ) return {"success": False, "requeue": False} - try: - # Load current session from database - murfey_session = murfey_db.exec( - select(MurfeyDB.Session).where(MurfeyDB.Session.id == session_id) - ).one() - - # Determine variables to register data collection group and atlas with - visit_name = murfey_session.visit - proposal_code = "".join( - char for char in visit_name.split("-")[0] if char.isalpha() - ) - proposal_number = "".join( - char for char in visit_name.split("-")[0] if char.isdigit() + # 
Register items in ISPyB + _register_results_in_ispyb( + session_id=session_id, + instrument_name=murfey_session.instrument_name, + visit_name=murfey_session.visit, + result=result, + murfey_db=murfey_db, ) - visit_number = visit_name.split("-")[-1] - - # Generate name/tag for data colleciton group based on series name - dcg_name = result.series_name.split("--")[0] - if result.series_name.split("--")[1].isdigit(): - dcg_name += f"--{result.series_name.split('--')[1]}" - - # Determine values for atlas - if "Overview_" in result.series_name: # These are atlas datasets - atlas_name = str(output_file.parent / "*.tiff") - atlas_pixel_size = result.pixel_size - else: - atlas_name = "" - atlas_pixel_size = 0.0 - - registration_result: dict[str, bool] - if dcg_search := murfey_db.exec( - select(MurfeyDB.DataCollectionGroup) - .where(MurfeyDB.DataCollectionGroup.session_id == session_id) - .where(MurfeyDB.DataCollectionGroup.tag == dcg_name) - ).all(): - # Update atlas if registering atlas dataset - # and data collection group already exists - dcg_entry = dcg_search[0] - if "Overview_" in result.series_name: - atlas_message = { - "session_id": session_id, - "dcgid": dcg_entry.id, - "atlas_id": dcg_entry.atlas_id, - "atlas": atlas_name, - "atlas_pixel_size": atlas_pixel_size, - "sample": dcg_entry.sample, - } - if entry_point_result := entry_points( - group="murfey.workflows", name="atlas_update" - ): - (workflow,) = entry_point_result - registration_result = workflow.load()( - message=atlas_message, - murfey_db=murfey_db, - ) - else: - logger.warning("No workflow found for 'atlas_update'") - registration_result = {"success": False, "requeue": False} - else: - registration_result = {"success": True} - else: - # Register data collection group - dcg_message = { - "microscope": murfey_session.instrument_name, - "proposal_code": proposal_code, - "proposal_number": proposal_number, - "visit_number": visit_number, - "session_id": session_id, - "tag": dcg_name, - "experiment_type": 
"experiment", - "experiment_type_id": None, - "atlas": atlas_name, - "atlas_pixel_size": atlas_pixel_size, - "sample": None, - } - if entry_point_result := entry_points( - group="murfey.workflows", name="data_collection_group" - ): - (workflow,) = entry_point_result - # Register grid square - registration_result = workflow.load()( - message=dcg_message, - murfey_db=murfey_db, - ) - else: - logger.warning("No workflow found for 'data_collection_group'") - registration_result = {"success": False, "requeue": False} - if registration_result.get("success", False): - logger.info( - "Successfully registered data collection group for CLEM workflow " - f"using{result.series_name!r}" - ) - else: - logger.warning( - "Failed to register data collection group for CLEM workflow " - f"using {result.series_name!r}" - ) - - # Store data collection group id in CLEM image series table - dcg_entry = murfey_db.exec( - select(MurfeyDB.DataCollectionGroup) - .where(MurfeyDB.DataCollectionGroup.session_id == session_id) - .where(MurfeyDB.DataCollectionGroup.tag == dcg_name) - ).one() - clem_img_series.dcg_id = dcg_entry.id - clem_img_series.dcg_name = dcg_entry.tag - murfey_db.add(clem_img_series) - murfey_db.commit() except Exception: logger.error( "Exception encountered when registering data collection group for CLEM workflow " @@ -303,8 +335,8 @@ def run(message: dict, murfey_db: Session, demo: bool = False) -> dict[str, bool image_combos_to_process = [ list(result.output_files.values()) # Composite image of all channels ] - # Create additional fluorescent-only and bright field-only jobs if ("gray" in result.output_files.keys()) and len(result.output_files) > 1: + # Create additional fluorescent-only composite image image_combos_to_process.append( [ file @@ -312,6 +344,7 @@ def run(message: dict, murfey_db: Session, demo: bool = False) -> dict[str, bool if channel != "gray" ] ) + # Create additional bright field-only image image_combos_to_process.append( [ file From 
836e0164915e323fbde2e0a7aeab281845dd2f3a Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Tue, 4 Nov 2025 16:45:14 +0000 Subject: [PATCH 11/21] Added new optional fields into 'GridSquareParameters' table to store scaled down information about the grid squares being registered --- src/murfey/util/models.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/murfey/util/models.py b/src/murfey/util/models.py index 64f9cd4f..c1ae42a5 100644 --- a/src/murfey/util/models.py +++ b/src/murfey/util/models.py @@ -128,7 +128,9 @@ class Base(BaseModel): class GridSquareParameters(BaseModel): tag: str x_location: Optional[float] = None + x_location_scaled: Optional[int] = None y_location: Optional[float] = None + y_location_scaled: Optional[int] = None x_stage_position: Optional[float] = None y_stage_position: Optional[float] = None readout_area_x: Optional[int] = None @@ -136,7 +138,9 @@ class GridSquareParameters(BaseModel): thumbnail_size_x: Optional[int] = None thumbnail_size_y: Optional[int] = None height: Optional[int] = None + height_scaled: Optional[int] = None width: Optional[int] = None + width_scaled: Optional[int] = None pixel_size: Optional[float] = None image: str = "" angle: Optional[float] = None From 0d88293cdddbc620b69a6a61f8db3ae3073f487d Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Tue, 4 Nov 2025 16:48:36 +0000 Subject: [PATCH 12/21] Migrated logic for rescaling grid square values into the 'flush_spa_preprocess' workflow instead, so that the grid square registration and updating functions just register the values as-is --- src/murfey/server/ispyb.py | 50 ++++++------------- .../workflows/spa/flush_spa_preprocess.py | 10 ++++ 2 files changed, 25 insertions(+), 35 deletions(-) diff --git a/src/murfey/server/ispyb.py b/src/murfey/server/ispyb.py index 163db99f..0085f81b 100644 --- a/src/murfey/server/ispyb.py +++ b/src/murfey/server/ispyb.py @@ -194,34 +194,14 @@ def do_insert_grid_square( grid_square_parameters.readout_area_x /
grid_square_parameters.thumbnail_size_x ) - grid_square_parameters.height = ( - int(grid_square_parameters.height / 7.8) - if grid_square_parameters.height - else None - ) - grid_square_parameters.width = ( - int(grid_square_parameters.width / 7.8) - if grid_square_parameters.width - else None - ) - grid_square_parameters.x_location = ( - int(grid_square_parameters.x_location / 7.8) - if grid_square_parameters.x_location - else None - ) - grid_square_parameters.y_location = ( - int(grid_square_parameters.y_location / 7.8) - if grid_square_parameters.y_location - else None - ) record = GridSquare( atlasId=atlas_id, gridSquareLabel=grid_square_id, gridSquareImage=grid_square_parameters.image, - pixelLocationX=grid_square_parameters.x_location, - pixelLocationY=grid_square_parameters.y_location, - height=grid_square_parameters.height, - width=grid_square_parameters.width, + pixelLocationX=grid_square_parameters.x_location_scaled, + pixelLocationY=grid_square_parameters.y_location_scaled, + height=grid_square_parameters.height_scaled, + width=grid_square_parameters.width_scaled, angle=grid_square_parameters.angle, stageLocationX=grid_square_parameters.x_stage_position, stageLocationY=grid_square_parameters.y_stage_position, @@ -246,7 +226,7 @@ def do_update_grid_square( ): try: with ISPyBSession() as db: - grid_square = ( + grid_square: GridSquare = ( db.query(GridSquare) .filter(GridSquare.gridSquareId == grid_square_id) .one() @@ -262,18 +242,18 @@ def do_update_grid_square( ) if grid_square_parameters.image: grid_square.gridSquareImage = grid_square_parameters.image - if grid_square_parameters.x_location: - grid_square.pixelLocationX = int( - grid_square_parameters.x_location / 7.8 + if grid_square_parameters.x_location_scaled: + grid_square.pixelLocationX = ( + grid_square_parameters.x_location_scaled ) - if grid_square_parameters.y_location: - grid_square.pixelLocationY = int( - grid_square_parameters.y_location / 7.8 + if grid_square_parameters.y_location_scaled: 
+ grid_square.pixelLocationY = ( + grid_square_parameters.y_location_scaled ) - if grid_square_parameters.height is not None: - grid_square.height = int(grid_square_parameters.height / 7.8) - if grid_square_parameters.width is not None: - grid_square.width = int(grid_square_parameters.width / 7.8) + if grid_square_parameters.height_scaled is not None: + grid_square.height = grid_square_parameters.height_scaled + if grid_square_parameters.width_scaled is not None: + grid_square.width = grid_square_parameters.width_scaled if grid_square_parameters.angle: grid_square.angle = grid_square_parameters.angle if grid_square_parameters.x_stage_position: diff --git a/src/murfey/workflows/spa/flush_spa_preprocess.py b/src/murfey/workflows/spa/flush_spa_preprocess.py index ffbb6eb3..9f067f68 100644 --- a/src/murfey/workflows/spa/flush_spa_preprocess.py +++ b/src/murfey/workflows/spa/flush_spa_preprocess.py @@ -43,6 +43,16 @@ def register_grid_square( grid_square_params: GridSquareParameters, murfey_db: Session, ): + # Calculate scaled down version of the image for registration to ISPyB first + if grid_square_params.x_location is not None: + grid_square_params.x_location_scaled = int(grid_square_params.x_location / 7.8) + if grid_square_params.y_location is not None: + grid_square_params.y_location_scaled = int(grid_square_params.y_location / 7.8) + if grid_square_params.height is not None: + grid_square_params.height_scaled = int(grid_square_params.height / 7.8) + if grid_square_params.width is not None: + grid_square_params.width_scaled = int(grid_square_params.width / 7.8) + try: grid_square = murfey_db.exec( select(GridSquare) From d7b33f39ad358356ad8f8905f97c2df8c328f3a3 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Tue, 4 Nov 2025 16:55:01 +0000 Subject: [PATCH 13/21] Return failure result if unable to load Murfey session information --- src/murfey/workflows/clem/register_preprocessing_results.py | 1 + 1 file changed, 1 insertion(+) diff --git 
a/src/murfey/workflows/clem/register_preprocessing_results.py b/src/murfey/workflows/clem/register_preprocessing_results.py index 422d215b..ed40ac2d 100644 --- a/src/murfey/workflows/clem/register_preprocessing_results.py +++ b/src/murfey/workflows/clem/register_preprocessing_results.py @@ -301,6 +301,7 @@ def run(message: dict, murfey_db: Session, demo: bool = False) -> dict[str, bool "Exception encountered when loading Murfey session information: \n", f"{traceback.format_exc()}", ) + return {"success": False, "requeue": False} try: # Register items in Murfey database _register_results_in_murfey( From 41f533a6e56f757a20be373205e73bbda0a7df3b Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 5 Nov 2025 09:45:05 +0000 Subject: [PATCH 14/21] Added logic to register non-atlas CLEM image series as grid squares --- .../clem/register_preprocessing_results.py | 225 +++++++++++++++--- 1 file changed, 197 insertions(+), 28 deletions(-) diff --git a/src/murfey/workflows/clem/register_preprocessing_results.py b/src/murfey/workflows/clem/register_preprocessing_results.py index ed40ac2d..9689fc4d 100644 --- a/src/murfey/workflows/clem/register_preprocessing_results.py +++ b/src/murfey/workflows/clem/register_preprocessing_results.py @@ -20,6 +20,7 @@ import murfey.util.db as MurfeyDB from murfey.server import _transport_object +from murfey.util.models import GridSquareParameters from murfey.util.processing_params import ( default_clem_align_and_merge_parameters as processing_params, ) @@ -47,11 +48,13 @@ class CLEMPreprocessingResult(BaseModel): units: str pixel_size: float resolution: float - extent: list[float] + extent: list[float] # [x0, x1, y0, y1] -def _register_results_in_murfey( - session_id: int, result: CLEMPreprocessingResult, murfey_db: Session +def _register_clem_image_series( + session_id: int, + result: CLEMPreprocessingResult, + murfey_db: Session, ): clem_img_series: MurfeyDB.CLEMImageSeries = get_db_entry( db=murfey_db, @@ -76,9 +79,8 @@ def 
_register_results_in_murfey( clem_img_series.parent_lif = clem_lif_file clem_metadata.parent_lif = clem_lif_file - # Link and commit series and metadata tables first + # Link and commit series and metadata tables clem_img_series.associated_metadata = clem_metadata - clem_img_series.number_of_members = result.number_of_members murfey_db.add_all([clem_img_series, clem_metadata]) murfey_db.commit() @@ -138,18 +140,28 @@ def _register_results_in_murfey( murfey_db.add_all(clem_tiff_files) murfey_db.commit() - # Add data type and image search string + # Add metadata for this series clem_img_series.search_string = str(output_file.parent / "*tiff") clem_img_series.data_type = ( "atlas" if "Overview_" in result.series_name else "grid_square" ) + clem_img_series.number_of_members = result.number_of_members + clem_img_series.pixels_x = result.pixels_x + clem_img_series.pixels_y = result.pixels_y + clem_img_series.pixel_size = result.pixel_size + clem_img_series.units = result.units + clem_img_series.x0 = result.extent[0] + clem_img_series.x1 = result.extent[1] + clem_img_series.y0 = result.extent[2] + clem_img_series.y1 = result.extent[3] murfey_db.add(clem_img_series) murfey_db.commit() + murfey_db.close() logger.info(f"CLEM preprocessing results registered for {result.series_name!r} ") -def _register_results_in_ispyb( +def _register_dcg_and_atlas( session_id: int, instrument_name: str, visit_name: str, @@ -177,15 +189,14 @@ def _register_results_in_ispyb( atlas_name = "" atlas_pixel_size = 0.0 - registration_result: dict[str, bool] if dcg_search := murfey_db.exec( select(MurfeyDB.DataCollectionGroup) .where(MurfeyDB.DataCollectionGroup.session_id == session_id) .where(MurfeyDB.DataCollectionGroup.tag == dcg_name) ).all(): + dcg_entry = dcg_search[0] # Update atlas if registering atlas dataset # and data collection group already exists - dcg_entry = dcg_search[0] if "Overview_" in result.series_name: atlas_message = { "session_id": session_id, @@ -199,15 +210,12 @@ def 
_register_results_in_ispyb( group="murfey.workflows", name="atlas_update" ): (workflow,) = entry_point_result - registration_result = workflow.load()( + _ = workflow.load()( message=atlas_message, murfey_db=murfey_db, ) else: logger.warning("No workflow found for 'atlas_update'") - registration_result = {"success": False, "requeue": False} - else: - registration_result = {"success": True} else: # Register data collection group and placeholder for the atlas dcg_message = { @@ -228,23 +236,12 @@ def _register_results_in_ispyb( ): (workflow,) = entry_point_result # Register grid square - registration_result = workflow.load()( + _ = workflow.load()( message=dcg_message, murfey_db=murfey_db, ) else: logger.warning("No workflow found for 'data_collection_group'") - registration_result = {"success": False, "requeue": False} - if registration_result.get("success", False): - logger.info( - "Successfully registered data collection group for CLEM workflow " - f"using{result.series_name!r}" - ) - else: - logger.warning( - "Failed to register data collection group for CLEM workflow " - f"using {result.series_name!r}" - ) # Store data collection group id in CLEM image series table dcg_entry = murfey_db.exec( @@ -263,6 +260,162 @@ def _register_results_in_ispyb( clem_img_series.dcg_name = dcg_entry.tag murfey_db.add(clem_img_series) murfey_db.commit() + murfey_db.close() + + +def _register_grid_square( + session_id: int, + result: CLEMPreprocessingResult, + murfey_db: Session, +): + # Skip this step if no transport manager object is configured + if _transport_object is None: + logger.error("Unable to find transport manager") + return + # Load all entries for the current data collection group + dcg_name = result.series_name.split("--")[0] + if result.series_name.split("--")[1].isdigit(): + dcg_name += f"--{result.series_name.split('--')[1]}" + + # Check if an atlas has been registered + if atlas_search := murfey_db.exec( + select(MurfeyDB.CLEMImageSeries) + 
.where(MurfeyDB.CLEMImageSeries.session_id == session_id) + .where(MurfeyDB.CLEMImageSeries.dcg_name == dcg_name) + .where(MurfeyDB.CLEMImageSeries.data_type == "atlas") + ).all(): + atlas_entry = atlas_search[0] + else: + logger.info( + f"No atlas has been registered for data collection group {dcg_name!r} yet" + ) + return + + # Check if there are CLEM entries to register + if clem_img_series_to_register := murfey_db.exec( + select(MurfeyDB.CLEMImageSeries) + .where(MurfeyDB.CLEMImageSeries.session_id == session_id) + .where(MurfeyDB.CLEMImageSeries.dcg_name == dcg_name) + .where(MurfeyDB.CLEMImageSeries.data_type == "grid_square") + ): + if ( + atlas_entry.x0 is not None + and atlas_entry.x1 is not None + and atlas_entry.y0 is not None + and atlas_entry.y1 is not None + and atlas_entry.pixels_x is not None + and atlas_entry.pixels_y is not None + ): + atlas_width_real = atlas_entry.x1 - atlas_entry.x0 + atlas_height_real = atlas_entry.y1 - atlas_entry.y0 + else: + logger.warning("Atlas entry not populated with required values") + return + + for clem_img_series in clem_img_series_to_register: + if ( + clem_img_series.x0 is not None + and clem_img_series.x1 is not None + and clem_img_series.y0 is not None + and clem_img_series.y1 is not None + ): + # Find pixel corresponding to image midpoint on atlas + x_mid_real = ( + 0.5 * (clem_img_series.x0 + clem_img_series.x1) - atlas_entry.x0 + ) + x_mid_px = int(x_mid_real / atlas_width_real * atlas_entry.pixels_x) + y_mid_real = ( + 0.5 * (clem_img_series.y0 + clem_img_series.y1) - atlas_entry.y0 + ) + y_mid_px = int(y_mid_real / atlas_height_real * atlas_entry.pixels_y) + else: + logger.warning( + f"Image series {clem_img_series.series_name!r} not populated with required values" + ) + continue + + # Populate grid square Pydantic model + grid_square_params = GridSquareParameters( + tag=dcg_name, + x_location=clem_img_series.x0, + x_location_scaled=x_mid_px, + y_location=clem_img_series.y0, + y_location_scaled=y_mid_px, + 
height=clem_img_series.pixels_x, + width=clem_img_series.pixels_y, + x_stage_position=clem_img_series.x0, + y_stage_position=clem_img_series.y0, + pixel_size=clem_img_series.pixel_size, + image=clem_img_series.search_string, + ) + # Register or update the grid square entry as required + if grid_square_result := murfey_db.exec( + select(MurfeyDB.GridSquare) + .where(MurfeyDB.GridSquare.name == clem_img_series.id) + .where(MurfeyDB.GridSquare.tag == grid_square_params.tag) + .where(MurfeyDB.GridSquare.session_id == session_id) + ).all(): + # Update existing grid square entry on Murfey + grid_square_entry = grid_square_result[0] + grid_square_entry.x_location = grid_square_params.x_location + grid_square_entry.y_location = grid_square_params.y_location + grid_square_entry.x_stage_position = grid_square_params.x_stage_position + grid_square_entry.y_stage_position = grid_square_params.y_stage_position + grid_square_entry.readout_area_x = grid_square_params.readout_area_x + grid_square_entry.readout_area_y = grid_square_params.readout_area_y + grid_square_entry.thumbnail_size_x = grid_square_params.thumbnail_size_x + grid_square_entry.thumbnail_size_y = grid_square_params.thumbnail_size_y + grid_square_entry.pixel_size = grid_square_params.pixel_size + grid_square_entry.image = grid_square_params.image + + # Update existing entry on ISPyB + _transport_object.do_update_grid_square( + grid_square_id=grid_square_entry.id, + grid_square_parameters=grid_square_params, + ) + else: + # Look up data collection group for current series + dcg_entry = murfey_db.exec( + select(MurfeyDB.DataCollectionGroup) + .where(MurfeyDB.DataCollectionGroup.session_id == session_id) + .where(MurfeyDB.DataCollectionGroup.tag == grid_square_params.tag) + ).one() + # Register to ISPyB + grid_square_ispyb_result = _transport_object.do_insert_grid_square( + atlas_id=dcg_entry.atlas_id, + grid_square_id=clem_img_series.id, + grid_square_parameters=grid_square_params, + ) + # Register to Murfey + 
grid_square_entry = MurfeyDB.GridSquare( + id=grid_square_ispyb_result.get("return_value", None), + name=clem_img_series.id, + session_id=session_id, + tag=grid_square_params.tag, + x_location=grid_square_params.x_location, + y_location=grid_square_params.y_location, + x_stage_position=grid_square_params.x_stage_position, + y_stage_position=grid_square_params.y_stage_position, + readout_area_x=grid_square_params.readout_area_x, + readout_area_y=grid_square_params.readout_area_y, + thumbnail_size_x=grid_square_params.thumbnail_size_x, + thumbnail_size_y=grid_square_params.thumbnail_size_y, + pixel_size=grid_square_params.pixel_size, + image=grid_square_params.image, + ) + murfey_db.add(grid_square_entry) + murfey_db.commit() + + # Add grid square ID to existing CLEM image series entry + clem_img_series.grid_square_id = grid_square_entry.id + murfey_db.add(clem_img_series) + murfey_db.commit() + else: + logger.info( + f"No grid squares to register for data collection group {dcg_name!r} yet" + ) + murfey_db.close() + return def run(message: dict, murfey_db: Session, demo: bool = False) -> dict[str, bool]: @@ -304,7 +457,7 @@ def run(message: dict, murfey_db: Session, demo: bool = False) -> dict[str, bool return {"success": False, "requeue": False} try: # Register items in Murfey database - _register_results_in_murfey( + _register_clem_image_series( session_id=session_id, result=result, murfey_db=murfey_db, @@ -317,8 +470,8 @@ def run(message: dict, murfey_db: Session, demo: bool = False) -> dict[str, bool ) return {"success": False, "requeue": False} try: - # Register items in ISPyB - _register_results_in_ispyb( + # Register data collection group and atlas in ISPyB + _register_dcg_and_atlas( session_id=session_id, instrument_name=murfey_session.instrument_name, visit_name=murfey_session.visit, @@ -326,12 +479,28 @@ def run(message: dict, murfey_db: Session, demo: bool = False) -> dict[str, bool murfey_db=murfey_db, ) except Exception: + # Log error but allow workflow 
to proceed logger.error( "Exception encountered when registering data collection group for CLEM workflow " f"using {result.series_name!r}: \n" f"{traceback.format_exc()}" ) + try: + # Register dataset as grid square + if "Overview_" not in result.series_name: + _register_grid_square( + session_id=session_id, + result=result, + murfey_db=murfey_db, + ) + except Exception: + # Log error but allow workflow to proceed + logger.error( + f"Exception encountered when registering grid square for {result.series_name}: \n" + f"{traceback.format_exc()}" + ) + # Construct list of files to use for image alignment and merging steps image_combos_to_process = [ list(result.output_files.values()) # Composite image of all channels From 1cfb0066ece851d874637b288309b997b9899306 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 5 Nov 2025 10:34:03 +0000 Subject: [PATCH 15/21] Included calculation of image dimensions (in pixels) as displayed on atlas --- .../clem/register_preprocessing_results.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/murfey/workflows/clem/register_preprocessing_results.py b/src/murfey/workflows/clem/register_preprocessing_results.py index 9689fc4d..2804a108 100644 --- a/src/murfey/workflows/clem/register_preprocessing_results.py +++ b/src/murfey/workflows/clem/register_preprocessing_results.py @@ -328,6 +328,18 @@ def _register_grid_square( 0.5 * (clem_img_series.y0 + clem_img_series.y1) - atlas_entry.y0 ) y_mid_px = int(y_mid_real / atlas_height_real * atlas_entry.pixels_y) + + # Find the number of pixels in width and height the image corresponds to on the atlas + width_scaled = int( + (clem_img_series.x1 - clem_img_series.x0) + / atlas_width_real + * atlas_entry.pixels_x + ) + height_scaled = int( + (clem_img_series.y1 - clem_img_series.y0) + / atlas_height_real + * atlas_entry.pixels_y + ) else: logger.warning( f"Image series {clem_img_series.series_name!r} not populated with required values" @@ -342,7 +354,9 @@ def 
_register_grid_square( y_location=clem_img_series.y0, y_location_scaled=y_mid_px, height=clem_img_series.pixels_x, + height_scaled=height_scaled, width=clem_img_series.pixels_y, + width_scaled=width_scaled, x_stage_position=clem_img_series.x0, y_stage_position=clem_img_series.y0, pixel_size=clem_img_series.pixel_size, From 7967068aa9cf90459043b4071a624ec839588d3b Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 5 Nov 2025 12:00:09 +0000 Subject: [PATCH 16/21] Added placeholders for the unit tests to register CLEM preprocessing results --- .../test_register_preprocessing_results.py | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 tests/workflows/clem/test_register_preprocessing_results.py diff --git a/tests/workflows/clem/test_register_preprocessing_results.py b/tests/workflows/clem/test_register_preprocessing_results.py new file mode 100644 index 00000000..19c755bc --- /dev/null +++ b/tests/workflows/clem/test_register_preprocessing_results.py @@ -0,0 +1,28 @@ +import pytest + +from murfey.workflows.clem.register_preprocessing_results import ( + _register_clem_image_series, + _register_dcg_and_atlas, + _register_grid_square, + run, +) + + +@pytest.mark.skip +def test_register_clem_image_series(): + assert _register_clem_image_series + + +@pytest.mark.skip +def test_register_dcg_and_atlas(): + assert _register_dcg_and_atlas + + +@pytest.mark.skip +def test_register_grid_square(): + assert _register_grid_square + + +@pytest.mark.skip +def test_run(): + assert run From 4038cda8176b0913fd53d15ff678277541008c30 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 5 Nov 2025 15:57:48 +0000 Subject: [PATCH 17/21] Added unit test for 'run' main function in 'register_preprocessing_results' --- .../test_register_preprocessing_results.py | 126 +++++++++++++++++- 1 file changed, 124 insertions(+), 2 deletions(-) diff --git a/tests/workflows/clem/test_register_preprocessing_results.py 
b/tests/workflows/clem/test_register_preprocessing_results.py index 19c755bc..a92d7c1f 100644 --- a/tests/workflows/clem/test_register_preprocessing_results.py +++ b/tests/workflows/clem/test_register_preprocessing_results.py @@ -1,4 +1,9 @@ +from pathlib import Path +from typing import Any +from unittest.mock import MagicMock + import pytest +from pytest_mock import MockerFixture from murfey.workflows.clem.register_preprocessing_results import ( _register_clem_image_series, @@ -6,6 +11,86 @@ _register_grid_square, run, ) +from tests.conftest import ExampleVisit + +visit_name = f"{ExampleVisit.proposal_code}{ExampleVisit.proposal_number}-{ExampleVisit.visit_number}" +processed_dir_name = "processed" +grid_name = "Grid_1" +colors = ("gray", "green", "red") + + +@pytest.fixture +def preprocessing_messages(tmp_path: Path): + # Make directory to where data for current grid is stored + visit_dir = tmp_path / "data" / "2020" / visit_name + processed_dir = visit_dir / processed_dir_name + grid_dir = processed_dir / grid_name + grid_dir.mkdir(parents=True, exist_ok=True) + + # Construct all the datasets to be tested + datasets: list[tuple[Path, bool, bool, tuple[int, int], float, list[float]]] = [ + ( + grid_dir / "Overview_1" / "Image_1", + False, + True, + (2400, 2400), + 1e-6, + [0.002, 0.0044, 0.002, 0.0044], + ) + ] + # Add on metadata for a few grid squares + datasets.extend( + [ + ( + grid_dir / "TileScan_1" / f"Position_{n}", + True, + False, + (2048, 2048), + 1.6e-7, + [0.003, 0.00332768, 0.003, 0.00332768], + ) + for n in range(5) + ] + ) + + messages: list[dict[str, Any]] = [] + for dataset in datasets: + # Unpack items from list of dataset parameters + series_path = dataset[0] + series_name = str(series_path.relative_to(processed_dir)).replace("/", "--") + metadata = series_path / "metadata" / f"{series_path.stem}.xml" + metadata.parent.mkdir(parents=True, exist_ok=True) + metadata.touch(exist_ok=True) + output_files = {color: str(series_path / f"{color}.tiff") 
for color in colors} + for output_file in output_files.values(): + Path(output_file).touch(exist_ok=True) + is_stack = dataset[1] + is_montage = dataset[2] + shape = dataset[3] + pixel_size = dataset[4] + extent = dataset[5] + + message = { + "session_id": ExampleVisit.murfey_session_id, + "result": { + "series_name": series_name, + "number_of_members": 3, + "is_stack": is_stack, + "is_montage": is_montage, + "output_files": output_files, + "metadata": str(metadata), + "parent_lif": None, + "parent_tiffs": {}, + "pixels_x": shape[0], + "pixels_y": shape[1], + "units": "m", + "pixel_size": pixel_size, + "resolution": 1 / pixel_size, + "extent": extent, + }, + } + messages.append(message) + return messages @pytest.mark.skip @@ -23,6 +108,43 @@ def test_register_grid_square(): assert _register_grid_square -@pytest.mark.skip -def test_run(): +def test_run( + mocker: MockerFixture, + preprocessing_messages: list[dict[str, Any]], +): + # Mock the MurfeyDB connection + mock_murfey_session_entry = MagicMock() + mock_murfey_session_entry.instrument_name = ExampleVisit.instrument_name + mock_murfey_session_entry.visit = visit_name + mock_murfey_db = MagicMock() + mock_murfey_db.exec().return_value.one.return_value = mock_murfey_session_entry + + # Mock the registration helper functions + mock_register_clem_series = mocker.patch( + "murfey.workflows.clem.register_preprocessing_results._register_clem_image_series" + ) + mock_register_dcg_and_atlas = mocker.patch( + "murfey.workflows.clem.register_preprocessing_results._register_dcg_and_atlas" + ) + mock_register_grid_square = mocker.patch( + "murfey.workflows.clem.register_preprocessing_results._register_grid_square" + ) + + # Mock the align and merge workflow call + mock_align_and_merge_call = mocker.patch( + "murfey.workflows.clem.register_preprocessing_results.submit_cluster_request" + ) + + for message in preprocessing_messages: + result = run( + message=message, + murfey_db=mock_murfey_db, + ) + assert result == 
{"success": True} + assert mock_register_clem_series.call_count == len(preprocessing_messages) + assert mock_register_dcg_and_atlas.call_count == len(preprocessing_messages) + assert mock_register_grid_square.call_count == len(preprocessing_messages) - 1 + assert mock_align_and_merge_call.call_count == len(preprocessing_messages) * len( + colors + ) assert run From 72b2b1fbfbbc6c8c20666fb6b33743562990633f Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 6 Nov 2025 14:36:18 +0000 Subject: [PATCH 18/21] Test workflow with actual Murfey and ISPyB databases --- .../test_register_preprocessing_results.py | 108 +++++++++++++++++- 1 file changed, 102 insertions(+), 6 deletions(-) diff --git a/tests/workflows/clem/test_register_preprocessing_results.py b/tests/workflows/clem/test_register_preprocessing_results.py index a92d7c1f..acaead3b 100644 --- a/tests/workflows/clem/test_register_preprocessing_results.py +++ b/tests/workflows/clem/test_register_preprocessing_results.py @@ -4,14 +4,17 @@ import pytest from pytest_mock import MockerFixture +from sqlalchemy.orm.session import Session as SQLAlchemySession +from sqlmodel.orm.session import Session as SQLModelSession +import murfey.util.db as MurfeyDB from murfey.workflows.clem.register_preprocessing_results import ( _register_clem_image_series, _register_dcg_and_atlas, _register_grid_square, run, ) -from tests.conftest import ExampleVisit +from tests.conftest import ExampleVisit, get_or_create_db_entry visit_name = f"{ExampleVisit.proposal_code}{ExampleVisit.proposal_number}-{ExampleVisit.visit_number}" processed_dir_name = "processed" @@ -20,9 +23,16 @@ @pytest.fixture -def preprocessing_messages(tmp_path: Path): +def rsync_basepath(tmp_path: Path): + return tmp_path / "data" + + +def generate_preprocessing_messages( + rsync_basepath: Path, + session_id: int, +): # Make directory to where data for current grid is stored - visit_dir = tmp_path / "data" / "2020" / visit_name + visit_dir = rsync_basepath / "2020" / 
visit_name processed_dir = visit_dir / processed_dir_name grid_dir = processed_dir / grid_name grid_dir.mkdir(parents=True, exist_ok=True) @@ -71,7 +81,7 @@ def preprocessing_messages(tmp_path: Path): extent = dataset[5] message = { - "session_id": ExampleVisit.murfey_session_id, + "session_id": session_id, "result": { "series_name": series_name, "number_of_members": 3, @@ -110,7 +120,7 @@ def test_register_grid_square(): def test_run( mocker: MockerFixture, - preprocessing_messages: list[dict[str, Any]], + rsync_basepath: Path, ): # Mock the MurfeyDB connection mock_murfey_session_entry = MagicMock() @@ -135,6 +145,10 @@ def test_run( "murfey.workflows.clem.register_preprocessing_results.submit_cluster_request" ) + preprocessing_messages = generate_preprocessing_messages( + rsync_basepath=rsync_basepath, + session_id=ExampleVisit.murfey_session_id, + ) for message in preprocessing_messages: result = run( message=message, @@ -147,4 +161,86 @@ def test_run( assert mock_align_and_merge_call.call_count == len(preprocessing_messages) * len( colors ) - assert run + + +def test_run_with_db( + mocker: MockerFixture, + rsync_basepath: Path, + mock_ispyb_credentials, + murfey_db_session: SQLModelSession, + ispyb_db_session: SQLAlchemySession, +): + # Create a session to insert for this test + murfey_session: MurfeyDB.Session = get_or_create_db_entry( + murfey_db_session, + MurfeyDB.Session, + lookup_kwargs={ + "id": ExampleVisit.murfey_session_id + 1, + "name": visit_name, + "visit": visit_name, + "instrument_name": ExampleVisit.instrument_name, + }, + ) + + # Mock the ISPyB connection where the TransportManager class is located + mock_security_config = MagicMock() + mock_security_config.ispyb_credentials = mock_ispyb_credentials + mocker.patch( + "murfey.server.ispyb.get_security_config", + return_value=mock_security_config, + ) + mocker.patch( + "murfey.server.ispyb.ISPyBSession", + return_value=ispyb_db_session, + ) + + # Mock the ISPYB connection when registering data 
collection group + mocker.patch( + "murfey.workflows.register_data_collection_group.ISPyBSession", + return_value=ispyb_db_session, + ) + + # Mock out the machine config used in the helper sanitisation function + mock_get_machine_config = mocker.patch("murfey.workflows.clem.get_machine_config") + mock_machine_config = MagicMock() + mock_machine_config.rsync_basepath = rsync_basepath + mock_get_machine_config.return_value = { + ExampleVisit.instrument_name: mock_machine_config, + } + + # Mock the align and merge workflow call + mock_align_and_merge_call = mocker.patch( + "murfey.workflows.clem.register_preprocessing_results.submit_cluster_request" + ) + + # Patch the TransportManager object in the workflows called + from murfey.server.ispyb import TransportManager + + mocker.patch( + "murfey.workflows.clem.register_preprocessing_results._transport_object", + new=TransportManager("PikaTransport"), + ) + mocker.patch( + "murfey.workflows.register_data_collection_group._transport_object", + new=TransportManager("PikaTransport"), + ) + mocker.patch( + "murfey.workflows.register_atlas_update._transport_object", + new=TransportManager("PikaTransport"), + ) + + # Run the function + preprocessing_messages = generate_preprocessing_messages( + rsync_basepath=rsync_basepath, + session_id=murfey_session.id, + ) + for message in preprocessing_messages: + result = run( + message=message, + murfey_db=murfey_db_session, + ) + assert result == {"success": True} + assert mock_align_and_merge_call.call_count == len(preprocessing_messages) * len( + colors + ) + murfey_db_session.close() From df1f7320ddfe4ba6a573a3d4a79eb905c5d101e3 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 6 Nov 2025 14:52:00 +0000 Subject: [PATCH 19/21] Pass messages to test in reverse order as well --- .../clem/test_register_preprocessing_results.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tests/workflows/clem/test_register_preprocessing_results.py 
b/tests/workflows/clem/test_register_preprocessing_results.py index acaead3b..958ad69b 100644 --- a/tests/workflows/clem/test_register_preprocessing_results.py +++ b/tests/workflows/clem/test_register_preprocessing_results.py @@ -163,13 +163,24 @@ def test_run( ) +test_matrix = ( + # Reverse order of list + (False,), + (True,), +) + + +@pytest.mark.parametrize("test_params", test_matrix) def test_run_with_db( mocker: MockerFixture, rsync_basepath: Path, mock_ispyb_credentials, murfey_db_session: SQLModelSession, ispyb_db_session: SQLAlchemySession, + test_params: tuple[bool], ): + (shuffle_message,) = test_params + # Create a session to insert for this test murfey_session: MurfeyDB.Session = get_or_create_db_entry( murfey_db_session, @@ -234,12 +245,16 @@ def test_run_with_db( rsync_basepath=rsync_basepath, session_id=murfey_session.id, ) + if shuffle_message: + preprocessing_messages.reverse() for message in preprocessing_messages: result = run( message=message, murfey_db=murfey_db_session, ) assert result == {"success": True} + # Each message should call the align-and-merge workflow thrice + # if gray and colour channels are both present assert mock_align_and_merge_call.call_count == len(preprocessing_messages) * len( colors ) From 406d510804b11b01c51e59648c47ee820d88f58c Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 6 Nov 2025 15:26:23 +0000 Subject: [PATCH 20/21] More thorough test to verify that the database insertions on both Murfey and ISPyB have happened --- .../test_register_preprocessing_results.py | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/tests/workflows/clem/test_register_preprocessing_results.py b/tests/workflows/clem/test_register_preprocessing_results.py index 958ad69b..f617496f 100644 --- a/tests/workflows/clem/test_register_preprocessing_results.py +++ b/tests/workflows/clem/test_register_preprocessing_results.py @@ -2,9 +2,12 @@ from typing import Any from unittest.mock import MagicMock +import ispyb.sqlalchemy 
as ISPyBDB import pytest from pytest_mock import MockerFixture +from sqlalchemy import select as sa_select from sqlalchemy.orm.session import Session as SQLAlchemySession +from sqlmodel import select as sm_select from sqlmodel.orm.session import Session as SQLModelSession import murfey.util.db as MurfeyDB @@ -179,6 +182,7 @@ def test_run_with_db( ispyb_db_session: SQLAlchemySession, test_params: tuple[bool], ): + # Unpack test params (shuffle_message,) = test_params # Create a session to insert for this test @@ -253,9 +257,63 @@ def test_run_with_db( murfey_db=murfey_db_session, ) assert result == {"success": True} + # Each message should call the align-and-merge workflow thrice # if gray and colour channels are both present assert mock_align_and_merge_call.call_count == len(preprocessing_messages) * len( colors ) + + # Both databases should have entries for data collection group, and grid squares + # ISPyB database should additionally have an atlas entry + murfey_dcg_search = murfey_db_session.exec( + sm_select(MurfeyDB.DataCollectionGroup).where( + MurfeyDB.DataCollectionGroup.session_id == murfey_session.id + ) + ).all() + assert len(murfey_dcg_search) == 1 + murfey_gs_search = murfey_db_session.exec( + sm_select(MurfeyDB.GridSquare).where( + MurfeyDB.GridSquare.session_id == murfey_session.id + ) + ).all() + assert len(murfey_gs_search) == len(preprocessing_messages) - 1 + + murfey_dcg = murfey_dcg_search[0] + ispyb_dcg_search = ( + ispyb_db_session.execute( + sa_select(ISPyBDB.DataCollectionGroup).where( + ISPyBDB.DataCollectionGroup.dataCollectionGroupId == murfey_dcg.id + ) + ) + .scalars() + .all() + ) + assert len(ispyb_dcg_search) == 1 + + ispyb_dcg = ispyb_dcg_search[0] + ispyb_atlas_search = ( + ispyb_db_session.execute( + sa_select(ISPyBDB.Atlas).where( + ISPyBDB.Atlas.dataCollectionGroupId == ispyb_dcg.dataCollectionGroupId + ) + ) + .scalars() + .all() + ) + assert len(ispyb_atlas_search) == 1 + + ispyb_atlas = ispyb_atlas_search[0] + ispyb_gs_search 
= ( + ispyb_db_session.execute( + sa_select(ISPyBDB.GridSquare).where( + ISPyBDB.GridSquare.atlasId == ispyb_atlas.atlasId + ) + ) + .scalars() + .all() + ) + assert len(ispyb_gs_search) == len(preprocessing_messages) - 1 + murfey_db_session.close() + ispyb_db_session.close() From 3abe4f6ffed3476f3b53641b90a345c95ddc9e2f Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 6 Nov 2025 15:50:28 +0000 Subject: [PATCH 21/21] Fixed bug with grid square registration not running if the CLEM atlas is the last dataset to be registered --- .../clem/register_preprocessing_results.py | 13 ++++++------- .../clem/test_register_preprocessing_results.py | 2 +- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/src/murfey/workflows/clem/register_preprocessing_results.py b/src/murfey/workflows/clem/register_preprocessing_results.py index 2804a108..e79da1d0 100644 --- a/src/murfey/workflows/clem/register_preprocessing_results.py +++ b/src/murfey/workflows/clem/register_preprocessing_results.py @@ -501,13 +501,12 @@ def run(message: dict, murfey_db: Session, demo: bool = False) -> dict[str, bool ) try: - # Register dataset as grid square - if "Overview_" not in result.series_name: - _register_grid_square( - session_id=session_id, - result=result, - murfey_db=murfey_db, - ) + # Register CLEM image series as grid squares + _register_grid_square( + session_id=session_id, + result=result, + murfey_db=murfey_db, + ) except Exception: # Log error but allow workflow to proceed logger.error( diff --git a/tests/workflows/clem/test_register_preprocessing_results.py b/tests/workflows/clem/test_register_preprocessing_results.py index f617496f..06f7a5a0 100644 --- a/tests/workflows/clem/test_register_preprocessing_results.py +++ b/tests/workflows/clem/test_register_preprocessing_results.py @@ -160,7 +160,7 @@ def test_run( assert result == {"success": True} assert mock_register_clem_series.call_count == len(preprocessing_messages) assert mock_register_dcg_and_atlas.call_count == 
len(preprocessing_messages) - assert mock_register_grid_square.call_count == len(preprocessing_messages) - 1 + assert mock_register_grid_square.call_count == len(preprocessing_messages) assert mock_align_and_merge_call.call_count == len(preprocessing_messages) * len( colors )