3 changes: 3 additions & 0 deletions README.md
@@ -66,6 +66,7 @@ Ingress Summary Report for 2025-02-25 11:41:29.507022
Uploaded: 200 file(s)
Skipped: 0 file(s)
Failed: 0 file(s)
Unprocessed: 0 file(s)
Total: 200 file(s)
Time elapsed: 3019.00 seconds
Bytes transferred: 3087368895
@@ -91,6 +92,8 @@ can be written to disk via the `--report-path` command-line argument:
"Total Skipped": 0,
"Failed": [],
"Total Failed": 0,
"Unprocessed": [],
"Total Unprocessed": 0,
"Bytes Transferred": 3087368895,
"Total Files": 200
}
129 changes: 129 additions & 0 deletions docs/source/usage/index.rst
@@ -60,6 +60,135 @@ The client script also provides several arguments for reporting status of an ing

Lastly, the ``--version`` option may be used to verify the version number of the installed DUM client.

Data Upload Manager Client Workflow
-----------------------------------

When utilizing the DUM client script (``pds-ingress-client``), the following workflow is executed:

1. Indexing of the requested input files/paths to determine the full input file set
2. Generation of a Manifest file containing information, including MD5 checksums, for each file to be ingested
3. Submission of batched ingress requests for the input file set to the DUM Ingress Service in AWS
4. Batch upload of the input file set to AWS S3
5. Ingress report creation

The input file set is determined in Step 1 by resolving the paths provided on
the command line to the DUM client. Any directories provided are recursed to determine the full set
of files within. Any file paths provided are included as-is in the input file set.

Depending on the size of the input file set, the Manifest file creation in Step 2 can become
time-consuming due to the hashing of each file in the input file set. To save time, the ``--manifest-path``
command-line option should be leveraged to write the contents of the Manifest to local disk. Specifying
the same path via ``--manifest-path`` on subsequent executions of the DUM client results in
a read of the existing Manifest from disk. Any files within the input set that are referenced within the
read Manifest reuse the precomputed values within, saving upfront time prior to the start of upload
to S3. The Manifest is then re-written to the path specified by ``--manifest-path`` to include
any new files encountered. In this way, a Manifest file can grow across executions of the DUM client
to serve as a cache of file information.
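
For example, reusing the same Manifest path across executions (a hypothetical invocation; any other
required arguments are omitted for brevity)::

    # First run: hashes each input file and writes the Manifest to disk
    pds-ingress-client --manifest-path /tmp/dum_manifest.json /PDS/SBN/gbo.ast.catalina.survey/

    # Subsequent runs: precomputed checksums are read back from the existing Manifest
    pds-ingress-client --manifest-path /tmp/dum_manifest.json /PDS/SBN/gbo.ast.catalina.survey/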

The batch size utilized by Steps 3 and 4 can be configured within the INI config provided to the
DUM client. The number of batches processed in parallel can be controlled via the ``--num-threads``
command-line argument.
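
For example, to process four batches in parallel (mirroring ``num_threads=4`` from the ``Arguments``
listing in the report example below; other required arguments omitted)::

    pds-ingress-client --num-threads 4 /PDS/SBN/gbo.ast.catalina.survey/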

By default, at completion of an ingress request (Step 5), the DUM client provides a summary of the
results of the transfer::

Ingress Summary Report for 2025-02-25 11:41:29.507022
-----------------------------------------------------
Uploaded: 200 file(s)
Skipped: 0 file(s)
Failed: 0 file(s)
Unprocessed: 0 file(s)
Total: 200 file(s)
Time elapsed: 3019.00 seconds
Bytes transferred: 3087368895

A more detailed JSON-format report, containing full listings of all uploaded/skipped/failed/unprocessed
paths, can be written to disk via the ``--report-path`` command-line argument::

{
"Arguments": "Namespace(config_path='mcp.test.ingress.config.ini', node='sbn', prefix='/PDS/SBN/', force_overwrite=True, num_threads=4, log_path='/tmp/dum_log.txt', manifest_path='/tmp/dum_manifest.json', report_path='/tmp/dum_report.json', dry_run=False, log_level='info', ingress_paths=['/PDS/SBN/gbo.ast.catalina.survey/'])",
"Batch Size": 3,
"Total Batches": 67,
"Start Time": "2025-02-25 18:51:10.507562+00:00",
"Finish Time": "2025-02-25 19:41:29.504806+00:00",
"Uploaded": [
"gbo.ast.catalina.survey/data_calibrated/703/2020/20Apr02/703_20200402_2B_F48FC1_01_0001.arch.fz",
...
"gbo.ast.catalina.survey/data_calibrated/703/2020/20Apr02/703_20200402_2B_N02055_01_0001.arch.xml"
],
"Total Uploaded": 200,
"Skipped": [],
"Total Skipped": 0,
"Failed": [],
"Total Failed": 0,
"Unprocessed": [],
"Total Unprocessed": 0,
"Bytes Transferred": 3087368895,
"Total Files": 200
}
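
Since the report is plain JSON, it is straightforward to post-process with standard tooling. Below is
a minimal Python sketch (assuming the report was written to ``/tmp/dum_report.json``, as in the
``Arguments`` listing above)::

    import json

    # Load the JSON report produced via --report-path
    with open("/tmp/dum_report.json", "r") as infile:
        report = json.load(infile)

    # List any paths that failed to upload so they can be investigated or resubmitted
    if report["Total Failed"] > 0:
        print(f"{report['Total Failed']} file(s) failed to upload:")
        for failed_path in report["Failed"]:
            print(f"  {failed_path}")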


Lastly, a detailed log file containing trace statements for each file/batch uploaded can be written
to disk via the ``--log-path`` command-line argument. The log file path may also be specified within
the INI config.

Automatic Retry of Failed Uploads
---------------------------------

The DUM client script is configured to automatically retry any failed uploads to S3 using exponential
backoff_. When an intermittent failure occurs during upload, messages pertaining to the
backoff and retry are logged to the log file (which can be specified via the ``--log-path`` argument).
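
This behavior corresponds to the ``backoff.on_exception`` decorator pattern from the backoff_ package,
as seen in the client changes below. A minimal sketch (``upload_file`` is a stand-in for the client's
actual ingress functions)::

    import backoff
    import requests

    @backoff.on_exception(backoff.expo, requests.exceptions.RequestException, max_time=120)
    def upload_file(presigned_url, data):
        # Any raised RequestException triggers a retry, with exponentially
        # increasing (jittered) wait times, for up to 120 seconds total
        response = requests.put(presigned_url, data=data)
        response.raise_for_status()
        return response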

Here is an example of such log messages::

...
[2025-09-23 16:21:24,491] INFO Thread-9 (worker) ingress_file_to_s3 : Batch 0 : Ingesting mflat.703.19Dec20.fits.xml to https://pds-staging.s3.amazonaws.com/mflat.703.19Dec20.fits.xml
[2025-09-23 16:21:24,493] WARNING Thread-9 (worker) backoff_handler : Backing off ingress_file_to_s3() for 0.2 seconds after 1 tries, reason: HTTPError
[2025-09-23 16:21:24,665] INFO Thread-9 (worker) ingress_file_to_s3 : Batch 0 : Ingesting mflat.703.19Dec20.fits.xml to https://pds-staging.s3.amazonaws.com/mflat.703.19Dec20.fits.xml
[2025-09-23 16:21:24,667] WARNING Thread-9 (worker) backoff_handler : Backing off ingress_file_to_s3() for 1.2 seconds after 2 tries, reason: HTTPError
[2025-09-23 16:21:25,832] INFO Thread-9 (worker) ingress_file_to_s3 : Batch 0 : Ingesting mflat.703.19Dec20.fits.xml to https://pds-staging.s3.amazonaws.com/mflat.703.19Dec20.fits.xml
[2025-09-23 16:21:25,833] WARNING Thread-9 (worker) backoff_handler : Backing off ingress_file_to_s3() for 1.8 seconds after 3 tries, reason: HTTPError
[2025-09-23 16:21:27,644] INFO Thread-9 (worker) ingress_file_to_s3 : Batch 0 : Ingesting mflat.703.19Dec20.fits.xml to https://pds-staging.s3.amazonaws.com/mflat.703.19Dec20.fits.xml
[2025-09-23 16:21:27,720] INFO Thread-9 (worker) ingress_file_to_s3 : Batch 0 : mflat.703.19Dec20.fits.xml Ingest complete

Typically, log messages pertaining to backoff and retry can be safely ignored if the upload is eventually successful, as in the above example.
However, if an upload ultimately fails after all retries are exhausted, it could indicate a more serious problem that needs to be investigated::

...
[2025-09-23 16:31:47,231] WARNING Thread-9 (worker) backoff_handler : Backing off ingress_file_to_s3() for 30.9 seconds after 6 tries, reason: HTTPError
[2025-09-23 16:32:18,099] INFO Thread-9 (worker) ingress_file_to_s3 : Batch 0 : Ingesting mflat.703.19Dec20.fits.fz to https://pds-staging.s3.amazonaws.com/mflat.703.19Dec20.fits.fz
[2025-09-23 16:32:18,101] WARNING Thread-9 (worker) backoff_handler : Backing off ingress_file_to_s3() for 23.2 seconds after 7 tries, reason: HTTPError
[2025-09-23 16:32:41,324] INFO Thread-9 (worker) ingress_file_to_s3 : Batch 0 : Ingesting mflat.703.19Dec20.fits.fz to https://pds-staging.s3.amazonaws.com/mflat.703.19Dec20.fits.fz
[2025-09-23 16:32:41,326] WARNING Thread-9 (worker) backoff_handler : Backing off ingress_file_to_s3() for 54.8 seconds after 8 tries, reason: HTTPError
[2025-09-23 16:33:36,086] INFO Thread-9 (worker) ingress_file_to_s3 : Batch 0 : Ingesting mflat.703.19Dec20.fits.fz to https://pds-staging.s3.amazonaws.com/mflat.703.19Dec20.fits.fz
[2025-09-23 16:33:36,087] ERROR Thread-9 (worker) _process_batch : Batch 0 : Ingress failed for mflat.703.19Dec20.fits.fz, Reason: 403 Client Error

Any files that fail to upload after all retries are exhausted are retried one final time at the end of DUM client execution::

...
[2025-09-23 16:33:36,094] INFO MainThread main : All batches processed
[2025-09-23 16:33:36,094] INFO MainThread main : ----------------------------------------
[2025-09-23 16:33:36,094] INFO MainThread main : Reattempting ingress for failed files...
[2025-09-23 16:33:36,096] INFO Thread-16 (worker) _prepare_batch_for_ingress : Batch 0 : Preparing for ingress
[2025-09-23 16:33:36,096] INFO Thread-16 (worker) _prepare_batch_for_ingress : Batch 0 : Prep completed in 0.00 seconds
[2025-09-23 16:33:36,108] INFO Thread-23 (worker) request_batch_for_ingress : Batch 0 : Requesting ingress
[2025-09-23 16:33:36,732] INFO Thread-23 (worker) request_batch_for_ingress : Batch 0 : Ingress request completed in 0.62 seconds
[2025-09-23 16:33:36,734] INFO Thread-23 (worker) ingress_file_to_s3 : Batch 0 : Ingesting mflat.703.19Dec20.fits.fz to https://pds-sbn-staging-dev.s3.amazonaws.com/mflat.703.19Dec20.fits.fz

Files that still fail to upload during this final attempt are recorded in the final summary report::

Ingress Summary Report for 2025-09-23 16:35:37.532468
-----------------------------------------------------
Uploaded: 0 file(s)
Skipped: 0 file(s)
Failed: 1 file(s)
Unprocessed: 0 file(s)
Total: 1 file(s)
Time elapsed: 244.87 seconds
Bytes transferred: 0

Should persistent failures like these occur, they should be communicated to the PDS Operations team for investigation.

.. References:
.. _backoff: https://pypi.org/project/backoff/
.. _installation: ../installation/index.html
24 changes: 5 additions & 19 deletions src/pds/ingress/client/pds_ingress_client.py
@@ -26,6 +26,7 @@
from more_itertools import chunked as batched
from pds.ingress import __version__
from pds.ingress.util.auth_util import AuthUtil
from pds.ingress.util.backoff_util import backoff_handler
from pds.ingress.util.backoff_util import simulate_batch_request_failure
from pds.ingress.util.backoff_util import simulate_ingress_failure
from pds.ingress.util.config_util import ConfigUtil
@@ -199,7 +200,7 @@ def _process_batch(batch_index, request_batch, node_id, force_overwrite, api_gat
ingress_path = ingress_response.get("ingress_path")
update_summary_table(SUMMARY_TABLE, "failed", ingress_path)

logger.error("Batch %d : Ingress failed for %s, Reason:\n%s", batch_index, trimmed_path, str(err))
logger.error("Batch %d : Ingress failed for %s, Reason: %s", batch_index, trimmed_path, str(err))

continue # Move to next file in the batch
except Exception as err:
@@ -353,12 +354,7 @@ def _prepare_batch_for_ingress(ingress_path_batch, prefix, batch_index, batch_pb
return request_batch


@backoff.on_exception(
backoff.expo,
Exception,
max_time=120,
logger="request_batch_for_ingress",
)
@backoff.on_exception(backoff.expo, Exception, max_time=120, on_backoff=backoff_handler, logger=None)
def request_batch_for_ingress(request_batch, batch_index, node_id, force_overwrite, api_gateway_config):
"""
Submits a batch of ingress requests to the PDS Ingress App API.
@@ -453,12 +449,7 @@ def request_batch_for_ingress(request_batch, batch_index, node_id, force_overwri
response.raise_for_status()


@backoff.on_exception(
backoff.expo,
Exception,
max_time=120,
logger="ingress_file_to_s3",
)
@backoff.on_exception(backoff.expo, Exception, max_time=120, on_backoff=backoff_handler, logger=None)
def ingress_file_to_s3(ingress_response, batch_index, batch_pbar):
"""
Copies the local file path using the pre-signed S3 URL returned from the
@@ -544,12 +535,7 @@ def ingress_file_to_s3(ingress_response, batch_index, batch_pbar):


# noinspection PyUnreachableCode
@backoff.on_exception(
backoff.expo,
Exception,
max_time=120,
logger="ingress_multipart_file_to_s3",
)
@backoff.on_exception(backoff.expo, Exception, max_time=120, on_backoff=backoff_handler, logger=None)
def ingress_multipart_file_to_s3(ingress_response, batch_index, batch_pbar):
"""
Performs an ingress request for a file that is too large to be uploaded
21 changes: 21 additions & 0 deletions src/pds/ingress/util/backoff_util.py
@@ -16,6 +16,7 @@
import requests_mock
from pds.ingress.util.config_util import ConfigUtil
from pds.ingress.util.config_util import strtobool
from pds.ingress.util.log_util import get_logger

# When leveraging this module with the Lambda service functions, the requests
# module will not be available within the Python runtime.
@@ -73,6 +74,26 @@ def fatal_code(err: requests.exceptions.RequestException) -> bool:
return True


def backoff_handler(details):
"""
Handler function for logging backoff events through our logging framework.
Backoff events are only logged to CloudWatch and to file, not to the console.

Parameters
----------
details : dict
Dictionary containing details about the backoff event.

"""
function_name = details["target"].__name__
exception_name = type(details["exception"]).__name__
logger = get_logger(function_name, console=False)
logger.warning(
f"Backing off {function_name}() for {details['wait']:0.1f} seconds after "
f"{details['tries']} tries, reason: {exception_name}"
)


def check_failure_chance(percentage: int) -> bool:
"""
Checks if a simulated failure event should occur based on a given percentage
6 changes: 6 additions & 0 deletions src/pds/ingress/util/log_util.py
@@ -291,6 +291,7 @@ def __init__(self, log_group_name, api_gateway_config, capacity=512):
self._bearer_token = None
self._node_id = None
self._stream_created = False
self._next_sequence_token = None

@property
def bearer_token(self):
@@ -453,3 +454,8 @@ def send_log_events_to_cloud_watch(self, log_events):

response = requests.post(api_gateway_url, data=json.dumps(payload), headers=headers)
response.raise_for_status()

result = response.json()

if "__type" in result and result["__type"] == "SerializationException":
console_logger.warning("CloudWatch Logs rejected the submitted log events, reason: SerializationException")