diff --git a/.env.example b/.env.example
index 63623595..10bb9890 100644
--- a/.env.example
+++ b/.env.example
@@ -6,6 +6,8 @@ APP_RELOAD=True
 APP_DEBUG=True
 JBI_API_KEY="fake_api_key"
 DL_QUEUE_DSN=file:///tmp
+DL_QUEUE_CONSTANT_RETRY=true
+DL_QUEUE_RETRY_TIMEOUT_DAYS=7
 
 # Jira API Secrets
 JIRA_USERNAME="fake_jira_username"
diff --git a/.secrets.baseline b/.secrets.baseline
index 37298362..aea8c706 100644
--- a/.secrets.baseline
+++ b/.secrets.baseline
@@ -113,16 +113,16 @@
         "filename": ".env.example",
         "hashed_secret": "4b9a4ce92b6a01a4cd6ee1672d31c043f2ae79ab",
         "is_verified": false,
-        "line_number": 12
+        "line_number": 14
       },
       {
         "type": "Secret Keyword",
         "filename": ".env.example",
         "hashed_secret": "77ea6398f252999314d609a708842a49fc43e055",
         "is_verified": false,
-        "line_number": 15
+        "line_number": 17
       }
     ]
   },
-  "generated_at": "2024-03-20T18:05:17Z"
+  "generated_at": "2024-04-10T20:05:01Z"
 }
diff --git a/Makefile b/Makefile
index cdb82bb6..9dec25a1 100644
--- a/Makefile
+++ b/Makefile
@@ -65,11 +65,11 @@ $(DOTENV_FILE):
 
 .PHONY: docker-shell
 docker-shell: $(DOTENV_FILE)
-	docker-compose run --rm web /bin/sh
+	docker compose run --rm web /bin/sh
 
 .PHONY: docker-start
 docker-start: $(DOTENV_FILE)
-	docker-compose up
+	docker compose up
 
 .PHONY: test
 test: $(INSTALL_STAMP)
diff --git a/docker-compose.yaml b/docker-compose.yaml
index a62d8054..5c10777a 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -12,3 +12,10 @@ services:
       - ${PORT:-8000}:${PORT:-8000}
     volumes:
       - .:/app
+  retry:
+    build: .
+    command: python -m jbi.retry
+    env_file:
+      - .env
+    volumes:
+      - .:/app
diff --git a/jbi/environment.py b/jbi/environment.py
index fbf5359b..3fe44e3c 100644
--- a/jbi/environment.py
+++ b/jbi/environment.py
@@ -48,9 +48,12 @@ class Settings(BaseSettings):
     sentry_dsn: Optional[AnyUrl] = None
     sentry_traces_sample_rate: float = 1.0
 
+    # Retry queue
     dl_queue_dsn: FileUrl
 
-    model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")
+    model_config = SettingsConfigDict(
+        env_file=".env", env_file_encoding="utf-8", extra="ignore"
+    )
 
 
 @lru_cache(maxsize=1)
diff --git a/jbi/queue.py b/jbi/queue.py
index 70fd08fe..67425246 100644
--- a/jbi/queue.py
+++ b/jbi/queue.py
@@ -302,6 +302,9 @@ async def list(self, bug_id: int) -> List[str]:
     async def list_all(self) -> dict[int, List[str]]:
         return await self.backend.list_all()
 
+    async def size(self, bug_id=None):
+        return await self.backend.size(bug_id=bug_id)
+
     async def done(self, item: QueueItem) -> None:
         """
         Mark item as done, remove from queue.
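Note on the `jbi/queue.py` hunk above: `size` is a thin passthrough to the backend so callers can check queue depth without retrieving payloads. A minimal sketch of how a consumer might use it, assuming `size()` with no argument counts events across all bugs (as the `bug_id=None` default suggests); the bug id below is a made-up example value:

```python
import asyncio

from jbi.queue import get_dl_queue


async def report_backlog(bug_id: int) -> None:
    queue = get_dl_queue()
    total = await queue.size()          # assumed: total events across all bugs
    per_bug = await queue.size(bug_id)  # events queued for this one bug
    print(f"{total} queued event(s) total, {per_bug} for bug {bug_id}")


asyncio.run(report_backlog(123))  # 123 is an illustrative bug id only
```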
diff --git a/jbi/retry.py b/jbi/retry.py
new file mode 100644
index 00000000..77d95286
--- /dev/null
+++ b/jbi/retry.py
@@ -0,0 +1,79 @@
+import asyncio
+import logging
+from datetime import UTC, datetime, timedelta
+from os import getenv
+from time import sleep
+
+import jbi.runner as runner
+from jbi.configuration import ACTIONS
+from jbi.queue import get_dl_queue
+
+CONSTANT_RETRY = getenv("DL_QUEUE_CONSTANT_RETRY", "false") == "true"
+RETRY_TIMEOUT_DAYS = getenv("DL_QUEUE_RETRY_TIMEOUT_DAYS", 7)
+CONSTANT_RETRY_SLEEP = getenv("DL_QUEUE_CONSTANT_RETRY_SLEEP", 5)
+
+logger = logging.getLogger(__name__)
+
+
+async def retry_failed(item_executor=runner.execute_action, queue=get_dl_queue()):
+    min_event_timestamp = datetime.now(UTC) - timedelta(days=int(RETRY_TIMEOUT_DAYS))
+
+    # load all bugs from DLQ
+    bugs = await queue.retrieve()
+
+    # metrics to track
+    metrics = {
+        "bug_count": len(bugs),
+        "events_processed": 0,
+        "events_skipped": 0,
+        "events_failed": 0,
+        "bugs_failed": 0,
+    }
+
+    for bug_id, items in bugs.items():
+        try:
+            async for item in items:
+                # skip and delete item if we have exceeded RETRY_TIMEOUT_DAYS
+                if item.timestamp < min_event_timestamp:
+                    logger.warning("removing expired event %s", item.identifier)
+                    await queue.done(item)
+                    metrics["events_skipped"] += 1
+                    continue
+
+                try:
+                    item_executor(item.payload, ACTIONS)
+                    await queue.done(item)
+                    metrics["events_processed"] += 1
+                except Exception:
+                    logger.exception("failed to reprocess event %s.", item.identifier)
+                    metrics["events_failed"] += 1
+
+                    # check for other events that will be skipped
+                    pending_events = await queue.size(bug_id)
+                    if pending_events > 1:  # if this isn't the only event for the bug
+                        logger.info(
+                            "skipping %d event(s) for bug %d, previous event %s failed",
+                            pending_events - 1,
+                            bug_id,
+                            item.identifier,
+                        )
+                        metrics["events_skipped"] += pending_events - 1
+                    break
+        except Exception:
+            logger.exception("failed to parse events for bug %d.", bug_id)
+            metrics["bugs_failed"] += 1
+
+    return metrics
+
+
+async def main():
+    while True:
+        metrics = await retry_failed()
+        logger.info("event queue processing complete", extra=metrics)
+        if not CONSTANT_RETRY:
+            return
+        sleep(int(CONSTANT_RETRY_SLEEP))
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
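The retry service above drains the dead letter queue per bug, oldest event first: events older than `RETRY_TIMEOUT_DAYS` are dropped, and a failed replay stops processing for that bug so later events are not applied out of order. A hedged sketch of invoking one pass programmatically, e.g. from an ops script; `print_executor` is a hypothetical stand-in for `jbi.runner.execute_action`, and it assumes the queued payload exposes the bug as `payload.bug.id`, as the test factory's `payload__bug__id` parameter suggests:

```python
import asyncio

from jbi.retry import retry_failed


def print_executor(payload, actions):
    # Hypothetical stand-in for jbi.runner.execute_action: log what would
    # be replayed instead of calling out to Jira.
    print(f"would replay event for bug {payload.bug.id}")


# One pass over the dead letter queue with the default queue backend;
# returns the same metrics dict that jbi.retry.main() logs each cycle.
metrics = asyncio.run(retry_failed(item_executor=print_executor))
print(metrics)
```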
diff --git a/tests/unit/test_retry.py b/tests/unit/test_retry.py
new file mode 100644
index 00000000..0c4c94d6
--- /dev/null
+++ b/tests/unit/test_retry.py
@@ -0,0 +1,157 @@
+from datetime import UTC, datetime, timedelta
+from unittest.mock import MagicMock
+
+import pytest
+
+from jbi.queue import DeadLetterQueue
+from jbi.retry import RETRY_TIMEOUT_DAYS, retry_failed
+from jbi.runner import execute_action
+
+
+def iter_error():
+    mock = MagicMock()
+    mock.__aiter__.return_value = None
+    mock.__aiter__.side_effect = Exception("Throwing an exception")
+    return mock
+
+
+async def aiter_sync(iterable):
+    for i in iterable:
+        yield i
+
+
+@pytest.fixture
+def mock_queue():
+    return MagicMock(spec=DeadLetterQueue)
+
+
+@pytest.fixture
+def mock_executor():
+    return MagicMock(spec=execute_action)
+
+
+@pytest.mark.asyncio
+async def test_retry_empty_list(caplog, mock_queue):
+    mock_queue.retrieve.return_value = {}
+
+    metrics = await retry_failed(queue=mock_queue)
+    mock_queue.retrieve.assert_called_once()
+    assert len(caplog.messages) == 0
+    assert metrics == {
+        "bug_count": 0,
+        "events_processed": 0,
+        "events_skipped": 0,
+        "events_failed": 0,
+        "bugs_failed": 0,
+    }
+
+
+@pytest.mark.asyncio
+async def test_retry_success(caplog, mock_queue, mock_executor, queue_item_factory):
+    mock_queue.retrieve.return_value = {
+        1: aiter_sync([queue_item_factory(payload__bug__id=1)])
+    }
+
+    metrics = await retry_failed(item_executor=mock_executor, queue=mock_queue)
+    assert len(caplog.messages) == 0  # no logs should have been generated
+    mock_queue.retrieve.assert_called_once()
+    mock_queue.done.assert_called_once()  # item should be marked as complete
+    mock_executor.assert_called_once()  # item should have been processed
+    assert metrics == {
+        "bug_count": 1,
+        "events_processed": 1,
+        "events_skipped": 0,
+        "events_failed": 0,
+        "bugs_failed": 0,
+    }
+
+
+@pytest.mark.asyncio
+async def test_retry_fail_and_skip(
+    caplog, mock_queue, mock_executor, queue_item_factory
+):
+    mock_queue.retrieve.return_value = {
+        1: aiter_sync(
+            [
+                queue_item_factory(payload__bug__id=1),
+                queue_item_factory(payload__bug__id=1),
+            ]
+        )
+    }
+
+    mock_executor.side_effect = Exception("Throwing an exception")
+    mock_queue.size.return_value = 3
+
+    metrics = await retry_failed(item_executor=mock_executor, queue=mock_queue)
+    mock_queue.retrieve.assert_called_once()
+    mock_queue.done.assert_not_called()  # no items should have been marked as done
+    assert caplog.text.count("failed to reprocess event") == 1
+    assert caplog.text.count("skipping 2 event(s)") == 1
+    assert caplog.text.count("removing expired event") == 0
+    mock_executor.assert_called_once()  # only one item should have been attempted to be processed
+    assert metrics == {
+        "bug_count": 1,
+        "events_processed": 0,
+        "events_skipped": 2,
+        "events_failed": 1,
+        "bugs_failed": 0,
+    }
+
+
+@pytest.mark.asyncio
+async def test_retry_remove_expired(
+    caplog, mock_queue, mock_executor, queue_item_factory
+):
+    mock_queue.retrieve.return_value = {
+        1: aiter_sync(
+            [
+                queue_item_factory(
+                    payload__bug__id=1,
+                    payload__event__time=datetime.now(UTC)
+                    - timedelta(days=int(RETRY_TIMEOUT_DAYS), seconds=1),
+                ),
+                queue_item_factory(payload__bug__id=1),
+            ]
+        )
+    }
+
+    metrics = await retry_failed(item_executor=mock_executor, queue=mock_queue)
+    mock_queue.retrieve.assert_called_once()
+    assert (
+        len(mock_queue.done.call_args_list) == 2
+    ), "both items should have been marked as done"
+    assert caplog.text.count("failed to reprocess event") == 0
+    assert caplog.text.count("skipping events") == 0
+    assert caplog.text.count("removing expired event") == 1
+    mock_executor.assert_called_once()  # only one item should have been attempted to be processed
+    assert metrics == {
+        "bug_count": 1,
+        "events_processed": 1,
+        "events_skipped": 1,
+        "events_failed": 0,
+        "bugs_failed": 0,
+    }
+
+
+@pytest.mark.asyncio
+async def test_retry_bug_failed(caplog, mock_queue, mock_executor, queue_item_factory):
+    mock_queue.retrieve.return_value = {
+        1: aiter_sync([queue_item_factory(payload__bug__id=1)]),
+        2: iter_error(),
+    }
+
+    metrics = await retry_failed(item_executor=mock_executor, queue=mock_queue)
+    mock_queue.retrieve.assert_called_once()
+    mock_queue.done.assert_called_once()  # one item should have been marked as done
+    assert caplog.text.count("failed to reprocess event") == 0
+    assert caplog.text.count("skipping events") == 0
+    assert caplog.text.count("removing expired event") == 0
+    assert caplog.text.count("failed to parse events for bug") == 1
+    mock_executor.assert_called_once()  # only one item should have been attempted to be processed
+    assert metrics == {
+        "bug_count": 2,
+        "events_processed": 1,
+        "events_skipped": 0,
+        "events_failed": 0,
+        "bugs_failed": 1,
+    }
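The tests above lean on the `aiter_sync` helper to fake the per-bug async iterators that `DeadLetterQueue.retrieve()` returns. The same pattern works anywhere an `AsyncIterator` is expected but the test data is a plain list; a self-contained sketch of the technique:

```python
import asyncio
from typing import AsyncIterator, Iterable, TypeVar

T = TypeVar("T")


async def aiter_sync(iterable: Iterable[T]) -> AsyncIterator[T]:
    # An async generator that simply re-yields a synchronous iterable,
    # letting plain lists stand in for real async sources under test.
    for i in iterable:
        yield i


async def demo() -> None:
    async for n in aiter_sync([1, 2, 3]):
        print(n)  # prints 1, 2, 3


asyncio.run(demo())
```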