-
Notifications
You must be signed in to change notification settings - Fork 45
chore: Separate status check from synchronizer functionality #373
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
38fb5f4
3d7db5c
82bb228
2ca4af0
1c61bfc
fa34244
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,6 @@ | ||
| import logging | ||
| import time | ||
| from queue import Empty, Queue | ||
| from threading import Event, Thread | ||
| from typing import Any, Callable, Dict, List, Mapping, Optional | ||
|
|
||
|
|
@@ -367,11 +368,12 @@ def synchronizer_loop(self: 'FDv2'): | |
| else: | ||
| log.info("Fallback condition met") | ||
|
|
||
| if self._secondary_synchronizer_builder is None: | ||
| continue | ||
| if self._stop_event.is_set(): | ||
| break | ||
|
|
||
| if self._secondary_synchronizer_builder is None: | ||
| continue | ||
|
|
||
| self._lock.lock() | ||
| secondary_sync = self._secondary_synchronizer_builder(self._config) | ||
| if isinstance(secondary_sync, DiagnosticSource) and self._diagnostic_accumulator is not None: | ||
|
|
@@ -433,8 +435,45 @@ def _consume_synchronizer_results( | |
| :return: Tuple of (should_remove_sync, fallback_to_fdv1) | ||
| """ | ||
| action_queue: Queue = Queue() | ||
| timer = RepeatingTask( | ||
| label="FDv2-sync-cond-timer", | ||
| interval=10, | ||
| initial_delay=10, | ||
| callable=lambda: action_queue.put("check") | ||
| ) | ||
|
|
||
| def reader(self: 'FDv2'): | ||
| try: | ||
| for update in synchronizer.sync(self._store): | ||
| action_queue.put(update) | ||
| finally: | ||
| action_queue.put("quit") | ||
|
|
||
| sync_reader = Thread( | ||
| target=reader, | ||
| name="FDv2-sync-reader", | ||
| args=(self,), | ||
| daemon=True | ||
| ) | ||
|
|
||
| try: | ||
| for update in synchronizer.sync(self._store): | ||
| timer.start() | ||
| sync_reader.start() | ||
|
|
||
| while True: | ||
| update = action_queue.get(True) | ||
| if isinstance(update, str): | ||
| if update == "quit": | ||
| break | ||
|
|
||
| if update == "check": | ||
| # Check condition periodically | ||
| current_status = self._data_source_status_provider.status | ||
| if condition_func(current_status): | ||
| return False, False | ||
| continue | ||
|
|
||
| log.info("Synchronizer %s update: %s", synchronizer.name, update.state) | ||
| if self._stop_event.is_set(): | ||
| return False, False | ||
|
|
@@ -457,17 +496,14 @@ def _consume_synchronizer_results( | |
| # Check for OFF state indicating permanent failure | ||
| if update.state == DataSourceState.OFF: | ||
| return True, False | ||
|
|
||
| # Check condition periodically | ||
| current_status = self._data_source_status_provider.status | ||
| if condition_func(current_status): | ||
| return False, False | ||
|
|
||
| except Exception as e: | ||
| log.error("Error consuming synchronizer results: %s", e) | ||
| return True, False | ||
| finally: | ||
| synchronizer.stop() | ||
| timer.stop() | ||
|
|
||
| sync_reader.join(0.5) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the purpose of the half second timeout here? My understanding is that if it doesn't stop in .5 seconds, execution will move on and that thread will still continue to process.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not a blocker, just curious.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We are trying to shut down in an orderly fashion. The timeout gives us time to close out correctly, but eventually we do have to move on if it isn't halting as expected. The idea is that if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: Thread Termination Failure Leads to InstabilityThe |
||
|
|
||
| return True, False | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.