diff --git a/README.md b/README.md index 635e077e..99e1f9f1 100644 --- a/README.md +++ b/README.md @@ -66,6 +66,7 @@ Ingress Summary Report for 2025-02-25 11:41:29.507022 Uploaded: 200 file(s) Skipped: 0 file(s) Failed: 0 file(s) +Unprocessed: 0 file(s) Total: 200 files(s) Time elapsed: 3019.00 seconds Bytes transferred: 3087368895 @@ -91,6 +92,8 @@ can be written to disk via the `--report-path` command-line argument: "Total Skipped": 0, "Failed": [], "Total Failed": 0, + "Unprocessed": [], + "Total Unprocessed": 0, "Bytes Transferred": 3087368895, "Total Files": 200 } diff --git a/docs/source/usage/index.rst b/docs/source/usage/index.rst index 0a24f83c..4e130cd7 100644 --- a/docs/source/usage/index.rst +++ b/docs/source/usage/index.rst @@ -60,6 +60,135 @@ The client script also provides several arguments for reporting status of an ing Lastly, the ``--version`` option may be used to verify the version number of installed DUM client. +Data Upload Manager Client Workflow +----------------------------------- + +When utilizing the DUM Client script (`pds-ingress-client`), the following workflow is executed: + +1. Indexing of the requested input files/paths to determine the full input file set +2. Generation of a Manifest file, containing information, including MD5 checksums, of each file to be ingested +3. Batch ingress requesting of input file set to the DUM Ingress Service in AWS +4. Batch upload of input file set to AWS S3 +5. Ingress report creation + +The input file set is determined in Step 1 by resolving the paths provided on +the command-line to the DUM client. Any directories provided are recursed to determine the full set +of files within. Any paths provided are included as-is into the input file set. + +Depending on the size of the input file set, the Manifest file creation in Step 2 can become +time-consuming due to the hashing of each file in the input file set. 
To save time, the `--manifest-path` +command-line option should be leveraged to write the contents of the Manifest to local disk. Specifying +the same path via `--manifest-path` on subsequent executions of the DUM client will result in +a read of the existing Manifest from disk. Any files within the input set referenced within the +read Manifest will reuse the precomputed values within, saving upfront time prior to start of upload +to S3. The Manifest will then be re-written to the path specified by `--manifest-path` to include +any new files encountered. In this way, a Manifest file can expand across executions of DUM to serve +as a sort of cache for file information. + +The batch size utilized by Steps 3 and 4 can be configured within the INI config provided to the +DUM client. The number of batches processed in parallel can be controlled via the `--num-threads` +command-line argument. + +By default, at completion of an ingress request (Step 5), the DUM client provides a summary of the +results of the transfer:: + + Ingress Summary Report for 2025-02-25 11:41:29.507022 + ----------------------------------------------------- + Uploaded: 200 file(s) + Skipped: 0 file(s) + Failed: 0 file(s) + Unprocessed: 0 file(s) + Total: 200 files(s) + Time elapsed: 3019.00 seconds + Bytes transferred: 3087368895 + +A more detailed JSON-format report, containing full listings of all uploaded/skipped/failed paths, +can be written to disk via the `--report-path` command-line argument:: + + { + "Arguments": "Namespace(config_path='mcp.test.ingress.config.ini', node='sbn', prefix='/PDS/SBN/', force_overwrite=True, num_threads=4, log_path='/tmp/dum_log.txt', manifest_path='/tmp/dum_manifest.json', report_path='/tmp/dum_report.json', dry_run=False, log_level='info', ingress_paths=['/PDS/SBN/gbo.ast.catalina.survey/'])", + "Batch Size": 3, + "Total Batches": 67, + "Start Time": "2025-02-25 18:51:10.507562+00:00", + "Finish Time": "2025-02-25 19:41:29.504806+00:00", + "Uploaded": [ + 
"gbo.ast.catalina.survey/data_calibrated/703/2020/20Apr02/703_20200402_2B_F48FC1_01_0001.arch.fz", + ... + "gbo.ast.catalina.survey/data_calibrated/703/2020/20Apr02/703_20200402_2B_N02055_01_0001.arch.xml" + ], + "Total Uploaded": 200, + "Skipped": [], + "Total Skipped": 0, + "Failed": [], + "Total Failed": 0, + "Unprocessed": [], + "Total Unprocessed": 0, + "Bytes Transferred": 3087368895, + "Total Files": 200 + } + + +Lastly, a detailed log file containing trace statements for each file/batch uploaded can be written +to disk via the `--log-path` command-line argument. The log file path may also be specifed within +the INI config. + +Automatic Retry of Failed Uploads +--------------------------------- + +The DUM client script is configured to automatically retry any failed uploads to S3 using exponential +backoff_ and retry. When an intermittent failure occurs during upload, messages pertaining to the +backoff and retry are logged to the log file (which can be specified via the `--log-path` argument). + +Here is an example of such log messages:: + + ... 
+ [2025-09-23 16:21:24,491] INFO Thread-9 (worker) ingress_file_to_s3 : Batch 0 : Ingesting mflat.703.19Dec20.fits.xml to https://pds-staging.s3.amazonaws.com/mflat.703.19Dec20.fits.xml + [2025-09-23 16:21:24,493] WARNING Thread-9 (worker) backoff_handler : Backing off ingress_file_to_s3() for 0.2 seconds after 1 tries, reason: HTTPError + [2025-09-23 16:21:24,665] INFO Thread-9 (worker) ingress_file_to_s3 : Batch 0 : Ingesting mflat.703.19Dec20.fits.xml to https://pds-staging.s3.amazonaws.com/mflat.703.19Dec20.fits.xml + [2025-09-23 16:21:24,667] WARNING Thread-9 (worker) backoff_handler : Backing off ingress_file_to_s3() for 1.2 seconds after 2 tries, reason: HTTPError + [2025-09-23 16:21:25,832] INFO Thread-9 (worker) ingress_file_to_s3 : Batch 0 : Ingesting mflat.703.19Dec20.fits.xml to https://pds-staging.s3.amazonaws.com/mflat.703.19Dec20.fits.xml + [2025-09-23 16:21:25,833] WARNING Thread-9 (worker) backoff_handler : Backing off ingress_file_to_s3() for 1.8 seconds after 3 tries, reason: HTTPError + [2025-09-23 16:21:27,644] INFO Thread-9 (worker) ingress_file_to_s3 : Batch 0 : Ingesting mflat.703.19Dec20.fits.xml to https://pds-staging.s3.amazonaws.com/mflat.703.19Dec20.fits.xml + [2025-09-23 16:21:27,720] INFO Thread-9 (worker) ingress_file_to_s3 : Batch 0 : mflat.703.19Dec20.fits.xml Ingest complete + +Typically, log messages pertaining to backoff and retry can be safely ignored if upload is eventually successful, as in the above example. +However, if an upload ultimately fails after all retries are exhausted it could indicate a more serious problem that needs to be investigated:: + + ... 
+ [2025-09-23 16:31:47,231] WARNING Thread-9 (worker) backoff_handler : Backing off ingress_file_to_s3() for 30.9 seconds after 6 tries, reason: HTTPError + [2025-09-23 16:32:18,099] INFO Thread-9 (worker) ingress_file_to_s3 : Batch 0 : Ingesting mflat.703.19Dec20.fits.fz to https://pds-staging.s3.amazonaws.com/mflat.703.19Dec20.fits.fz + [2025-09-23 16:32:18,101] WARNING Thread-9 (worker) backoff_handler : Backing off ingress_file_to_s3() for 23.2 seconds after 7 tries, reason: HTTPError + [2025-09-23 16:32:41,324] INFO Thread-9 (worker) ingress_file_to_s3 : Batch 0 : Ingesting mflat.703.19Dec20.fits.fz to https://pds-staging.s3.amazonaws.com/mflat.703.19Dec20.fits.fz + [2025-09-23 16:32:41,326] WARNING Thread-9 (worker) backoff_handler : Backing off ingress_file_to_s3() for 54.8 seconds after 8 tries, reason: HTTPError + [2025-09-23 16:33:36,086] INFO Thread-9 (worker) ingress_file_to_s3 : Batch 0 : Ingesting mflat.703.19Dec20.fits.fz to https://pds-staging.s3.amazonaws.com/mflat.703.19Dec20.fits.fz + [2025-09-23 16:33:36,087] ERROR Thread-9 (worker) _process_batch : Batch 0 : Ingress failed for mflat.703.19Dec20.fits.fz, Reason: 403 Client Error + +Any files that fail to upload after all retries are exhausted are reattempted in one final attempt at the end of DUM client execution:: + + ... + [2025-09-23 16:33:36,094] INFO MainThread main : All batches processed + [2025-09-23 16:33:36,094] INFO MainThread main : ---------------------------------------- + [2025-09-23 16:33:36,094] INFO MainThread main : Reattempting ingress for failed files... 
+ [2025-09-23 16:33:36,096] INFO Thread-16 (worker) _prepare_batch_for_ingress : Batch 0 : Preparing for ingress + [2025-09-23 16:33:36,096] INFO Thread-16 (worker) _prepare_batch_for_ingress : Batch 0 : Prep completed in 0.00 seconds + [2025-09-23 16:33:36,108] INFO Thread-23 (worker) request_batch_for_ingress : Batch 0 : Requesting ingress + [2025-09-23 16:33:36,732] INFO Thread-23 (worker) request_batch_for_ingress : Batch 0 : Ingress request completed in 0.62 seconds + [2025-09-23 16:33:36,734] INFO Thread-23 (worker) ingress_file_to_s3 : Batch 0 : Ingesting mflat.703.19Dec20.fits.fz to https://pds-sbn-staging-dev.s3.amazonaws.com/mflat.703.19Dec20.fits.fz + +Files that still fail to upload during this final attempt are recorded in the final summary report:: + + Ingress Summary Report for 2025-09-23 16:35:37.532468 + ----------------------------------------------------- + Uploaded: 0 file(s) + Skipped: 0 file(s) + Failed: 1 file(s) + Unprocessed: 0 file(s) + Total: 1 files(s) + Time elapsed: 244.87 seconds + Bytes transferred: 0 + +Should persistent failures like this occur, they should be communicated to the PDS Operations team for investigation. .. References: +.. _backoff: https://pypi.org/project/backoff/ .. 
_installation: ../installation/index.html diff --git a/src/pds/ingress/client/pds_ingress_client.py b/src/pds/ingress/client/pds_ingress_client.py index c6b8010a..614399e0 100644 --- a/src/pds/ingress/client/pds_ingress_client.py +++ b/src/pds/ingress/client/pds_ingress_client.py @@ -26,6 +26,7 @@ from more_itertools import chunked as batched from pds.ingress import __version__ from pds.ingress.util.auth_util import AuthUtil +from pds.ingress.util.backoff_util import backoff_handler from pds.ingress.util.backoff_util import simulate_batch_request_failure from pds.ingress.util.backoff_util import simulate_ingress_failure from pds.ingress.util.config_util import ConfigUtil @@ -199,7 +200,7 @@ def _process_batch(batch_index, request_batch, node_id, force_overwrite, api_gat ingress_path = ingress_response.get("ingress_path") update_summary_table(SUMMARY_TABLE, "failed", ingress_path) - logger.error("Batch %d : Ingress failed for %s, Reason:\n%s", batch_index, trimmed_path, str(err)) + logger.error("Batch %d : Ingress failed for %s, Reason: %s", batch_index, trimmed_path, str(err)) continue # Move to next file in the batch except Exception as err: @@ -353,12 +354,7 @@ def _prepare_batch_for_ingress(ingress_path_batch, prefix, batch_index, batch_pb return request_batch -@backoff.on_exception( - backoff.expo, - Exception, - max_time=120, - logger="request_batch_for_ingress", -) +@backoff.on_exception(backoff.expo, Exception, max_time=120, on_backoff=backoff_handler, logger=None) def request_batch_for_ingress(request_batch, batch_index, node_id, force_overwrite, api_gateway_config): """ Submits a batch of ingress requests to the PDS Ingress App API. 
@@ -453,12 +449,7 @@ def request_batch_for_ingress(request_batch, batch_index, node_id, force_overwri response.raise_for_status() -@backoff.on_exception( - backoff.expo, - Exception, - max_time=120, - logger="ingress_file_to_s3", -) +@backoff.on_exception(backoff.expo, Exception, max_time=120, on_backoff=backoff_handler, logger=None) def ingress_file_to_s3(ingress_response, batch_index, batch_pbar): """ Copies the local file path using the pre-signed S3 URL returned from the @@ -544,12 +535,7 @@ def ingress_file_to_s3(ingress_response, batch_index, batch_pbar): # noinspection PyUnreachableCode -@backoff.on_exception( - backoff.expo, - Exception, - max_time=120, - logger="ingress_multipart_file_to_s3", -) +@backoff.on_exception(backoff.expo, Exception, max_time=120, on_backoff=backoff_handler, logger=None) def ingress_multipart_file_to_s3(ingress_response, batch_index, batch_pbar): """ Performs an ingress request for a file that is too large to be uploaded diff --git a/src/pds/ingress/util/backoff_util.py b/src/pds/ingress/util/backoff_util.py index b38713e8..d99cb8e9 100644 --- a/src/pds/ingress/util/backoff_util.py +++ b/src/pds/ingress/util/backoff_util.py @@ -16,6 +16,7 @@ import requests_mock from pds.ingress.util.config_util import ConfigUtil from pds.ingress.util.config_util import strtobool +from pds.ingress.util.log_util import get_logger # When leveraging this module with the Lambda service functions, the requests # module will not be available within the Python runtime. @@ -73,6 +74,26 @@ def fatal_code(err: requests.exceptions.RequestException) -> bool: return True +def backoff_handler(details): + """ + Handler function for logging backoff events though our logging framework. + Backoff events are only logged to CloudWatch and file, not to console. + + Parameters + ---------- + details : dict + Dictionary containing details about the backoff event. 
+ + """ + function_name = details["target"].__name__ + exception_name = type(details["exception"]).__name__ + logger = get_logger(function_name, console=False) + logger.warning( + f"Backing off {function_name}() for {details['wait']:0.1f} seconds after " + f"{details['tries']} tries, reason: {exception_name}" + ) + + def check_failure_chance(percentage: int) -> bool: """ Checks if a simulated failure event should occur based on a given percentage diff --git a/src/pds/ingress/util/log_util.py b/src/pds/ingress/util/log_util.py index a6647a53..dc900ec8 100644 --- a/src/pds/ingress/util/log_util.py +++ b/src/pds/ingress/util/log_util.py @@ -291,6 +291,7 @@ def __init__(self, log_group_name, api_gateway_config, capacity=512): self._bearer_token = None self._node_id = None self._stream_created = False + self._next_sequence_token = None @property def bearer_token(self): @@ -453,3 +454,8 @@ def send_log_events_to_cloud_watch(self, log_events): response = requests.post(api_gateway_url, data=json.dumps(payload), headers=headers) response.raise_for_status() + + result = response.json() + + if "__type" in result and result["__type"] == "SerializationException": + console_logger.warning("CloudWatch Logs rejected the submitted log events, reason: SerializationException")