Skip to content
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ dependencies = [
"pyarrow>=14.0.1",
"scipy>=1.7.2", # kdtree
"universal-pathlib>=0.2.2",
"tabulate>=0.7.0",
]

[project.urls]
Expand All @@ -44,6 +45,7 @@ dev = [
"pytest",
"pytest-cov", # Used to report total code coverage
"pytest-mock", # Used to mock objects in tests
"types-tabulate", # Type information for tabulate
]
full = [
"fsspec[full]", # complete file system specs.
Expand Down
51 changes: 44 additions & 7 deletions src/lsdb/catalog/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ def crossmatch(
) = BuiltInCrossmatchAlgorithm.KD_TREE,
output_catalog_name: str | None = None,
require_right_margin: bool = False,
suffix_method: str | None = None,
**kwargs,
) -> Catalog:
"""Perform a cross-match between two catalogs
Expand Down Expand Up @@ -244,6 +245,11 @@ def crossmatch(
Default: {left_name}_x_{right_name}
require_right_margin (bool): If true, raises an error if the right margin is missing which could
lead to incomplete crossmatches. Default: False
suffix_method (str | None): Method to use to add suffixes to columns. Options are:
- "overlapping_columns": only add suffixes to columns that are present in both catalogs
- "all_columns": add suffixes to all columns from both catalogs
Default: "all_columns". Warning: this default will change to "overlapping_columns" in a
future release.

Returns:
A Catalog with the data from the left and right catalogs merged with one row for each
Expand All @@ -268,10 +274,14 @@ def crossmatch(
if output_catalog_name is None:
output_catalog_name = f"{self.name}_x_{other.name}"
ddf, ddf_map, alignment = crossmatch_catalog_data(
self, other, suffixes, algorithm=algorithm, **kwargs
self, other, suffixes, algorithm=algorithm, suffix_method=suffix_method, **kwargs
)
new_catalog_info = create_merged_catalog_info(
self.hc_structure.catalog_info, other.hc_structure.catalog_info, output_catalog_name, suffixes
self,
other,
output_catalog_name,
suffixes,
suffix_method,
)
hc_catalog = self.hc_structure.__class__(
new_catalog_info, alignment.pixel_tree, schema=get_arrow_schema(ddf), moc=alignment.moc
Expand Down Expand Up @@ -741,6 +751,7 @@ def merge_asof(
direction: str = "backward",
suffixes: tuple[str, str] | None = None,
output_catalog_name: str | None = None,
suffix_method: str | None = None,
):
"""Uses the pandas `merge_asof` function to merge two catalogs on their indices by distance of keys

Expand All @@ -754,6 +765,12 @@ def merge_asof(
other (lsdb.Catalog): the right catalog to merge to
suffixes (Tuple[str,str]): the suffixes to apply to each partition's column names
direction (str): the direction to perform the merge_asof
output_catalog_name (str): The name of the resulting catalog to be stored in metadata
suffix_method (str | None): Method to use to add suffixes to columns. Options are:
- "overlapping_columns": only add suffixes to columns that are present in both catalogs
- "all_columns": add suffixes to all columns from both catalogs
Default: "all_columns". Warning: this default will change to "overlapping_columns" in a
future release.

Returns:
A new catalog with the columns from each of the input catalogs with their respective suffixes
Expand All @@ -765,7 +782,9 @@ def merge_asof(
if len(suffixes) != 2:
raise ValueError("`suffixes` must be a tuple with two strings")

ddf, ddf_map, alignment = merge_asof_catalog_data(self, other, suffixes=suffixes, direction=direction)
ddf, ddf_map, alignment = merge_asof_catalog_data(
self, other, suffixes=suffixes, direction=direction, suffix_method=suffix_method
)

if output_catalog_name is None:
output_catalog_name = (
Expand All @@ -774,7 +793,11 @@ def merge_asof(
)

new_catalog_info = create_merged_catalog_info(
self.hc_structure.catalog_info, other.hc_structure.catalog_info, output_catalog_name, suffixes
self,
other,
output_catalog_name,
suffixes,
suffix_method,
)
hc_catalog = hc.catalog.Catalog(
new_catalog_info, alignment.pixel_tree, schema=get_arrow_schema(ddf), moc=alignment.moc
Expand All @@ -789,6 +812,7 @@ def join(
through: AssociationCatalog | None = None,
suffixes: tuple[str, str] | None = None,
output_catalog_name: str | None = None,
suffix_method: str | None = None,
) -> Catalog:
"""Perform a spatial join to another catalog

Expand All @@ -804,6 +828,11 @@ def join(
between pixels and individual rows.
suffixes (Tuple[str,str]): suffixes to apply to the columns of each table
output_catalog_name (str): The name of the resulting catalog to be stored in metadata
suffix_method (str | None): Method to use to add suffixes to columns. Options are:
- "overlapping_columns": only add suffixes to columns that are present in both catalogs
- "all_columns": add suffixes to all columns from both catalogs
Default: "all_columns". Warning: this default will change to "overlapping_columns" in a
future release.

Returns:
A new catalog with the columns from each of the input catalogs with their respective suffixes
Expand All @@ -818,21 +847,29 @@ def join(
self._check_unloaded_columns([left_on, right_on])

if through is not None:
ddf, ddf_map, alignment = join_catalog_data_through(self, other, through, suffixes)
ddf, ddf_map, alignment = join_catalog_data_through(
self, other, through, suffixes, suffix_method=suffix_method
)
else:
if left_on is None or right_on is None:
raise ValueError("Either both of left_on and right_on, or through must be set")
if left_on not in self._ddf.columns:
raise ValueError("left_on must be a column in the left catalog")
if right_on not in other._ddf.columns:
raise ValueError("right_on must be a column in the right catalog")
ddf, ddf_map, alignment = join_catalog_data_on(self, other, left_on, right_on, suffixes)
ddf, ddf_map, alignment = join_catalog_data_on(
self, other, left_on, right_on, suffixes, suffix_method=suffix_method
)

if output_catalog_name is None:
output_catalog_name = self.hc_structure.catalog_info.catalog_name

new_catalog_info = create_merged_catalog_info(
self.hc_structure.catalog_info, other.hc_structure.catalog_info, output_catalog_name, suffixes
self,
other,
output_catalog_name,
suffixes,
suffix_method,
)
hc_catalog = hc.catalog.Catalog(
new_catalog_info, alignment.pixel_tree, schema=get_arrow_schema(ddf), moc=alignment.moc
Expand Down
12 changes: 8 additions & 4 deletions src/lsdb/core/crossmatch/abstract_crossmatch_algorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from hats.catalog import TableProperties
from hats.pixel_math.spatial_index import SPATIAL_INDEX_COLUMN

from lsdb.dask.merge_catalog_functions import apply_suffixes

if TYPE_CHECKING:
from lsdb.catalog import Catalog

Expand Down Expand Up @@ -96,14 +98,14 @@ def __init__(
self.right_catalog_info = right_catalog_info
self.right_margin_catalog_info = right_margin_catalog_info

def crossmatch(self, suffixes, **kwargs) -> npd.NestedFrame:
def crossmatch(self, suffixes, suffix_method="all_columns", **kwargs) -> npd.NestedFrame:
"""Perform a crossmatch"""
l_inds, r_inds, extra_cols = self.perform_crossmatch(**kwargs)
if not len(l_inds) == len(r_inds) == len(extra_cols):
raise ValueError(
"Crossmatch algorithm must return left and right indices and extra columns with same length"
)
return self._create_crossmatch_df(l_inds, r_inds, extra_cols, suffixes)
return self._create_crossmatch_df(l_inds, r_inds, extra_cols, suffixes, suffix_method)

def crossmatch_nested(self, nested_column_name, **kwargs) -> npd.NestedFrame:
"""Perform a crossmatch"""
Expand Down Expand Up @@ -196,6 +198,7 @@ def _create_crossmatch_df(
right_idx: npt.NDArray[np.int64],
extra_cols: pd.DataFrame,
suffixes: tuple[str, str],
suffix_method="all_columns",
) -> npd.NestedFrame:
"""Creates a df containing the crossmatch result from matching indices and additional columns

Expand All @@ -209,8 +212,9 @@ def _create_crossmatch_df(
additional columns added
"""
# rename columns so no same names during merging
self._rename_columns_with_suffix(self.left, suffixes[0])
self._rename_columns_with_suffix(self.right, suffixes[1])
self.left, self.right = apply_suffixes(
self.left, self.right, suffixes, suffix_method, log_changes=False
)
# concat dataframes together
index_name = self.left.index.name if self.left.index.name is not None else "index"
left_join_part = self.left.iloc[left_idx].reset_index()
Expand Down
10 changes: 8 additions & 2 deletions src/lsdb/dask/crossmatch_catalog_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def perform_crossmatch(
right_margin_catalog_info,
algorithm,
suffixes,
suffix_method,
meta_df,
**kwargs,
):
Expand All @@ -66,7 +67,7 @@ def perform_crossmatch(
left_catalog_info,
right_catalog_info,
right_margin_catalog_info,
).crossmatch(suffixes, **kwargs)
).crossmatch(suffixes, suffix_method=suffix_method, **kwargs)


# pylint: disable=too-many-arguments, unused-argument
Expand Down Expand Up @@ -119,6 +120,7 @@ def crossmatch_catalog_data(
algorithm: (
Type[AbstractCrossmatchAlgorithm] | BuiltInCrossmatchAlgorithm
) = BuiltInCrossmatchAlgorithm.KD_TREE,
suffix_method: str | None = None,
**kwargs,
) -> tuple[nd.NestedFrame, DaskDFPixelMap, PixelAlignment]:
"""Cross-matches the data from two catalogs
Expand Down Expand Up @@ -157,7 +159,10 @@ def crossmatch_catalog_data(

# generate meta table structure for dask df
meta_df = generate_meta_df_for_joined_tables(
[left, right], suffixes, extra_columns=crossmatch_algorithm.extra_columns
(left, right),
suffixes,
suffix_method=suffix_method,
extra_columns=crossmatch_algorithm.extra_columns,
)

# perform the crossmatch on each partition pairing using dask delayed for lazy computation
Expand All @@ -166,6 +171,7 @@ def crossmatch_catalog_data(
perform_crossmatch,
crossmatch_algorithm,
suffixes,
suffix_method,
meta_df,
**kwargs,
)
Expand Down
Loading
Loading