Background:
While testing CMRBackend#tile with various CMR collections using the s3 credentials --> s3 session route (as opposed to the IAM role), I encountered JSON decode errors emanating from backend.py's aws_s3_credential. I noticed this call was being made in a multithreaded context by rio_tiler's multi_arrays function.
Here's a script to replicate the error:
```python
#!/usr/bin/env python
"""
Script to replicate the JSON decode error from concurrent auth.get_s3_credentials() calls.

Problem:
- The earthaccess.Auth object is not thread-safe. When multiple threads call
  auth.get_s3_credentials() concurrently, the internal state of the Auth object is
  corrupted, resulting in JSON decode errors. The errors come from calling `.json()`
  on an HTML page that I believe is part of the redirect process.

Fix:
- Add a thread lock around auth.get_s3_credentials() so only one thread can fetch
  credentials at a time.
"""
import argparse
import concurrent.futures
import threading
from typing import Dict

import requests

import earthaccess

_credential_lock = threading.Lock()


def get_credentials(auth, provider: str, call_id: int, use_lock: bool = False) -> Dict:
    """
    Fetch S3 credentials with optional thread safety.

    Args:
        auth: earthaccess Auth object
        provider: NASA DAAC provider name
        call_id: Identifier for this call (for logging)
        use_lock: If True, use thread lock to prevent race conditions

    Returns:
        Dictionary of S3 credentials
    """
    lock_msg = " (with lock)" if use_lock else ""
    try:
        if use_lock:
            with _credential_lock:
                credentials = auth.get_s3_credentials(provider=provider)
        else:
            credentials = auth.get_s3_credentials(provider=provider)
        print(f"✓ Call {call_id}: Successfully got credentials{lock_msg}")
        return credentials
    except requests.exceptions.JSONDecodeError as e:
        print(f"✗ Call {call_id}: JSON decode error! {e}")
        raise
    except Exception as e:
        print(f"✗ Call {call_id}: Error - {type(e).__name__}: {e}")
        raise


def test_concurrent_calls(num_threads: int = 10, use_lock: bool = False):
    """
    Test concurrent calls to get_s3_credentials with optional thread safety.

    This replicates the conditions that caused the JSON decode error. When multiple
    threads access the same Auth object, its internal state gets corrupted.

    Args:
        num_threads: Number of concurrent threads to use
        use_lock: If True, use thread lock to prevent race conditions
    """
    mode = "WITH LOCK (SAFE)" if use_lock else "WITHOUT LOCK (UNSAFE)"
    print(f"\n{'='*70}")
    print(f"TESTING: {mode}")
    print(f"{'='*70}\n")
    print("This test simulates the threading issue.")
    print(f"Running {num_threads} concurrent calls to auth.get_s3_credentials()...\n")

    # Login to earthaccess
    auth = earthaccess.login()
    if not auth:
        print("❌ Failed to authenticate with earthaccess")
        print("Make sure you have valid credentials configured:")
        print("  - EARTHDATA_USERNAME and EARTHDATA_PASSWORD env vars, or")
        print("  - ~/.netrc file with urs.earthdata.nasa.gov credentials")
        return

    # Use a known NASA DAAC provider
    provider = "NSIDC_CPRD"  # National Snow and Ice Data Center
    print(f"Using provider: {provider}\n")

    # Run concurrent calls
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
        futures = [
            executor.submit(get_credentials, auth, provider, i, use_lock)
            for i in range(num_threads)
        ]

        # Collect results
        success_count = 0
        error_count = 0
        json_errors = 0
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()
                success_count += 1
            except requests.exceptions.JSONDecodeError:
                error_count += 1
                json_errors += 1
            except Exception:
                error_count += 1

    # Report results
    print(f"\n{'='*70}")
    print("RESULTS")
    print(f"{'='*70}\n")
    print(f"Total calls: {num_threads}")
    print(f"✓ Successful: {success_count}")
    print(f"✗ Failed: {error_count}")
    print(f"  └─ JSON decode errors: {json_errors}")
    if json_errors > 0:
        print("\n🔴 JSON DECODE ERROR REPLICATED!")
    elif success_count == num_threads:
        print("\n✅ All calls succeeded (no errors detected)")
    else:
        print("\n⚠️ Some errors occurred but not JSON decode errors")
    print(f"\n{'='*70}")


def main():
    """Main entry point."""
    parser = argparse.ArgumentParser(
        description="Test concurrent auth.get_s3_credentials() calls with optional thread safety"
    )
    parser.add_argument(
        "--use-lock",
        # store_true sets the value to True when the flag is present, False otherwise.
        action="store_true",
        help="Use thread lock to prevent race conditions (safe mode)",
    )
    parser.add_argument(
        "--threads",
        type=int,
        default=20,
        help="Number of concurrent threads to use (default: 20)",
    )
    args = parser.parse_args()

    print("""
╔══════════════════════════════════════════════════════════════════════╗
║                    JSON DECODE ERROR REPLICATION                     ║
║                                                                      ║
║  This script demonstrates the threading issue that causes JSON       ║
║  decode errors when calling auth.get_s3_credentials() concurrently   ║
║                                                                      ║
║  Use --use-lock to test with thread safety enabled                   ║
╚══════════════════════════════════════════════════════════════════════╝
""")
    test_concurrent_calls(num_threads=args.threads, use_lock=args.use_lock)


if __name__ == "__main__":
    main()
```

If you run the script with no command line arguments, you will see multiple JSON decode errors. If you run it with `--use-lock`, there should be no JSON decode errors.
Based on this, I think we should add a threading.Lock context around auth.get_s3_credentials(provider=provider). I.e.
```python
_credential_lock = threading.Lock()


# caching decorator omitted
def aws_s3_credential(auth: Auth, provider: str) -> Dict:
    """Get AWS S3 credential through earthaccess.

    Thread-safe implementation using a lock to prevent concurrent access
    to the earthaccess auth object.
    """
    with _credential_lock:
        return auth.get_s3_credentials(provider=provider)
```

Pros:
- This method is thread-safe
Cons:
- This will slow down the methods that use it, though I suspect less than falling back to a single thread. I would also expect the caching to improve performance, but I haven't analyzed the exact performance implications yet. I'm not sure an analysis is warranted at this point since we're not using this code path in production.
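For reference, the lock and the caching could be combined along these lines. This is only a sketch: the cache layout and `_CACHE_TTL` are assumptions for illustration, not the actual decorator omitted from backend.py.

```python
import threading
import time
from typing import Dict

_credential_lock = threading.Lock()
# provider -> (credentials, monotonic expiry); stand-in for the omitted caching decorator
_credential_cache: Dict[str, tuple] = {}
_CACHE_TTL = 30 * 60  # seconds; assumed refresh interval, not a measured value


def aws_s3_credential(auth, provider: str) -> Dict:
    """Thread-safe, cached S3 credential fetch (sketch).

    The lock serializes access to the non-thread-safe Auth object; the cache
    means only the first call per provider (per TTL window) hits the network.
    """
    with _credential_lock:
        cached = _credential_cache.get(provider)
        if cached is not None and cached[1] > time.monotonic():
            return cached[0]
        credentials = auth.get_s3_credentials(provider=provider)
        _credential_cache[provider] = (credentials, time.monotonic() + _CACHE_TTL)
        return credentials
```

With this shape, concurrent tile requests serialize only on the first fetch; subsequent calls return the cached dict without touching the Auth object.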
Alternatives:
- Using multiple processes in place of multiple threads, but this would require rewriting parts of rio-tiler or implementing a lot more in titiler-cmr, and the overhead of spawning multiple processes may not be worth it.
- Pre-fetching credentials, as long as we can assume the provider will be the same for all assets. Is this a safe assumption?
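The pre-fetching alternative could look something like this. A sketch only, assuming a single provider covers every asset; `tile_all` and the `read_asset` callable are hypothetical names standing in for the titiler-cmr / rio_tiler code paths, not real APIs.

```python
import concurrent.futures
from typing import Callable, List


def tile_all(auth, provider: str, assets: List[str], read_asset: Callable) -> list:
    """Fetch credentials once, serially, then fan out to worker threads.

    Because the single get_s3_credentials() call happens before any threads
    are spawned, the non-thread-safe Auth object is never accessed concurrently
    and no lock is needed in the hot path.
    """
    credentials = auth.get_s3_credentials(provider=provider)  # one pre-fetched call
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(read_asset, asset, credentials) for asset in assets]
        return [f.result() for f in concurrent.futures.as_completed(futures)]
```

The trade-off is the single-provider assumption: if different assets need credentials from different DAACs, the pre-fetch would have to happen once per provider instead.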