diff --git a/src/apify/_charging.py b/src/apify/_charging.py index e9aa90b9..94ab3311 100644 --- a/src/apify/_charging.py +++ b/src/apify/_charging.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio import math from dataclasses import dataclass from datetime import datetime, timezone @@ -137,6 +138,11 @@ def __init__(self, configuration: Configuration, client: ApifyClientAsync) -> No self._not_ppe_warning_printed = False self.active = False + self._charge_lock = asyncio.Lock() + """Lock to synchronize charge operations and prevent race conditions between Actor.charge + and Actor.push_data calls. + """ + async def __aenter__(self) -> None: """Initialize the charging manager - this is called by the `Actor` class and shouldn't be invoked manually.""" # Validate config @@ -223,58 +229,60 @@ def calculate_chargeable() -> dict[str, int | None]: chargeable_within_limit=calculate_chargeable(), ) - # START OF CRITICAL SECTION - no awaits here + # Acquire lock to prevent race conditions between concurrent charge calls + # (e.g., when Actor.push_data with charging is called concurrently with Actor.charge). + async with self._charge_lock: + # START OF CRITICAL SECTION - # Determine the maximum amount of events that can be charged within the budget - max_chargeable = self.calculate_max_event_charge_count_within_limit(event_name) - charged_count = min(count, max_chargeable if max_chargeable is not None else count) + # Determine the maximum amount of events that can be charged within the budget + max_chargeable = self.calculate_max_event_charge_count_within_limit(event_name) + charged_count = min(count, max_chargeable if max_chargeable is not None else count) - if charged_count == 0: - return ChargeResult( - event_charge_limit_reached=True, - charged_count=0, - chargeable_within_limit=calculate_chargeable(), - ) + if charged_count == 0: + return ChargeResult( + event_charge_limit_reached=True, + charged_count=0, + chargeable_within_limit=calculate_chargeable(), + ) - pricing_info = self._pricing_info.get( - event_name, - PricingInfoItem( - price=Decimal() - if self._is_at_home - else Decimal(1), # Use a nonzero price for local development so that the maximum budget can be reached, - title=f"Unknown event '{event_name}'", - ), - ) + pricing_info = self._pricing_info.get( + event_name, + PricingInfoItem( + # Use a nonzero price for local development so that the maximum budget can be reached. + price=Decimal() if self._is_at_home else Decimal(1), + title=f"Unknown event '{event_name}'", + ), + ) - # Update the charging state - self._charging_state.setdefault(event_name, ChargingStateItem(0, Decimal())) - self._charging_state[event_name].charge_count += charged_count - self._charging_state[event_name].total_charged_amount += charged_count * pricing_info.price + # Update the charging state + self._charging_state.setdefault(event_name, ChargingStateItem(0, Decimal())) + self._charging_state[event_name].charge_count += charged_count + self._charging_state[event_name].total_charged_amount += charged_count * pricing_info.price + + # If running on the platform, call the charge endpoint + if self._is_at_home: + if self._actor_run_id is None: + raise RuntimeError('Actor run ID not configured') + + if event_name in self._pricing_info: + await self._client.run(self._actor_run_id).charge(event_name, charged_count) + else: + logger.warning(f"Attempting to charge for an unknown event '{event_name}'") + + # Log the charged operation (if enabled) + if self._charging_log_dataset: + await self._charging_log_dataset.push_data( + { + 'event_name': event_name, + 'event_title': pricing_info.title, + 'event_price_usd': round(pricing_info.price, 3), + 'charged_count': charged_count, + 'timestamp': datetime.now(timezone.utc).isoformat(), + } + ) # END OF CRITICAL SECTION - # If running on the platform, call the charge endpoint - if self._is_at_home: - if self._actor_run_id is None: - raise RuntimeError('Actor run ID not configured') - - if event_name in self._pricing_info: - await self._client.run(self._actor_run_id).charge(event_name, charged_count) - else: - logger.warning(f"Attempting to charge for an unknown event '{event_name}'") - - # Log the charged operation (if enabled) - if self._charging_log_dataset: - await self._charging_log_dataset.push_data( - { - 'event_name': event_name, - 'event_title': pricing_info.title, - 'event_price_usd': round(pricing_info.price, 3), - 'charged_count': charged_count, - 'timestamp': datetime.now(timezone.utc).isoformat(), - } - ) - # If it is not possible to charge the full amount, log that fact if charged_count < count: subject = 'instance' if count == 1 else 'instances'