1,753 changes: 1,141 additions & 612 deletions multisurvey-stamps/poetry.lock

Large diffs are not rendered by default.

18 changes: 11 additions & 7 deletions multisurvey-stamps/pyproject.toml
@@ -1,27 +1,31 @@
[project]
[tool.poetry]
name = "multisurvey-stamps"
version = "0.1.0"
description = ""
authors = [
{name = "Alex Alvarez",email = "[email protected]"}
]
authors = ["Claudio Mansilla <[email protected]>"]
readme = "README.md"
packages = [
{include = "multisurvey_stamps", from = "src"}
{ include = "core", from = "src" },
{ include = "multisurvey_stamps", from = "src" },
{ include = "s3_handler", from = "src" },
]


[tool.poetry.dependencies]
python = ">=3.11,<3.12"
prometheus-fastapi-instrumentator = "^6.1.0"
fastapi = "^0.115.12"
pydantic-settings = "^2.10.1"
pyyaml = "^6.0.2"
boto3 = "^1.39.14"
alerce = {git = "https://github.com/alercebroker/alerce_client.git", branch = "feat/return_stamps_function", develop = true}
astropy = "^7.1.0"
scipy = "^1.16.1"
fastavro = "^1.11.1"
matplotlib = "^3.10.3"
jinja2 = "^3.1.2"
httpx = "^0.24.1"
psycopg2-binary = "^2.9.6"
sqlalchemy = { extras = ["asyncio"], version = "^2.0.19" }

[tool.poetry.group.test.dependencies]
pytest = "^7.4.0"
@@ -43,4 +47,4 @@ line-length = 79


[tool.poetry.scripts]
service = "scripts.run_api:run"
all = "scripts.run_api:run"
103 changes: 92 additions & 11 deletions multisurvey-stamps/scripts/run_api.py
@@ -23,25 +23,106 @@ def config_from_yaml():

return config

def run(
):
# def run(
# ):
# """
# Synchronous version of run_service.
# This is useful for running the service in a synchronous context.
# """
# config_dict = config_from_yaml()

# print(f"Config dict used:\n{config_dict}")
# buckets_config = config_dict["buckets_config"]

# for survey, values in buckets_config.items():

# os.environ[f"{survey}_BUCKET_REGION"] = values["bucket_region"]
# os.environ[f"{survey}_BUCKET_NAME"] = values["bucket_name"]

# uvicorn.run(
# f"src.{config_dict['source_folder']}.api:app",
# port=config_dict["port"],
# reload=config_dict.get("reload", True),
# reload_dirs=[".", "../libs"]
# )

def run():
asyncio.run(run_async())

async def run_async():
"""
Synchronous version of run_service.
This is useful for running the service in a synchronous context.
read the config from file

for each service in the config
run the service with the config dict
"""

tasks = []
config_dict = config_from_yaml()

print(f"Config dict used:\n{config_dict}")
buckets_config = config_dict["buckets_config"]
for service in config_dict["services"].keys():
service_config = config_dict["services"][service]
print(f"Creating task for service: {service}")
tasks.append(asyncio.create_task(async_run_service(service_config)))

for survey, values in buckets_config.items():
first = asyncio.as_completed(tasks).__next__()
await first
for task in tasks:
task.cancel("Shutting down")

async def async_run_service(
config_dict: dict = {},
):
db_config = config_dict.get("db_config", {})

server_config = uvicorn.Config(
# put the db config
f"{config_dict['source_folder']}.api:app",
port=config_dict["port"],
reload=config_dict.get("reload", True),
reload_dirs=[".", "../libs", "src/multisurvey_stamps/templates"],
)
server = uvicorn.Server(server_config)
os.environ["API_URL"] = config_dict["url"]
# export db secrets
os.environ["PSQL_USER"] = db_config["psql_user"]
os.environ["PSQL_PASSWORD"] = db_config["psql_password"]
os.environ["PSQL_DATABASE"] = db_config["psql_database"]
os.environ["PSQL_HOST"] = db_config["psql_host"]
os.environ["PSQL_PORT"] = str(db_config["psql_port"])
os.environ["SCHEMA"] = db_config["psql_schema"]

await server.serve()


def run_service(
config_dict: dict = {},
):
"""
Synchronous version of async_run_service.
This is useful for running a single service in a synchronous context.
"""
db_config = config_dict.get("db_config", {})

os.environ["API_URL"] = config_dict["url"]
# export db secrets
os.environ["PSQL_USER"] = db_config["psql_user"]
os.environ["PSQL_PASSWORD"] = db_config["psql_password"]
os.environ["PSQL_DATABASE"] = db_config["psql_database"]
os.environ["PSQL_HOST"] = db_config["psql_host"]
os.environ["PSQL_PORT"] = str(db_config["psql_port"])
os.environ["SCHEMA"] = db_config["psql_schema"]

os.environ[f"{survey}_BUCKET_REGION"] = values["bucket_region"]
os.environ[f"{survey}_BUCKET_NAME"] = values["bucket_name"]

uvicorn.run(
f"src.{config_dict['source_folder']}.api:app",
port=config_dict["port"],
reload=config_dict.get("reload", True),
reload_dirs=[".", "../libs"]
reload_dirs=[".", "../libs", "src/multisurvey_stamps/templates"],
)

def run_stamp():
config_dict = config_from_yaml()
service_config = config_dict["services"]["stamp_api"]
print(f"Running service: stamp_api with config: {service_config}")
# Use the sync version of run_service to run the FastAPI app
run_service(service_config)
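For reference, the configuration shape these runners expect can be sketched from the keys they read (`services`, `buckets_config`, `db_config`, and so on). The dict below is illustrative only; every value is a placeholder, not a real setting from this PR:

```python
# Shape of the dict config_from_yaml() is expected to return, inferred from the
# keys accessed in run_async, async_run_service, and run_service above.
example_config = {
    "buckets_config": {
        "ZTF": {"bucket_region": "us-east-1", "bucket_name": "example-ztf-stamps"},
    },
    "services": {
        "stamp_api": {
            "source_folder": "multisurvey_stamps",
            "url": "http://localhost:8000",
            "port": 8000,
            "reload": True,
            "db_config": {
                "psql_user": "postgres",
                "psql_password": "postgres",
                "psql_database": "stamps",
                "psql_host": "localhost",
                "psql_port": 5432,
                "psql_schema": "multisurvey",
            },
        },
    },
}
```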
Empty file.
29 changes: 29 additions & 0 deletions multisurvey-stamps/src/core/config/connection.py
@@ -0,0 +1,29 @@
import logging
import os

from db_plugins.db.sql._connection import PsqlDatabase

logger = logging.getLogger(__name__)


class ApiDatabase:
"""Singleton wrapper for PsqlDatabase"""

_instance = None

def __new__(cls):
if cls._instance is None:
db_config = {
"USER": os.getenv("PSQL_USER"),
"PASSWORD": os.getenv("PSQL_PASSWORD"),
"DB_NAME": os.getenv("PSQL_DATABASE"),
"HOST": os.getenv("PSQL_HOST"),
"PORT": os.getenv("PSQL_PORT"),
"SCHEMA": os.getenv("SCHEMA"),
}
cls._instance = PsqlDatabase(db_config)
return cls._instance


def psql_entity():
return ApiDatabase()
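`ApiDatabase` keeps a single `PsqlDatabase` built from environment variables: the first call to `ApiDatabase()` constructs it inside `__new__`, and every later call returns that same object. A minimal sketch of the same pattern with a stand-in class, so it runs without `db_plugins` or a live database:

```python
import os


class _FakeDatabase:
    """Stand-in for PsqlDatabase so this sketch runs without db_plugins."""

    def __init__(self, config):
        self.config = config


class ApiDatabaseSketch:
    """Same singleton-via-__new__ pattern used by ApiDatabase above."""

    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = _FakeDatabase({"USER": os.getenv("PSQL_USER", "postgres")})
        return cls._instance


a = ApiDatabaseSketch()
b = ApiDatabaseSketch()
assert a is b  # every call yields the one shared database object
```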
6 changes: 6 additions & 0 deletions multisurvey-stamps/src/core/config/dependencies.py
@@ -0,0 +1,6 @@
from typing import Annotated
from db_plugins.db.sql._connection import PsqlDatabase
from fastapi import Depends
from .connection import ApiDatabase

db_dependency = Annotated[PsqlDatabase, Depends(ApiDatabase)]
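`db_dependency` bundles the return type and `Depends(ApiDatabase)` into one annotation, so route handlers only need to declare it as a parameter type. A hypothetical route using it; the path and response are illustrative and not part of this PR:

```python
from fastapi import FastAPI

from core.config.dependencies import db_dependency

app = FastAPI()


@app.get("/health/db")
def db_health(db: db_dependency):
    # FastAPI calls ApiDatabase() and injects the shared PsqlDatabase instance here.
    return {"database_configured": db is not None}
```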
17 changes: 17 additions & 0 deletions multisurvey-stamps/src/core/exceptions.py
@@ -0,0 +1,17 @@
class WrapperException(BaseException):
def __init__(self, original_e, subcode=None):
super().__init__()
self.original_exception = original_e
self.subcode = subcode

def __str__(self) -> str:
return self.original_exception.__str__()


class DatabaseError(WrapperException):
def __init__(self, original_e, database: str, subcode=None):
self.database = database
super().__init__(original_e, subcode)

def __str__(self) -> str:
return f"{self.database} error: {self.original_exception.__str__()}"
1 change: 1 addition & 0 deletions multisurvey-stamps/src/core/htmx/README.MD
@@ -0,0 +1 @@
The HTMX version is 1.9.12.
1 change: 1 addition & 0 deletions multisurvey-stamps/src/core/htmx/htmx.min.js

Large diffs are not rendered by default.

90 changes: 90 additions & 0 deletions multisurvey-stamps/src/core/idmapper/idmapper.py
@@ -0,0 +1,90 @@
import numpy as np

from .ztf import encode_ztf_to_masterid_without_survey, decode_masterid_for_ztf

# Constants
SURVEY_IDS = {
"ZTF": 1,
"ATLAS": 2,
"LSST": 3,
"LS4": 4,
}
SURVEY_PREFIX_LEN_BITS = 8
SURVEY_IDS["MAXSURVEY"] = 2**SURVEY_PREFIX_LEN_BITS - 1

REVERSE_SURVEY_IDS = dict((zip(SURVEY_IDS.values(), SURVEY_IDS.keys())))


def encode_ids(survey, oids):
encode_array = []
for id in oids:
encode_id = catalog_oid_to_masterid(survey.upper(), id)
encode_array.append(encode_id)

return encode_array


def decode_ids(items):
for item in items:
oid = np.int64(item["oid"])
_, catalog_oid = decode_masterid(oid)
item["oid"] = catalog_oid

return items


def catalog_oid_to_masterid(
catalog: str,
catalog_oid: str | np.int64 | int,
validate: bool = False,
) -> np.int64:
"""
Convert a catalog object ID to a master ID.

Parameters
----------
catalog : str
The name of the catalog (e.g., "ZTF").
catalog_oid : str | int | np.int64
The catalog object ID (e.g., a ZTF object ID).
validate : bool
If True, validate the oid before conversion.
Returns
-------
np.int64
The master ID.
"""
catalog = catalog.upper()
if catalog not in SURVEY_IDS.keys():
raise ValueError(f"Unsupported catalog: {catalog}")

# Add the survey ID to the master ID
master_id = SURVEY_IDS[catalog] << (63 - SURVEY_PREFIX_LEN_BITS)
master_id = np.int64(master_id)

if catalog == "ZTF":
master_id += encode_ztf_to_masterid_without_survey(str(catalog_oid), validate)
elif catalog == "LSST":
return np.int64(catalog_oid)

return master_id


def decode_masterid(masterid: np.int64) -> tuple[str, str | np.int64]:
"""
Decode a master ID into its components.

Parameters
----------
masterid : np.int64
The master ID.
Returns
-------
tuple[str, str | np.int64]
The survey of the object and the original oid.
"""
# Extract the survey from the master ID
# survey_id = masterid >> (63 - SURVEY_PREFIX_LEN_BITS)

masterid_without_survey = np.bitwise_and(masterid, ((1 << (63 - SURVEY_PREFIX_LEN_BITS)) - 1))
return "ZTF", decode_masterid_for_ztf(masterid_without_survey)
65 changes: 65 additions & 0 deletions multisurvey-stamps/src/core/idmapper/ztf.py
@@ -0,0 +1,65 @@
import numpy as np


def encode_ztf_to_masterid_without_survey(ztf_oid: str, validate: bool) -> np.int64:
if validate and not is_ztf_oid_valid(ztf_oid):
raise ValueError(f"Invalid ZTF object ID: {ztf_oid}")

year = ztf_oid[3:5]
seq = ztf_oid[5:12]

# Convert the sequence of letters to a number
master_id = 0
for i, char in enumerate(seq):
master_id += (ord(char) - ord("a")) * (26 ** (6 - i))

# Convert the year to a number and add it to the master ID
master_id += int(year) * 26**7
return np.int64(master_id)


def is_ztf_oid_valid(ztf_oid: str) -> bool:
"""
Checks that ztf_oid starts with "ZTF", followed by two digits and
a sequence of 7 lowercase letters between a and z.

:param ztf_oid: The ZTF object ID to validate.
:return: True if ztf_oid is valid, False otherwise
"""
if len(ztf_oid) != 12:
return False
if ztf_oid[0:3] != "ZTF":
return False
if not ztf_oid[3:5].isdigit():
return False
if not ztf_oid[5:12].isalpha():
return False
if not ztf_oid[5:12].islower():
return False
return True


def decode_masterid_for_ztf(masterid_without_survey: np.int64) -> str:
"""
Decode a master ID into its components.

Parameters
----------
masterid_without_survey : np.int64
The master ID without the survey ID.

Returns
-------
str
The original oid.
"""
year = (masterid_without_survey // (26**7)) % 100
seq = masterid_without_survey % (26**7)

# Convert the sequence of numbers back to letters
seq_str = ""
for i in range(6, -1, -1):
seq_str += chr((seq // (26**i)) + ord("a"))
seq %= 26**i

return f"ZTF{year:02d}{seq_str}"