
Multithreading in rio_tiler's MultiBandReader causing JSON decode errors from earthaccess.Auth.get_s3_credentials #92

@abarciauskas-bgse

Description


Background:

While working on testing CMRBackend#tile with various CMR collections using the s3 credentials --> s3 session route (as opposed to the IAM role) I encountered JSON decode errors emanating from backend.py's aws_s3_credential. I noticed this call was being made within a multithreaded context in rio_tiler's multi_arrays function.

Here's a script to replicate the error:

#!/usr/bin/env python
"""
Script to replicate the JSON decode error from concurrent auth.get_s3_credentials() calls.

Problem:
- The earthaccess.Auth object is not thread-safe. When multiple threads call auth.get_s3_credentials() concurrently, they corrupt the
internal state of the Auth object, which results in JSON decode errors. The errors come from calling `.json()` on an HTML page that I believe is part of the redirect process.

Fix:
- A thread lock around auth.get_s3_credentials() so only one thread can fetch credentials at a time.
"""

import argparse
import concurrent.futures
import requests
from typing import Dict

import earthaccess
import threading

_credential_lock = threading.Lock()

def get_credentials(auth, provider: str, call_id: int, use_lock: bool = False) -> Dict:
    """
    Fetch S3 credentials with optional thread safety.

    Args:
        auth: earthaccess Auth object
        provider: NASA DAAC provider name
        call_id: Identifier for this call (for logging)
        use_lock: If True, use thread lock to prevent race conditions

    Returns:
        Dictionary of S3 credentials
    """
    lock_msg = " (with lock)" if use_lock else ""

    try:
        if use_lock:
            with _credential_lock:
                credentials = auth.get_s3_credentials(provider=provider)
        else:
            credentials = auth.get_s3_credentials(provider=provider)

        print(f"✓ Call {call_id}: Successfully got credentials{lock_msg}")
        return credentials
    except requests.exceptions.JSONDecodeError as e:
        print(f"✗ Call {call_id}: JSON decode error! {e}")
        raise
    except Exception as e:
        print(f"✗ Call {call_id}: Error - {type(e).__name__}: {e}")
        raise


def test_concurrent_calls(
    num_threads: int = 10,
    use_lock: bool = False
):
    """
    Test concurrent calls to get_s3_credentials with optional thread safety.

    This replicates the conditions that caused the JSON decode error. When multiple
    threads access the same Auth object, its internal state gets corrupted.

    Args:
        num_threads: Number of concurrent threads to use
        use_lock: If True, use thread lock to prevent race conditions
    """
    mode = "WITH LOCK (SAFE)" if use_lock else "WITHOUT LOCK (UNSAFE)"

    print(f"\n{'='*70}")
    print(f"TESTING: {mode}")
    print(f"{'='*70}\n")
    print("This test simulates the threading issue.")
    print(f"Running {num_threads} concurrent calls to auth.get_s3_credentials()...\n")

    # Login to earthaccess
    auth = earthaccess.login()

    if not auth:
        print("❌ Failed to authenticate with earthaccess")
        print("Make sure you have valid credentials configured:")
        print("  - EARTHDATA_USERNAME and EARTHDATA_PASSWORD env vars, or")
        print("  - ~/.netrc file with urs.earthdata.nasa.gov credentials")
        return

    # Use a known NASA DAAC provider
    provider = "NSIDC_CPRD"  # National Snow and Ice Data Center

    print(f"Using provider: {provider}\n")

    # Run concurrent calls
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
        futures = [
            executor.submit(get_credentials, auth, provider, i, use_lock)
            for i in range(num_threads)
        ]

        # Collect results
        success_count = 0
        error_count = 0
        json_errors = 0

        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()
                success_count += 1
            except requests.exceptions.JSONDecodeError:
                error_count += 1
                json_errors += 1
            except Exception:
                error_count += 1

    # Report results
    print(f"\n{'='*70}")
    print("RESULTS")
    print(f"{'='*70}\n")
    print(f"Total calls: {num_threads}")
    print(f"✓ Successful: {success_count}")
    print(f"✗ Failed: {error_count}")
    print(f"  └─ JSON decode errors: {json_errors}")

    if json_errors > 0:
        print(f"\n🔴 JSON DECODE ERROR REPLICATED!")
    elif success_count == num_threads:
        print(f"\n✅ All calls succeeded (no errors detected)")
    else:
        print(f"\n⚠️  Some errors occurred but not JSON decode errors")

    print(f"\n{'='*70}")


def main():
    """Main entry point."""
    parser = argparse.ArgumentParser(
        description="Test concurrent auth.get_s3_credentials() calls with optional thread safety"
    )
    parser.add_argument(
        "--use-lock",
        # This action sets the argument's value to True if the flag is present on the command line.
        # If the flag is absent, the argument's value will be False by default.
        action="store_true",
        help="Use thread lock to prevent race conditions (safe mode)"
    )
    parser.add_argument(
        "--threads",
        type=int,
        default=20,
        help="Number of concurrent threads to use (default: 20)"
    )
    args = parser.parse_args()

    print("""
╔══════════════════════════════════════════════════════════════════════╗
║                  JSON DECODE ERROR REPLICATION                       ║
║                                                                      ║
║  This script demonstrates the threading issue that causes JSON       ║
║  decode errors when calling auth.get_s3_credentials() concurrently   ║
║                                                                      ║
║  Use --use-lock to test with thread safety enabled                   ║
╚══════════════════════════════════════════════════════════════════════╝
    """)

    test_concurrent_calls(num_threads=args.threads, use_lock=args.use_lock)


if __name__ == "__main__":
    main()

If you run the script with no command-line arguments, you will see multiple JSON decode errors. If you run the script with --use-lock, there should be no JSON decode errors.

Based on this, I think we should add a threading.Lock context around auth.get_s3_credentials(provider=provider), i.e.:

import threading
from typing import Dict

from earthaccess import Auth

_credential_lock = threading.Lock()

# caching decorator omitted
def aws_s3_credential(auth: Auth, provider: str) -> Dict:
    """Get AWS S3 credentials through earthaccess.

    Thread-safe: a lock prevents concurrent access to the earthaccess
    auth object.
    """
    with _credential_lock:
        return auth.get_s3_credentials(provider=provider)
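For illustration, here's a rough sketch of how the lock and the (omitted) caching decorator interact. I've inlined a simple dict cache inside the lock rather than using a decorator, because a plain lru_cache outside the lock wouldn't stop several threads that miss the cache simultaneously from each triggering a fetch. The FakeAuth class is a stand-in for earthaccess.Auth so the sketch is self-contained:

```python
import concurrent.futures
import threading
from typing import Dict

_credential_lock = threading.Lock()
_credential_cache: Dict[str, Dict] = {}

class FakeAuth:
    """Stand-in for earthaccess.Auth that counts 'network' fetches."""
    def __init__(self) -> None:
        self.calls = 0

    def get_s3_credentials(self, provider: str) -> Dict:
        self.calls += 1  # the expensive, non-thread-safe part in the real Auth
        return {"accessKeyId": f"AKIA-{provider}", "secretAccessKey": "..."}

def aws_s3_credential(auth, provider: str) -> Dict:
    """Thread-safe, cached credential fetch (cache inlined for illustration)."""
    with _credential_lock:
        # Check-then-fetch inside the lock, so concurrent first calls
        # don't each trigger a fetch: the first caller populates the
        # cache and every later caller reads from it.
        if provider not in _credential_cache:
            _credential_cache[provider] = auth.get_s3_credentials(provider=provider)
        return _credential_cache[provider]

auth = FakeAuth()
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as ex:
    creds = list(ex.map(lambda _: aws_s3_credential(auth, "NSIDC_CPRD"), range(8)))

print(auth.calls)  # 1: only the first caller pays the fetch cost
print(len(creds))  # 8: every caller still gets credentials
```

With this shape, the serialization cost is only paid on cache misses; once a provider's credentials are cached, concurrent tile requests just take the lock briefly and read the dict.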

Pros:

  • This method is thread-safe

Cons:

  • This will slow down the methods that use it, though I suspect less than falling back to a single thread would. I would also expect the caching to improve performance; however, I haven't analyzed the exact performance implications yet. I'm not sure an analysis is warranted at this point, since we're not using this code path in production.

Alternatives:

  • Using multiple processes in place of multiple threads, but this would require rewriting parts of rio-tiler or implementing a lot more in titiler-cmr. And the overhead of spawning multiple processes may not be worth it?
  • Pre-fetching credentials. As long as we can assume the provider will be the same for all assets. Is this a safe assumption?
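If the single-provider assumption holds, the pre-fetching alternative could look roughly like this (function names are hypothetical): fetch credentials once on the main thread before the reader fans out, then hand the resulting dict to each worker so no thread ever touches the Auth object:

```python
import concurrent.futures
from typing import Dict, List

def read_asset(asset_url: str, credentials: Dict) -> str:
    # Hypothetical worker: in titiler-cmr this would open the asset with
    # an S3 session built from the pre-fetched credentials.
    return f"read {asset_url} with key {credentials['accessKeyId']}"

def tile_assets(asset_urls: List[str], credentials: Dict) -> List[str]:
    # Credentials were fetched once, on the main thread, before this point.
    # Workers only read the dict, which needs no lock.
    with concurrent.futures.ThreadPoolExecutor() as ex:
        return list(ex.map(lambda url: read_asset(url, credentials), asset_urls))

# In real code: credentials = auth.get_s3_credentials(provider=provider)
credentials = {"accessKeyId": "AKIA-EXAMPLE", "secretAccessKey": "..."}
results = tile_assets(["s3://bucket/a.tif", "s3://bucket/b.tif"], credentials)
print(results[0])
```

This sidesteps the thread-safety question entirely, but only works if all assets in a request share one provider, which is exactly the assumption I'm unsure about.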

cc @hrodmn @jbusecke @vincentsarago
