Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 53 additions & 45 deletions src/apify/_charging.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import asyncio
import math
from dataclasses import dataclass
from datetime import datetime, timezone
Expand Down Expand Up @@ -137,6 +138,11 @@ def __init__(self, configuration: Configuration, client: ApifyClientAsync) -> No
self._not_ppe_warning_printed = False
self.active = False

self._charge_lock = asyncio.Lock()
"""Lock to synchronize charge operations and prevent race conditions between Actor.charge
and Actor.push_data calls.
"""

async def __aenter__(self) -> None:
"""Initialize the charging manager - this is called by the `Actor` class and shouldn't be invoked manually."""
# Validate config
Expand Down Expand Up @@ -223,58 +229,60 @@ def calculate_chargeable() -> dict[str, int | None]:
chargeable_within_limit=calculate_chargeable(),
)

# START OF CRITICAL SECTION - no awaits here
# Acquire lock to prevent race conditions between concurrent charge calls
# (e.g., when Actor.push_data with charging is called concurrently with Actor.charge).
async with self._charge_lock:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This, on its own, doesn't fix the issue of multiple interleaved, PPE-aware Actor.push_data calls. The lock would need to be held for the whole duration of the push_data+charge sequence.

# START OF CRITICAL SECTION

# Determine the maximum amount of events that can be charged within the budget
max_chargeable = self.calculate_max_event_charge_count_within_limit(event_name)
charged_count = min(count, max_chargeable if max_chargeable is not None else count)
# Determine the maximum amount of events that can be charged within the budget
max_chargeable = self.calculate_max_event_charge_count_within_limit(event_name)
charged_count = min(count, max_chargeable if max_chargeable is not None else count)

if charged_count == 0:
return ChargeResult(
event_charge_limit_reached=True,
charged_count=0,
chargeable_within_limit=calculate_chargeable(),
)
if charged_count == 0:
return ChargeResult(
event_charge_limit_reached=True,
charged_count=0,
chargeable_within_limit=calculate_chargeable(),
)

pricing_info = self._pricing_info.get(
event_name,
PricingInfoItem(
price=Decimal()
if self._is_at_home
else Decimal(1), # Use a nonzero price for local development so that the maximum budget can be reached,
title=f"Unknown event '{event_name}'",
),
)
pricing_info = self._pricing_info.get(
event_name,
PricingInfoItem(
# Use a nonzero price for local development so that the maximum budget can be reached.
price=Decimal() if self._is_at_home else Decimal(1),
title=f"Unknown event '{event_name}'",
),
)

# Update the charging state
self._charging_state.setdefault(event_name, ChargingStateItem(0, Decimal()))
self._charging_state[event_name].charge_count += charged_count
self._charging_state[event_name].total_charged_amount += charged_count * pricing_info.price
# Update the charging state
self._charging_state.setdefault(event_name, ChargingStateItem(0, Decimal()))
self._charging_state[event_name].charge_count += charged_count
self._charging_state[event_name].total_charged_amount += charged_count * pricing_info.price

# If running on the platform, call the charge endpoint
if self._is_at_home:
if self._actor_run_id is None:
raise RuntimeError('Actor run ID not configured')

if event_name in self._pricing_info:
await self._client.run(self._actor_run_id).charge(event_name, charged_count)
else:
logger.warning(f"Attempting to charge for an unknown event '{event_name}'")

# Log the charged operation (if enabled)
if self._charging_log_dataset:
await self._charging_log_dataset.push_data(
{
'event_name': event_name,
'event_title': pricing_info.title,
'event_price_usd': round(pricing_info.price, 3),
'charged_count': charged_count,
'timestamp': datetime.now(timezone.utc).isoformat(),
}
)

# END OF CRITICAL SECTION

# If running on the platform, call the charge endpoint
if self._is_at_home:
if self._actor_run_id is None:
raise RuntimeError('Actor run ID not configured')

if event_name in self._pricing_info:
await self._client.run(self._actor_run_id).charge(event_name, charged_count)
else:
logger.warning(f"Attempting to charge for an unknown event '{event_name}'")

# Log the charged operation (if enabled)
if self._charging_log_dataset:
await self._charging_log_dataset.push_data(
{
'event_name': event_name,
'event_title': pricing_info.title,
'event_price_usd': round(pricing_info.price, 3),
'charged_count': charged_count,
'timestamp': datetime.now(timezone.utc).isoformat(),
}
)

# If it is not possible to charge the full amount, log that fact
if charged_count < count:
subject = 'instance' if count == 1 else 'instances'
Expand Down