Skip to content

Commit 85f59ba

Browse files
committed
chore: Flush in memory store on persistent store recovery
1 parent 2a5ce64 commit 85f59ba

File tree

7 files changed

+279
-14
lines changed

7 files changed

+279
-14
lines changed

ldclient/client.py

Lines changed: 6 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -273,14 +273,13 @@ def __start_up(self, start_wait: float):
273273
self._data_system.data_source_status_provider
274274
)
275275
self.__flag_tracker = self._data_system.flag_tracker
276-
self._store: ReadOnlyStore = self._data_system.store
277276

278277
big_segment_store_manager = BigSegmentStoreManager(self._config.big_segments)
279278
self.__big_segment_store_manager = big_segment_store_manager
280279

281280
self._evaluator = Evaluator(
282-
lambda key: _get_store_item(self._store, FEATURES, key),
283-
lambda key: _get_store_item(self._store, SEGMENTS, key),
281+
lambda key: _get_store_item(self._data_system.store, FEATURES, key),
282+
lambda key: _get_store_item(self._data_system.store, SEGMENTS, key),
284283
lambda key: big_segment_store_manager.get_user_membership(key),
285284
log,
286285
)
@@ -571,7 +570,7 @@ def _evaluate_internal(self, key: str, context: Context, default: Any, event_fac
571570
return EvaluationDetail(default, None, error_reason('CLIENT_NOT_READY')), None
572571

573572
if not self.is_initialized():
574-
if self._store.initialized:
573+
if self._data_system.store.initialized:
575574
log.warning("Feature Flag evaluation attempted before client has initialized - using last known values from feature store for feature key: " + key)
576575
else:
577576
log.warning("Feature Flag evaluation attempted before client has initialized! Feature store unavailable - returning default: " + str(default) + " for feature key: " + key)
@@ -584,7 +583,7 @@ def _evaluate_internal(self, key: str, context: Context, default: Any, event_fac
584583
return EvaluationDetail(default, None, error_reason('USER_NOT_SPECIFIED')), None
585584

586585
try:
587-
flag = _get_store_item(self._store, FEATURES, key)
586+
flag = _get_store_item(self._data_system.store, FEATURES, key)
588587
except Exception as e:
589588
log.error("Unexpected error while retrieving feature flag \"%s\": %s" % (key, repr(e)))
590589
log.debug(traceback.format_exc())
@@ -642,7 +641,7 @@ def all_flags_state(self, context: Context, **kwargs) -> FeatureFlagsState:
642641
return FeatureFlagsState(False)
643642

644643
if not self.is_initialized():
645-
if self._store.initialized:
644+
if self._data_system.store.initialized:
646645
log.warning("all_flags_state() called before client has finished initializing! Using last known values from feature store")
647646
else:
648647
log.warning("all_flags_state() called before client has finished initializing! Feature store unavailable - returning empty state")
@@ -657,7 +656,7 @@ def all_flags_state(self, context: Context, **kwargs) -> FeatureFlagsState:
657656
with_reasons = kwargs.get('with_reasons', False)
658657
details_only_if_tracked = kwargs.get('details_only_for_tracked_flags', False)
659658
try:
660-
flags_map = self._store.all(FEATURES, lambda x: x)
659+
flags_map = self._data_system.store.all(FEATURES, lambda x: x)
661660
if flags_map is None:
662661
raise ValueError("feature store error")
663662
except Exception as e:

ldclient/impl/datasourcev2/polling.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -257,7 +257,7 @@ def fetch(self, selector: Optional[Selector]) -> PollingResult:
257257
if self._config.payload_filter_key is not None:
258258
query_params["filter"] = self._config.payload_filter_key
259259

260-
if selector is not None:
260+
if selector is not None and selector.is_defined():
261261
query_params["selector"] = selector.state
262262

263263
uri = self._poll_uri

ldclient/impl/datasystem/fdv2.py

Lines changed: 20 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -89,7 +89,7 @@ def __update_availability(self, available: bool):
8989
if available:
9090
log.warning("Persistent store is available again")
9191

92-
status = DataStoreStatus(available, False)
92+
status = DataStoreStatus(available, True)
9393
self.__store_update_sink.update_status(status)
9494

9595
if available:
@@ -185,16 +185,18 @@ def __init__(
185185
self._change_set_listeners = Listeners()
186186
self._data_store_listeners = Listeners()
187187

188+
self._data_store_listeners.add(self._persistent_store_outage_recovery)
189+
188190
# Create the store
189191
self._store = Store(self._flag_change_listeners, self._change_set_listeners)
190192

191193
# Status providers
192194
self._data_source_status_provider = DataSourceStatusProviderImpl(Listeners())
193-
self._data_store_status_provider = DataStoreStatusProviderImpl(None, Listeners())
195+
self._data_store_status_provider = DataStoreStatusProviderImpl(None, self._data_store_listeners)
194196

195197
# Configure persistent store if provided
196198
if self._data_system_config.data_store is not None:
197-
self._data_store_status_provider = DataStoreStatusProviderImpl(self._data_system_config.data_store, Listeners())
199+
self._data_store_status_provider = DataStoreStatusProviderImpl(self._data_system_config.data_store, self._data_store_listeners)
198200
writable = self._data_system_config.data_store_mode == DataStoreMode.READ_WRITE
199201
wrapper = FeatureStoreClientWrapper(self._data_system_config.data_store, self._data_store_status_provider)
200202
self._store.with_persistence(
@@ -509,6 +511,21 @@ def _recovery_condition(self, status: DataSourceStatus) -> bool:
509511

510512
return interrupted_at_runtime or healthy_for_too_long or cannot_initialize
511513

514+
def _persistent_store_outage_recovery(self, data_store_status: DataStoreStatus):
515+
"""
516+
Monitor the data store status. If the store comes online and
517+
potentially has stale data, we should write our known state to it.
518+
"""
519+
if not data_store_status.available:
520+
return
521+
522+
if not data_store_status.stale:
523+
return
524+
525+
err = self._store.commit()
526+
if err is not None:
527+
log.error("Failed to reinitialize data store", exc_info=err)
528+
512529
@property
513530
def store(self) -> ReadOnlyStore:
514531
"""Get the underlying store for flag evaluation."""

ldclient/impl/datasystem/protocolv2.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -505,7 +505,7 @@ def name(self) -> str:
505505
"""Returns the name of the initializer."""
506506
raise NotImplementedError
507507

508-
def sync(self, ss: "SelectorStore") -> "Generator[Update, None, None]":
508+
def sync(self, ss: "SelectorStore") -> Generator["Update", None, None]:
509509
"""
510510
sync should begin the synchronization process for the data source, yielding
511511
Update objects until the connection is closed or an unrecoverable error

ldclient/impl/datasystem/store.py

Lines changed: 8 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@
2020
)
2121
from ldclient.impl.dependency_tracker import DependencyTracker, KindAndKey
2222
from ldclient.impl.listeners import Listeners
23+
from ldclient.impl.model.entity import ModelEntity
2324
from ldclient.impl.rwlock import ReadWriteLock
2425
from ldclient.impl.util import log
2526
from ldclient.interfaces import (
@@ -451,13 +452,19 @@ def commit(self) -> Optional[Exception]:
451452
Returns:
452453
Exception if commit failed, None otherwise
453454
"""
455+
def __mapping_from_kind(kind: VersionedDataKind) -> Callable[[Dict[str, ModelEntity]], Dict[str, Dict[str, Any]]]:
456+
def __mapping(data: Dict[str, ModelEntity]) -> Dict[str, Dict[str, Any]]:
457+
return {k: kind.encode(v) for k, v in data.items()}
458+
459+
return __mapping
460+
454461
with self._lock:
455462
if self._should_persist():
456463
try:
457464
# Get all data from memory store and write to persistent store
458465
all_data = {}
459466
for kind in [FEATURES, SEGMENTS]:
460-
all_data[kind] = self._memory_store.all(kind, lambda x: x)
467+
all_data[kind] = self._memory_store.all(kind, __mapping_from_kind(kind))
461468
self._persistent_store.init(all_data) # type: ignore
462469
except Exception as e:
463470
return e

ldclient/testing/impl/datasystem/test_fdv2_persistence.py

Lines changed: 242 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -537,3 +537,245 @@ def test_no_persistent_store_status_provider_without_store():
537537
assert set_on_ready.wait(1), "Data system did not become ready in time"
538538

539539
fdv2.stop()
540+
541+
542+
def test_persistent_store_outage_recovery_flushes_on_recovery():
543+
"""Test that in-memory store is flushed to persistent store when it recovers from outage"""
544+
from ldclient.interfaces import DataStoreStatus
545+
546+
persistent_store = StubFeatureStore()
547+
548+
# Create synchronizer with initial data
549+
td_synchronizer = TestDataV2.data_source()
550+
td_synchronizer.update(td_synchronizer.flag("feature-flag").on(True))
551+
552+
data_system_config = DataSystemConfig(
553+
data_store_mode=DataStoreMode.READ_WRITE,
554+
data_store=persistent_store,
555+
initializers=None,
556+
primary_synchronizer=td_synchronizer.build_synchronizer,
557+
)
558+
559+
set_on_ready = Event()
560+
fdv2 = FDv2(Config(sdk_key="dummy"), data_system_config)
561+
fdv2.start(set_on_ready)
562+
563+
assert set_on_ready.wait(1), "Data system did not become ready in time"
564+
565+
# Verify initial data is in the persistent store
566+
snapshot = persistent_store.get_data_snapshot()
567+
assert "feature-flag" in snapshot[FEATURES]
568+
assert snapshot[FEATURES]["feature-flag"]["on"] is True
569+
570+
# Reset tracking to isolate recovery behavior
571+
persistent_store.reset_operation_tracking()
572+
573+
event = Event()
574+
fdv2.flag_tracker.add_listener(lambda _flag_change: event.set())
575+
# Simulate a new flag being added while store is "offline"
576+
# (In reality, the store is still online, but we're testing the recovery mechanism)
577+
td_synchronizer.update(td_synchronizer.flag("new-flag").on(False))
578+
579+
# Block until the flag has propagated through the data store
580+
assert event.wait(1)
581+
582+
# Now simulate the persistent store coming back online with stale data
583+
# by triggering the recovery callback directly
584+
fdv2._persistent_store_outage_recovery(DataStoreStatus(available=True, stale=True))
585+
586+
# Verify that init was called on the persistent store (flushing in-memory data)
587+
assert persistent_store.init_called_count > 0, "Store should have been reinitialized"
588+
589+
# Verify both flags are now in the persistent store
590+
snapshot = persistent_store.get_data_snapshot()
591+
assert "feature-flag" in snapshot[FEATURES]
592+
assert "new-flag" in snapshot[FEATURES]
593+
594+
fdv2.stop()
595+
596+
597+
def test_persistent_store_outage_recovery_no_flush_when_not_stale():
598+
"""Test that recovery does NOT flush when store comes back online without stale data"""
599+
from ldclient.interfaces import DataStoreStatus
600+
601+
persistent_store = StubFeatureStore()
602+
603+
td_synchronizer = TestDataV2.data_source()
604+
td_synchronizer.update(td_synchronizer.flag("feature-flag").on(True))
605+
606+
data_system_config = DataSystemConfig(
607+
data_store_mode=DataStoreMode.READ_WRITE,
608+
data_store=persistent_store,
609+
initializers=None,
610+
primary_synchronizer=td_synchronizer.build_synchronizer,
611+
)
612+
613+
set_on_ready = Event()
614+
fdv2 = FDv2(Config(sdk_key="dummy"), data_system_config)
615+
fdv2.start(set_on_ready)
616+
617+
assert set_on_ready.wait(1), "Data system did not become ready in time"
618+
619+
# Reset tracking
620+
persistent_store.reset_operation_tracking()
621+
622+
# Simulate store coming back online but NOT stale (data is fresh)
623+
fdv2._persistent_store_outage_recovery(DataStoreStatus(available=True, stale=False))
624+
625+
# Verify that init was NOT called (no flush needed)
626+
assert persistent_store.init_called_count == 0, "Store should not be reinitialized when not stale"
627+
628+
fdv2.stop()
629+
630+
631+
def test_persistent_store_outage_recovery_no_flush_when_unavailable():
632+
"""Test that recovery does NOT flush when store is unavailable"""
633+
from ldclient.interfaces import DataStoreStatus
634+
635+
persistent_store = StubFeatureStore()
636+
637+
td_synchronizer = TestDataV2.data_source()
638+
td_synchronizer.update(td_synchronizer.flag("feature-flag").on(True))
639+
640+
data_system_config = DataSystemConfig(
641+
data_store_mode=DataStoreMode.READ_WRITE,
642+
data_store=persistent_store,
643+
initializers=None,
644+
primary_synchronizer=td_synchronizer.build_synchronizer,
645+
)
646+
647+
set_on_ready = Event()
648+
fdv2 = FDv2(Config(sdk_key="dummy"), data_system_config)
649+
fdv2.start(set_on_ready)
650+
651+
assert set_on_ready.wait(1), "Data system did not become ready in time"
652+
653+
# Reset tracking
654+
persistent_store.reset_operation_tracking()
655+
656+
# Simulate store being unavailable (even if marked as stale)
657+
fdv2._persistent_store_outage_recovery(DataStoreStatus(available=False, stale=True))
658+
659+
# Verify that init was NOT called (store is not available)
660+
assert persistent_store.init_called_count == 0, "Store should not be reinitialized when unavailable"
661+
662+
fdv2.stop()
663+
664+
665+
def test_persistent_store_commit_encodes_data_correctly():
666+
"""Test that Store.commit() properly encodes data before writing to persistent store"""
667+
from ldclient.impl.datasystem.protocolv2 import (
668+
Change,
669+
ChangeSet,
670+
ChangeType,
671+
IntentCode,
672+
ObjectKind
673+
)
674+
from ldclient.impl.datasystem.store import Store
675+
from ldclient.impl.listeners import Listeners
676+
677+
persistent_store = StubFeatureStore()
678+
store = Store(Listeners(), Listeners())
679+
store.with_persistence(persistent_store, True, None)
680+
681+
# Create a flag with raw data
682+
flag_data = {
683+
"key": "test-flag",
684+
"version": 1,
685+
"on": True,
686+
"variations": [True, False],
687+
"fallthrough": {"variation": 0},
688+
}
689+
690+
# Apply a changeset to add the flag to the in-memory store
691+
changeset = ChangeSet(
692+
intent_code=IntentCode.TRANSFER_FULL,
693+
changes=[
694+
Change(
695+
action=ChangeType.PUT,
696+
kind=ObjectKind.FLAG,
697+
key="test-flag",
698+
version=1,
699+
object=flag_data,
700+
)
701+
],
702+
selector=None,
703+
)
704+
store.apply(changeset, True)
705+
706+
# Reset tracking
707+
persistent_store.reset_operation_tracking()
708+
709+
# Now commit the in-memory store to the persistent store
710+
err = store.commit()
711+
assert err is None, "Commit should succeed"
712+
713+
# Verify that init was called with properly encoded data
714+
assert persistent_store.init_called_count == 1, "Init should be called once"
715+
716+
# Verify the data in the persistent store is properly encoded
717+
snapshot = persistent_store.get_data_snapshot()
718+
assert "test-flag" in snapshot[FEATURES]
719+
720+
# The data should be in the encoded format (as a dict with all required fields)
721+
flag_in_store = snapshot[FEATURES]["test-flag"]
722+
assert flag_in_store["key"] == "test-flag"
723+
assert flag_in_store["version"] == 1
724+
assert flag_in_store["on"] is True
725+
726+
727+
def test_persistent_store_commit_with_no_persistent_store():
728+
"""Test that Store.commit() safely handles the case where there's no persistent store"""
729+
from ldclient.impl.datasystem.store import Store
730+
from ldclient.impl.listeners import Listeners
731+
732+
# Create store without persistent store
733+
store = Store(Listeners(), Listeners())
734+
735+
# Commit should succeed but do nothing
736+
err = store.commit()
737+
assert err is None, "Commit should succeed even without persistent store"
738+
739+
740+
def test_persistent_store_commit_handles_errors():
741+
"""Test that Store.commit() handles errors from persistent store gracefully"""
742+
from ldclient.impl.datasystem.protocolv2 import (
743+
Change,
744+
ChangeSet,
745+
ChangeType,
746+
IntentCode,
747+
ObjectKind
748+
)
749+
from ldclient.impl.datasystem.store import Store
750+
from ldclient.impl.listeners import Listeners
751+
752+
class FailingFeatureStore(StubFeatureStore):
753+
"""A feature store that always fails on init"""
754+
def init(self, all_data):
755+
raise RuntimeError("Simulated persistent store failure")
756+
757+
persistent_store = FailingFeatureStore()
758+
store = Store(Listeners(), Listeners())
759+
store.with_persistence(persistent_store, True, None)
760+
761+
# Add some data to the in-memory store
762+
changeset = ChangeSet(
763+
intent_code=IntentCode.TRANSFER_FULL,
764+
changes=[
765+
Change(
766+
action=ChangeType.PUT,
767+
kind=ObjectKind.FLAG,
768+
key="test-flag",
769+
version=1,
770+
object={"key": "test-flag", "version": 1, "on": True},
771+
)
772+
],
773+
selector=None,
774+
)
775+
store.apply(changeset, True)
776+
777+
# Commit should return the error without raising
778+
err = store.commit()
779+
assert err is not None, "Commit should return error from persistent store"
780+
assert isinstance(err, RuntimeError)
781+
assert str(err) == "Simulated persistent store failure"

0 commit comments

Comments
 (0)