Skip to content

Commit ca37836

Browse files
Pijukatelvdusek
andauthored
refactor: Extract API storage client creation from the Apify storage code (#663)
### Description - Extract API storage client creation from the Apify storage `open` method to `ApiClientFactory` - Create specific variants of the factory and use them in `open` methods of Apify storages. --------- Co-authored-by: Vlada Dusek <[email protected]>
1 parent bc8af3f commit ca37836

File tree

9 files changed

+501
-388
lines changed

9 files changed

+501
-388
lines changed
Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
from __future__ import annotations
2+
3+
import logging
4+
from asyncio import Lock
5+
from logging import getLogger
6+
from typing import TYPE_CHECKING, ClassVar, Literal, overload
7+
8+
from apify_client import ApifyClientAsync
9+
10+
from ._utils import hash_api_base_url_and_token
11+
from apify._configuration import Configuration
12+
13+
if TYPE_CHECKING:
14+
from collections.abc import Callable
15+
from types import TracebackType
16+
17+
from apify_client.clients import (
18+
DatasetClientAsync,
19+
DatasetCollectionClientAsync,
20+
KeyValueStoreClientAsync,
21+
KeyValueStoreCollectionClientAsync,
22+
RequestQueueClientAsync,
23+
RequestQueueCollectionClientAsync,
24+
)
25+
26+
logger = getLogger(__name__)
27+
28+
29+
@overload
30+
async def open_by_alias(
31+
*,
32+
alias: str,
33+
storage_type: Literal['Dataset'],
34+
collection_client: DatasetCollectionClientAsync,
35+
get_resource_client_by_id: Callable[[str], DatasetClientAsync],
36+
configuration: Configuration,
37+
) -> DatasetClientAsync: ...
38+
39+
40+
@overload
41+
async def open_by_alias(
42+
*,
43+
alias: str,
44+
storage_type: Literal['KeyValueStore'],
45+
collection_client: KeyValueStoreCollectionClientAsync,
46+
get_resource_client_by_id: Callable[[str], KeyValueStoreClientAsync],
47+
configuration: Configuration,
48+
) -> KeyValueStoreClientAsync: ...
49+
50+
51+
@overload
52+
async def open_by_alias(
53+
*,
54+
alias: str,
55+
storage_type: Literal['RequestQueue'],
56+
collection_client: RequestQueueCollectionClientAsync,
57+
get_resource_client_by_id: Callable[[str], RequestQueueClientAsync],
58+
configuration: Configuration,
59+
) -> RequestQueueClientAsync: ...
60+
61+
62+
async def open_by_alias(
63+
*,
64+
alias: str,
65+
storage_type: Literal['Dataset', 'KeyValueStore', 'RequestQueue'],
66+
collection_client: (
67+
KeyValueStoreCollectionClientAsync | RequestQueueCollectionClientAsync | DatasetCollectionClientAsync
68+
),
69+
get_resource_client_by_id: Callable[[str], KeyValueStoreClientAsync | RequestQueueClientAsync | DatasetClientAsync],
70+
configuration: Configuration,
71+
) -> KeyValueStoreClientAsync | RequestQueueClientAsync | DatasetClientAsync:
72+
"""Open storage by alias, creating it if necessary.
73+
74+
This function resolves storage aliases to their IDs, creating new unnamed storage if needed.
75+
The alias mapping is stored in the default key-value store for persistence across Actor runs.
76+
77+
Args:
78+
alias: The alias name for the storage (e.g., '__default__', 'my-storage').
79+
storage_type: The type of storage to open.
80+
collection_client: The Apify API collection client for the storage type.
81+
get_resource_client_by_id: A callable that takes a storage ID and returns the resource client.
82+
configuration: Configuration object containing API credentials and settings.
83+
84+
Returns:
85+
The storage client for the opened or created storage.
86+
87+
Raises:
88+
ValueError: If storage ID cannot be determined from API response.
89+
TypeError: If API response format is unexpected.
90+
"""
91+
async with AliasResolver(
92+
storage_type=storage_type,
93+
alias=alias,
94+
configuration=configuration,
95+
) as alias_resolver:
96+
storage_id = await alias_resolver.resolve_id()
97+
98+
if storage_id:
99+
# Check if storage with this ID exists
100+
resource_client = get_resource_client_by_id(storage_id)
101+
raw_metadata = await resource_client.get()
102+
if raw_metadata:
103+
return resource_client
104+
105+
# Create new unnamed storage and store alias mapping
106+
raw_metadata = await collection_client.get_or_create()
107+
108+
await alias_resolver.store_mapping(storage_id=raw_metadata['id'])
109+
return get_resource_client_by_id(raw_metadata['id'])
110+
111+
112+
class AliasResolver:
113+
"""Class for handling aliases.
114+
115+
The purpose of this is class is to ensure that alias storages are created with correct id. This is achieved by using
116+
default kvs as a storage for global mapping of aliases to storage ids. Same mapping is also kept in memory to avoid
117+
unnecessary calls to API and also have limited support of alias storages when not running on Apify platform. When on
118+
Apify platform, the storages created with alias are accessible by the same alias even after migration or reboot.
119+
"""
120+
121+
_ALIAS_MAPPING_KEY = '__STORAGE_ALIASES_MAPPING'
122+
"""Key used for storing the alias mapping in the default kvs."""
123+
124+
_ALIAS_STORAGE_KEY_SEPARATOR = ','
125+
"""Separator used in the storage key for storing the alias mapping."""
126+
127+
_alias_map: ClassVar[dict[str, str]] = {}
128+
"""Map containing pre-existing alias storages and their ids. Global for all instances."""
129+
130+
_alias_init_lock: Lock | None = None
131+
"""Lock for creating alias storages. Only one alias storage can be created at the time. Global for all instances."""
132+
133+
def __init__(
134+
self,
135+
storage_type: Literal['Dataset', 'KeyValueStore', 'RequestQueue'],
136+
alias: str,
137+
configuration: Configuration,
138+
) -> None:
139+
self._storage_type = storage_type
140+
self._alias = alias
141+
self._configuration = configuration
142+
self._additional_cache_key = hash_api_base_url_and_token(configuration)
143+
144+
async def __aenter__(self) -> AliasResolver:
145+
"""Context manager to prevent race condition in alias creation."""
146+
lock = await self._get_alias_init_lock()
147+
await lock.acquire()
148+
return self
149+
150+
async def __aexit__(
151+
self,
152+
exc_type: type[BaseException] | None,
153+
exc_value: BaseException | None,
154+
exc_traceback: TracebackType | None,
155+
) -> None:
156+
lock = await self._get_alias_init_lock()
157+
lock.release()
158+
159+
@classmethod
160+
async def _get_alias_init_lock(cls) -> Lock:
161+
"""Get lock for controlling the creation of the alias storages.
162+
163+
The lock is shared for all instances of the AliasResolver class.
164+
It is created in async method to ensure that some event loop is already running.
165+
"""
166+
if cls._alias_init_lock is None:
167+
cls._alias_init_lock = Lock()
168+
return cls._alias_init_lock
169+
170+
@classmethod
171+
async def _get_alias_map(cls, configuration: Configuration) -> dict[str, str]:
172+
"""Get the aliases and storage ids mapping from the default kvs.
173+
174+
Mapping is loaded from kvs only once and is shared for all instances of the _AliasResolver class.
175+
176+
Args:
177+
configuration: Configuration object to use for accessing the default KVS.
178+
179+
Returns:
180+
Map of aliases and storage ids.
181+
"""
182+
if not cls._alias_map and Configuration.get_global_configuration().is_at_home:
183+
default_kvs_client = await cls._get_default_kvs_client(configuration)
184+
185+
record = await default_kvs_client.get_record(cls._ALIAS_MAPPING_KEY)
186+
187+
# get_record can return {key: ..., value: ..., content_type: ...}
188+
if isinstance(record, dict):
189+
if 'value' in record and isinstance(record['value'], dict):
190+
cls._alias_map = record['value']
191+
else:
192+
cls._alias_map = record
193+
else:
194+
cls._alias_map = dict[str, str]()
195+
196+
return cls._alias_map
197+
198+
async def resolve_id(self) -> str | None:
199+
"""Get id of the aliased storage.
200+
201+
Returns:
202+
Storage id if it exists, None otherwise.
203+
"""
204+
return (await self._get_alias_map(self._configuration)).get(self._storage_key, None)
205+
206+
async def store_mapping(self, storage_id: str) -> None:
207+
"""Add alias and related storage id to the mapping in default kvs and local in-memory mapping."""
208+
# Update in-memory mapping
209+
alias_map = await self._get_alias_map(self._configuration)
210+
alias_map[self._storage_key] = storage_id
211+
212+
if not Configuration.get_global_configuration().is_at_home:
213+
logging.getLogger(__name__).debug(
214+
'_AliasResolver storage limited retention is only supported on Apify platform. Storage is not exported.'
215+
)
216+
return
217+
218+
default_kvs_client = await self._get_default_kvs_client(self._configuration)
219+
await default_kvs_client.get()
220+
221+
try:
222+
record = await default_kvs_client.get_record(self._ALIAS_MAPPING_KEY)
223+
224+
# get_record can return {key: ..., value: ..., content_type: ...}
225+
if isinstance(record, dict) and 'value' in record:
226+
record = record['value']
227+
228+
# Update or create the record with the new alias mapping
229+
if isinstance(record, dict):
230+
record[self._storage_key] = storage_id
231+
else:
232+
record = {self._storage_key: storage_id}
233+
234+
# Store the mapping back in the KVS.
235+
await default_kvs_client.set_record(self._ALIAS_MAPPING_KEY, record)
236+
except Exception as exc:
237+
logger.warning(f'Error storing alias mapping for {self._alias}: {exc}')
238+
239+
@property
240+
def _storage_key(self) -> str:
241+
"""Get a unique storage key used for storing the alias in the mapping."""
242+
return self._ALIAS_STORAGE_KEY_SEPARATOR.join(
243+
[
244+
self._storage_type,
245+
self._alias,
246+
self._additional_cache_key,
247+
]
248+
)
249+
250+
@staticmethod
251+
async def _get_default_kvs_client(configuration: Configuration) -> KeyValueStoreClientAsync:
252+
"""Get a client for the default key-value store."""
253+
apify_client_async = ApifyClientAsync(
254+
token=configuration.token,
255+
api_url=configuration.api_base_url,
256+
max_retries=8,
257+
min_delay_between_retries_millis=500,
258+
timeout_secs=360,
259+
)
260+
261+
if not configuration.default_key_value_store_id:
262+
raise ValueError("'Configuration.default_key_value_store_id' must be set.")
263+
264+
return apify_client_async.key_value_store(key_value_store_id=configuration.default_key_value_store_id)

0 commit comments

Comments
 (0)