Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
91cfb29
Add pytest-asyncio
grahamalama Mar 20, 2024
d330898
Add dead letter queue
grahamalama Mar 20, 2024
b0c8d48
Update secrets.baseline
grahamalama Mar 20, 2024
a7dd23d
Add heartbeat check for queue availability
grahamalama Mar 20, 2024
02f5d03
Fix invalid_dl_queue_dsn_raises
grahamalama Mar 21, 2024
df1bd04
Ensure module and methods are properly documented
grahamalama Mar 21, 2024
fa05508
Log size of queue after insertion at debug level
grahamalama Mar 21, 2024
4350974
Add test for failing file backend ping
grahamalama Mar 21, 2024
40c2040
Add some logging to the queue and backends
grahamalama Mar 21, 2024
9bd9c96
Fix logging for bugs with no entries in memory get_all
grahamalama Mar 22, 2024
6a01d5a
Fix typo in `get_all` docstring
grahamalama Mar 22, 2024
4b89323
Add debug messages for writing bug to file queue
grahamalama Mar 22, 2024
f1eb44c
Remove memory backend
grahamalama Mar 25, 2024
75048e5
Preserve queue directory in clear()
grahamalama Mar 26, 2024
f6b8bd9
Refactor size
grahamalama Mar 26, 2024
4467db8
Use size for `is_blocked`
grahamalama Mar 26, 2024
635f42f
Refactor get(), get_all(), retrieve()
grahamalama Mar 26, 2024
6d14b9c
Add some missing typing
grahamalama Mar 26, 2024
b2bba8e
Merge remote-tracking branch 'origin/main' into dlq-class
grahamalama Mar 26, 2024
d39c7b6
payload.event.time isn't a callable
grahamalama Mar 26, 2024
6236719
Remove retries property from QueueItemFactory
grahamalama Mar 26, 2024
576f9fe
Adding retry process
alexcottner Mar 26, 2024
7312dde
ran lint
alexcottner Mar 26, 2024
158cecf
removed unused var
alexcottner Mar 26, 2024
11fc655
adding some metrics and error handling for async iteration errors
alexcottner Mar 26, 2024
2f814d4
a little cleanup
alexcottner Mar 26, 2024
d6610f3
Add tests for errors for invalid json and a webhook payload that does…
grahamalama Mar 27, 2024
c62e86c
Make a queue item timestamp an alias of the event timestamp
grahamalama Mar 27, 2024
63057fc
Add methods for listing items in the queue
grahamalama Mar 27, 2024
43a7a15
Catch and reraise custom exception for failing to read item into memory
grahamalama Mar 27, 2024
96222d1
Colocate custom exceptions
grahamalama Mar 27, 2024
9bac2d5
Merge branch 'dlq-class' of github.com:mozilla/jira-bugzilla-integrat…
alexcottner Mar 27, 2024
33397cd
Cleaning up a lot of loose ends. Changes for PR feedback.
alexcottner Mar 27, 2024
99111e9
Fixed env var names
alexcottner Mar 27, 2024
1ee3ab7
Add methods to access list and list_all from queue class
grahamalama Mar 28, 2024
70a4909
fix comment in jbi/retry
alexcottner Mar 28, 2024
8843d34
Merge branch 'dlq-class' of github.com:mozilla/jira-bugzilla-integrat…
alexcottner Mar 28, 2024
5b76cc8
Some light refactoring and cleanup, responding to all PR feedback so …
alexcottner Mar 28, 2024
536f6b6
merging main
alexcottner Apr 10, 2024
4a365bc
updating .secrets.baseline again
alexcottner Apr 10, 2024
23500d1
fixing one test to use the fixture
alexcottner Apr 11, 2024
e5f8e32
fixing lint
alexcottner Apr 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ APP_RELOAD=True
APP_DEBUG=True
JBI_API_KEY="fake_api_key"
DL_QUEUE_DSN=file:///tmp
DL_QUEUE_CONSTANT_RETRY=true
DL_QUEUE_RETRY_TIMEOUT_DAYS=7

# Jira API Secrets
JIRA_USERNAME="fake_jira_username"
Expand Down
6 changes: 3 additions & 3 deletions .secrets.baseline
Original file line number Diff line number Diff line change
Expand Up @@ -113,16 +113,16 @@
"filename": ".env.example",
"hashed_secret": "4b9a4ce92b6a01a4cd6ee1672d31c043f2ae79ab",
"is_verified": false,
"line_number": 12
"line_number": 14
},
{
"type": "Secret Keyword",
"filename": ".env.example",
"hashed_secret": "77ea6398f252999314d609a708842a49fc43e055",
"is_verified": false,
"line_number": 15
"line_number": 17
}
]
},
"generated_at": "2024-03-20T18:05:17Z"
"generated_at": "2024-04-10T20:05:01Z"
}
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ $(DOTENV_FILE):

.PHONY: docker-shell
docker-shell: $(DOTENV_FILE)
docker-compose run --rm web /bin/sh
docker compose run --rm web /bin/sh

.PHONY: docker-start
docker-start: $(DOTENV_FILE)
docker-compose up
docker compose up

.PHONY: test
test: $(INSTALL_STAMP)
Expand Down
7 changes: 7 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,10 @@ services:
- ${PORT:-8000}:${PORT:-8000}
volumes:
- .:/app
retry:
build: .
command: python -m jbi.retry
env_file:
- .env
volumes:
- .:/app
5 changes: 4 additions & 1 deletion jbi/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,12 @@ class Settings(BaseSettings):
sentry_dsn: Optional[AnyUrl] = None
sentry_traces_sample_rate: float = 1.0

# Retry queue
dl_queue_dsn: FileUrl

model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")
model_config = SettingsConfigDict(
env_file=".env", env_file_encoding="utf-8", extra="ignore"
)


@lru_cache(maxsize=1)
Expand Down
3 changes: 3 additions & 0 deletions jbi/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,9 @@ async def list(self, bug_id: int) -> List[str]:
async def list_all(self) -> dict[int, List[str]]:
return await self.backend.list_all()

async def size(self, bug_id=None):
return await self.backend.size(bug_id=bug_id)

async def done(self, item: QueueItem) -> None:
"""
Mark item as done, remove from queue.
Expand Down
79 changes: 79 additions & 0 deletions jbi/retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import asyncio
import logging
from datetime import UTC, datetime, timedelta
from os import getenv
from time import sleep

import jbi.runner as runner
from jbi.configuration import ACTIONS
from jbi.queue import get_dl_queue

# Run the retry loop forever when true; otherwise process the queue once and exit.
# Case-insensitive so "True"/"TRUE" in an env file also enable it.
CONSTANT_RETRY = getenv("DL_QUEUE_CONSTANT_RETRY", "false").lower() == "true"
# Events older than this many days are dropped from the queue instead of retried.
# Normalized to int here so use sites don't each have to convert the env string.
RETRY_TIMEOUT_DAYS = int(getenv("DL_QUEUE_RETRY_TIMEOUT_DAYS", "7"))
# Seconds to pause between passes when running in constant-retry mode.
CONSTANT_RETRY_SLEEP = int(getenv("DL_QUEUE_CONSTANT_RETRY_SLEEP", "5"))

logger = logging.getLogger(__name__)


async def retry_failed(item_executor=runner.execute_action, queue=None):
    """Attempt to reprocess every event currently in the dead letter queue.

    Events older than ``RETRY_TIMEOUT_DAYS`` are dropped without being retried.
    If an event for a bug fails to reprocess, the remaining (newer) events for
    that same bug are skipped so they are not applied out of order.

    :param item_executor: callable invoked as ``item_executor(payload, ACTIONS)``
        to reprocess one event; defaults to ``jbi.runner.execute_action``.
    :param queue: dead letter queue to drain. Defaults to ``get_dl_queue()``,
        resolved lazily inside the function so that importing this module has
        no side effects (a call in the signature would run at import time).
    :return: dict of metrics counting processed/skipped/failed events and bugs.
    """
    if queue is None:
        queue = get_dl_queue()

    min_event_timestamp = datetime.now(UTC) - timedelta(days=int(RETRY_TIMEOUT_DAYS))

    # load all bugs from DLQ
    bugs = await queue.retrieve()

    # metrics to track
    metrics = {
        "bug_count": len(bugs),
        "events_processed": 0,
        "events_skipped": 0,
        "events_failed": 0,
        "bugs_failed": 0,
    }

    for bug_id, items in bugs.items():
        try:
            async for item in items:
                # skip and delete item if we have exceeded RETRY_TIMEOUT_DAYS
                if item.timestamp < min_event_timestamp:
                    logger.warning("removing expired event %s", item.identifier)
                    await queue.done(item)
                    metrics["events_skipped"] += 1
                    continue

                try:
                    item_executor(item.payload, ACTIONS)
                    await queue.done(item)
                    metrics["events_processed"] += 1
                except Exception:
                    logger.exception("failed to reprocess event %s.", item.identifier)
                    metrics["events_failed"] += 1

                    # check for other events that will be skipped
                    pending_events = await queue.size(bug_id)
                    if pending_events > 1:  # if this isn't the only event for the bug
                        logger.info(
                            "skipping %d event(s) for bug %d, previous event %s failed",
                            pending_events - 1,
                            bug_id,
                            item.identifier,
                        )
                        metrics["events_skipped"] += pending_events - 1
                    break
        except Exception:
            # raised by the async iterator itself (e.g. an unreadable item);
            # count the whole bug as failed and move on to the next one
            logger.exception("failed to parse events for bug %d.", bug_id)
            metrics["bugs_failed"] += 1

    return metrics


async def main():
    """Drain the dead letter queue, optionally looping forever.

    When ``DL_QUEUE_CONSTANT_RETRY`` is enabled, wait
    ``DL_QUEUE_CONSTANT_RETRY_SLEEP`` seconds between passes; otherwise run a
    single pass and return.
    """
    while True:
        metrics = await retry_failed()
        logger.info("event queue processing complete", extra=metrics)
        if not CONSTANT_RETRY:
            return
        # time.sleep() would block the event loop; yield control instead
        await asyncio.sleep(int(CONSTANT_RETRY_SLEEP))


if __name__ == "__main__":
    asyncio.run(main())
157 changes: 157 additions & 0 deletions tests/unit/test_retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
from datetime import UTC, datetime, timedelta
from unittest.mock import MagicMock

import pytest

from jbi.queue import DeadLetterQueue
from jbi.retry import RETRY_TIMEOUT_DAYS, retry_failed
from jbi.runner import execute_action


def iter_error():
    """Build a mock whose async-iteration protocol raises immediately.

    Simulates a queue backend that blows up as soon as its items are iterated.
    """
    broken = MagicMock()
    broken.__aiter__.return_value = None
    broken.__aiter__.side_effect = Exception("Throwing an exception")
    return broken


async def aiter_sync(iterable):
    """Expose a synchronous iterable through the async-iteration protocol."""
    for element in iterable:
        yield element


@pytest.fixture
def mock_queue():
    """Queue double constrained to the DeadLetterQueue interface."""
    queue = MagicMock(spec=DeadLetterQueue)
    return queue


@pytest.fixture
def mock_executor():
    """Executor double constrained to the execute_action signature."""
    executor = MagicMock(spec=execute_action)
    return executor


@pytest.mark.asyncio
async def test_retry_empty_list(caplog, mock_queue):
    """An empty queue produces zeroed metrics and no log output."""
    mock_queue.retrieve.return_value = {}

    metrics = await retry_failed(queue=mock_queue)

    mock_queue.retrieve.assert_called_once()
    assert not caplog.messages
    expected = {
        "bug_count": 0,
        "events_processed": 0,
        "events_skipped": 0,
        "events_failed": 0,
        "bugs_failed": 0,
    }
    assert metrics == expected


@pytest.mark.asyncio
async def test_retry_success(caplog, mock_queue, mock_executor, queue_item_factory):
    """A healthy item is executed once, then marked done, with no logging."""
    item = queue_item_factory(payload__bug__id=1)
    mock_queue.retrieve.return_value = {1: aiter_sync([item])}

    metrics = await retry_failed(item_executor=mock_executor, queue=mock_queue)

    assert not caplog.messages  # no logs should have been generated
    mock_queue.retrieve.assert_called_once()
    mock_queue.done.assert_called_once()  # item should be marked as complete
    mock_executor.assert_called_once()  # item should have been processed
    assert metrics == {
        "bug_count": 1,
        "events_processed": 1,
        "events_skipped": 0,
        "events_failed": 0,
        "bugs_failed": 0,
    }


@pytest.mark.asyncio
async def test_retry_fail_and_skip(
    caplog, mock_queue, mock_executor, queue_item_factory
):
    """When executing an event fails, the remaining events for that bug are
    skipped (counted via the queue's reported size) rather than attempted."""
    mock_queue.retrieve.return_value = {
        1: aiter_sync(
            [
                queue_item_factory(payload__bug__id=1),
                queue_item_factory(payload__bug__id=1),
            ]
        )
    }

    # first execution attempt raises ...
    mock_executor.side_effect = Exception("Throwing an exception")
    # ... and the queue reports three pending events for this bug
    mock_queue.size.return_value = 3

    metrics = await retry_failed(item_executor=mock_executor, queue=mock_queue)
    mock_queue.retrieve.assert_called_once()
    mock_queue.done.assert_not_called()  # no items should have been marked as done
    assert caplog.text.count("failed to reprocess event") == 1
    assert caplog.text.count("skipping 2 event(s)") == 1  # reported size (3) minus the failed one
    assert caplog.text.count("removing expired event") == 0
    mock_executor.assert_called_once()  # only one item should have been attempted to be processed
    assert metrics == {
        "bug_count": 1,
        "events_processed": 0,
        "events_skipped": 2,
        "events_failed": 1,
        "bugs_failed": 0,
    }


@pytest.mark.asyncio
async def test_retry_remove_expired(
    caplog, mock_queue, mock_executor, queue_item_factory
):
    """An event older than RETRY_TIMEOUT_DAYS is removed (marked done) without
    being executed, while the fresh event for the same bug is processed."""
    mock_queue.retrieve.return_value = {
        1: aiter_sync(
            [
                # one second past the retry window, so it must be expired
                queue_item_factory(
                    payload__bug__id=1,
                    payload__event__time=datetime.now(UTC)
                    - timedelta(days=int(RETRY_TIMEOUT_DAYS), seconds=1),
                ),
                queue_item_factory(payload__bug__id=1),
            ]
        )
    }

    metrics = await retry_failed(item_executor=mock_executor, queue=mock_queue)
    mock_queue.retrieve.assert_called_once()
    assert (
        len(mock_queue.done.call_args_list) == 2
    ), "both items should have been marked as done"
    assert caplog.text.count("failed to reprocess event") == 0
    assert caplog.text.count("skipping events") == 0
    assert caplog.text.count("removing expired event") == 1
    mock_executor.assert_called_once()  # only one item should have been attempted to be processed
    assert metrics == {
        "bug_count": 1,
        "events_processed": 1,
        "events_skipped": 1,
        "events_failed": 0,
        "bugs_failed": 0,
    }


@pytest.mark.asyncio
async def test_retry_bug_failed(caplog, mock_queue, mock_executor, queue_item_factory):
    """A bug whose iterator raises is counted as failed; other bugs still run."""
    mock_queue.retrieve.return_value = {
        1: aiter_sync([queue_item_factory(payload__bug__id=1)]),
        2: iter_error(),
    }

    metrics = await retry_failed(item_executor=mock_executor, queue=mock_queue)

    mock_queue.retrieve.assert_called_once()
    mock_queue.done.assert_called_once()  # one item should have been marked as done
    mock_executor.assert_called_once()  # only one item should have been attempted to be processed
    for message in (
        "failed to reprocess event",
        "skipping events",
        "removing expired event",
    ):
        assert caplog.text.count(message) == 0
    assert caplog.text.count("failed to parse events for bug") == 1
    assert metrics == {
        "bug_count": 2,
        "events_processed": 1,
        "events_skipped": 0,
        "events_failed": 0,
        "bugs_failed": 1,
    }