Skip to content

Commit 38fb5f4

Browse files
committed
chore: Separate status check from synchronizer functionality
Previously, we checked the fallback and recovery conditions only when the synchronizer returned an update. If the synchronizer was stuck, or nothing was changing in the environment, the conditions were never checked. That setup also exposed an interesting behavior. If the synchronizer cannot connect, it emits error updates. Each time we receive an error, we check whether we have failed to initialize for the last 10 seconds; if so, we re-create the primary synchronizer. When the re-created synchronizer continues to fail, its first update triggers the condition check, and because initialization has still been failing for 10 seconds, it immediately errors out. With this change, the conditions are checked independently of synchronizer updates, so a synchronizer is given at least 10 seconds to try before the condition is evaluated.
1 parent b84c706 commit 38fb5f4

File tree

3 files changed

+46
-17
lines changed

3 files changed

+46
-17
lines changed

ldclient/impl/datasourcev2/status.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
class DataSourceStatusProviderImpl(DataSourceStatusProvider):
2020
def __init__(self, listeners: Listeners):
2121
self.__listeners = listeners
22-
self.__status = DataSourceStatus(DataSourceState.INITIALIZING, 0, None)
22+
self.__status = DataSourceStatus(DataSourceState.INITIALIZING, time.time(), None)
2323
self.__lock = ReadWriteLock()
2424

2525
@property

ldclient/impl/datasourcev2/streaming.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -405,13 +405,6 @@ def _handle_error(self, error: Exception, envid: Optional[str]) -> Tuple[Optiona
405405

406406
return (update, True)
407407

408-
# magic methods for "with" statement (used in testing)
409-
def __enter__(self):
410-
return self
411-
412-
def __exit__(self, type, value, traceback):
413-
self.stop()
414-
415408

416409
class StreamingDataSourceBuilder: # disable: pylint: disable=too-few-public-methods
417410
"""

ldclient/impl/datasystem/fdv2.py

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import time
33
from threading import Event, Thread
44
from typing import Any, Callable, Dict, List, Mapping, Optional
5+
from queue import Queue, Empty
56

67
from ldclient.config import Builder, Config, DataSystemConfig
78
from ldclient.feature_store import _FeatureStoreDataSetSorter
@@ -367,11 +368,12 @@ def synchronizer_loop(self: 'FDv2'):
367368
else:
368369
log.info("Fallback condition met")
369370

370-
if self._secondary_synchronizer_builder is None:
371-
continue
372371
if self._stop_event.is_set():
373372
break
374373

374+
if self._secondary_synchronizer_builder is None:
375+
continue
376+
375377
self._lock.lock()
376378
secondary_sync = self._secondary_synchronizer_builder(self._config)
377379
if isinstance(secondary_sync, DiagnosticSource) and self._diagnostic_accumulator is not None:
@@ -433,8 +435,45 @@ def _consume_synchronizer_results(
433435
434436
:return: Tuple of (should_remove_sync, fallback_to_fdv1)
435437
"""
438+
action_queue = Queue()
439+
timer = RepeatingTask(
440+
label="FDv2-sync-cond-timer",
441+
interval=10,
442+
initial_delay=10,
443+
callable=lambda: action_queue.put("check")
444+
)
445+
446+
def reader(self: 'FDv2'):
447+
try:
448+
for update in synchronizer.sync(self._store):
449+
action_queue.put(update)
450+
finally:
451+
action_queue.put("quit")
452+
453+
sync_reader = Thread(
454+
target=reader,
455+
name="FDv2-sync-reader",
456+
args=(self,),
457+
daemon=True
458+
)
459+
436460
try:
437-
for update in synchronizer.sync(self._store):
461+
timer.start()
462+
sync_reader.start()
463+
464+
while True:
465+
update = action_queue.get(True)
466+
if isinstance(update, str):
467+
if update == "quit":
468+
break
469+
470+
if update == "check":
471+
# Check condition periodically
472+
current_status = self._data_source_status_provider.status
473+
if condition_func(current_status):
474+
return False, False
475+
continue
476+
438477
log.info("Synchronizer %s update: %s", synchronizer.name, update.state)
439478
if self._stop_event.is_set():
440479
return False, False
@@ -457,17 +496,14 @@ def _consume_synchronizer_results(
457496
# Check for OFF state indicating permanent failure
458497
if update.state == DataSourceState.OFF:
459498
return True, False
460-
461-
# Check condition periodically
462-
current_status = self._data_source_status_provider.status
463-
if condition_func(current_status):
464-
return False, False
465-
466499
except Exception as e:
467500
log.error("Error consuming synchronizer results: %s", e)
468501
return True, False
469502
finally:
470503
synchronizer.stop()
504+
timer.stop()
505+
506+
sync_reader.join(0.5)
471507

472508
return True, False
473509

0 commit comments

Comments
 (0)