diff --git a/README.md b/README.md index 617bd9a..03b1f38 100644 --- a/README.md +++ b/README.md @@ -216,13 +216,13 @@ Note that this is just the type of exception, and contains no other values. The assert isinstance(result.errors[0].traceback, str) ``` -Note that currently, whilst `.errors` is a list, it will only ever contain a single element. - #### Attempts -The number of times a task has been run is stored as the `.attempts` attribute. This will currently only ever be 0 or 1. +The number of times a task has been run is stored as the `.attempts` attribute. The date of the last attempt is stored as `.last_attempted_at`. + +#### Retries -The date of the last attempt is stored as `.last_attempted_at`. +A task result can be retried by calling `.retry()` (or `.aretry`). This adds the task back to the queue, retaining the `id`. ### Backend introspecting @@ -231,6 +231,7 @@ Because `django-tasks` enables support for multiple different backends, those ba - `supports_defer`: Can tasks be enqueued with the `run_after` attribute? - `supports_async_task`: Can coroutines be enqueued? - `supports_get_result`: Can results be retrieved after the fact (from **any** thread / process)? +- `supports_retries`: Can results be retried? ```python from django_tasks import default_task_backend diff --git a/django_tasks/backends/base.py b/django_tasks/backends/base.py index 3297006..89743a3 100644 --- a/django_tasks/backends/base.py +++ b/django_tasks/backends/base.py @@ -32,6 +32,9 @@ class BaseTaskBackend(metaclass=ABCMeta): supports_get_result = False """Can results be retrieved after the fact (from **any** thread / process)""" + supports_retries = False + """Can results be retried""" + def __init__(self, alias: str, params: dict) -> None: from django_tasks import DEFAULT_QUEUE_NAME @@ -129,3 +132,17 @@ def check(self, **kwargs: Any) -> Iterable[messages.CheckMessage]: "`ENQUEUE_ON_COMMIT` cannot be used when no databases are configured", hint="Set `ENQUEUE_ON_COMMIT` to False", ) + + def retry(self, task_result: TaskResult) -> None: + """ + Retry the task by putting it back into the queue store. + """ + raise NotImplementedError("This backend does not support retries.") + + async def aretry(self, task_result: TaskResult) -> None: + """ + Retry the task by putting it back into the queue store. + """ + return await sync_to_async(self.retry, thread_sensitive=True)( + task_result=task_result + ) diff --git a/django_tasks/backends/database/backend.py b/django_tasks/backends/database/backend.py index 3ae3064..3534339 100644 --- a/django_tasks/backends/database/backend.py +++ b/django_tasks/backends/database/backend.py @@ -30,6 +30,7 @@ class DatabaseBackend(BaseTaskBackend): supports_async_task = True supports_get_result = True supports_defer = True + supports_retries = False def _task_to_db_task( self, diff --git a/django_tasks/backends/dummy.py b/django_tasks/backends/dummy.py index 9ae2e28..dc58d08 100644 --- a/django_tasks/backends/dummy.py +++ b/django_tasks/backends/dummy.py @@ -20,6 +20,8 @@ class DummyBackend(BaseTaskBackend): supports_defer = True supports_async_task = True + supports_retries = False + results: list[TaskResult] def __init__(self, alias: str, params: dict) -> None: diff --git a/django_tasks/backends/immediate.py b/django_tasks/backends/immediate.py index 7cdc9a4..537b616 100644 --- a/django_tasks/backends/immediate.py +++ b/django_tasks/backends/immediate.py @@ -33,8 +33,9 @@ def _execute_task(self, task_result: TaskResult) -> None: """ Execute the task for the given `TaskResult`, mutating it with the outcome """ - object.__setattr__(task_result, "enqueued_at", timezone.now()) - task_enqueued.send(type(self), task_result=task_result) + if task_result.enqueued_at is None: + object.__setattr__(task_result, "enqueued_at", timezone.now()) + task_enqueued.send(type(self), task_result=task_result) task = task_result.task @@ -106,3 +107,6 @@ def enqueue( self._execute_task(task_result) return task_result + + def retry(self, task_result: TaskResult) -> None: + self._execute_task(task_result) diff --git a/django_tasks/backends/rq.py b/django_tasks/backends/rq.py index a520c10..9e0e199 100644 --- a/django_tasks/backends/rq.py +++ b/django_tasks/backends/rq.py @@ -1,5 +1,4 @@ from collections.abc import Iterable -from dataclasses import dataclass from types import TracebackType from typing import Any, Optional, TypeVar @@ -24,8 +23,8 @@ ResultStatus, Task, TaskError, + TaskResult, ) -from django_tasks.task import TaskResult as BaseTaskResult from django_tasks.utils import get_module_path, get_random_id T = TypeVar("T") @@ -44,11 +43,6 @@ } -@dataclass(frozen=True) -class TaskResult(BaseTaskResult[T]): - pass - - class Job(BaseJob): def perform(self) -> Any: task_result = self.into_task_result() @@ -257,3 +251,17 @@ def check(self, **kwargs: Any) -> Iterable[messages.CheckMessage]: f"{queue_name!r} is not configured for django-rq", f"Add {queue_name!r} to RQ_QUEUES", ) + + def retry(self, task_result: TaskResult) -> None: + job = self._get_job(task_result.id) + + if job is None: + raise ResultDoesNotExist(task_result.id) + + if job.retries_left: + queue = django_rq.get_queue(task_result.task.queue_name) + + with queue.connection.pipeline() as pipeline: + job.retry(queue=queue, pipeline=pipeline) + else: + job.requeue() diff --git a/django_tasks/task.py b/django_tasks/task.py index 0cc1826..4e70d41 100644 --- a/django_tasks/task.py +++ b/django_tasks/task.py @@ -331,3 +331,21 @@ async def arefresh(self) -> None: for attr in TASK_REFRESH_ATTRS: object.__setattr__(self, attr, getattr(refreshed_task, attr)) + + def retry(self) -> None: + """ + Retry the task by putting it back into the queue store. + """ + if self.status != ResultStatus.FAILED: + raise ValueError("Only failed tasks can be retried") + + self.task.get_backend().retry(self) + + async def aretry(self) -> None: + """ + Retry the task by putting it back into the queue store. + """ + if self.status != ResultStatus.FAILED: + raise ValueError("Only failed tasks can be retried") + + await self.task.get_backend().aretry(self) diff --git a/tests/tests/test_dummy_backend.py b/tests/tests/test_dummy_backend.py index cb63567..997110b 100644 --- a/tests/tests/test_dummy_backend.py +++ b/tests/tests/test_dummy_backend.py @@ -195,6 +195,21 @@ def test_enqueue_on_commit_with_no_databases(self) -> None: self.assertEqual(len(errors), 1) self.assertIn("Set `ENQUEUE_ON_COMMIT` to False", errors[0].hint) # type:ignore[arg-type] + async def test_no_retry(self) -> None: + result = test_tasks.noop_task.enqueue() + + with self.assertRaises(ValueError): + result.retry() + + with self.assertRaises(ValueError): + await result.aretry() + + with self.assertRaises(NotImplementedError): + default_task_backend.retry(result) + + with self.assertRaises(NotImplementedError): + await default_task_backend.aretry(result) + class DummyBackendTransactionTestCase(TransactionTestCase): @override_settings( diff --git a/tests/tests/test_immediate_backend.py b/tests/tests/test_immediate_backend.py index 528269a..0d2d82a 100644 --- a/tests/tests/test_immediate_backend.py +++ b/tests/tests/test_immediate_backend.py @@ -257,6 +257,23 @@ def test_check(self) -> None: self.assertEqual(len(errors), 0, errors) + def test_retry(self) -> None: + with self.assertLogs("django_tasks", level="ERROR"): + result = test_tasks.failing_task_value_error.enqueue() + + self.assertEqual(result.status, ResultStatus.FAILED) + self.assertEqual(result.attempts, 1) + self.assertEqual(len(result.errors), 1) + original_enqueued = result.enqueued_at + + with self.assertLogs("django_tasks", level="ERROR"): + result.retry() + + self.assertEqual(result.status, ResultStatus.FAILED) + self.assertEqual(result.attempts, 2) + self.assertEqual(result.enqueued_at, original_enqueued) + self.assertEqual(len(result.errors), 2) + class ImmediateBackendTransactionTestCase(TransactionTestCase): @override_settings( diff --git a/tests/tests/test_rq_backend.py b/tests/tests/test_rq_backend.py index 27298c1..4c7b84b 100644 --- a/tests/tests/test_rq_backend.py +++ b/tests/tests/test_rq_backend.py @@ -61,7 +61,7 @@ def get_fake_connection( }, ) @modify_settings(INSTALLED_APPS={"append": ["django_rq"]}) -class DatabaseBackendTestCase(TransactionTestCase): +class RQBackendTestCase(TransactionTestCase): def setUp(self) -> None: super().setUp() @@ -491,3 +491,23 @@ def test_unknown_queue_name(self) -> None: self.assertEqual(len(errors), 1) self.assertIn("Add 'queue-2' to RQ_QUEUES", errors[0].hint) # type:ignore[arg-type] + + def test_retry(self) -> None: + result = test_tasks.failing_task_value_error.enqueue() + + with self.assertLogs("django_tasks", "DEBUG"): + self.run_worker() + result.refresh() + + self.assertEqual(result.status, ResultStatus.FAILED) + self.assertEqual(result.attempts, 1) + self.assertEqual(len(result.errors), 1) + + result.retry() + with self.assertLogs("django_tasks", "DEBUG"): + self.run_worker() + result.refresh() + + self.assertEqual(result.status, ResultStatus.FAILED) + self.assertEqual(result.attempts, 2) + self.assertEqual(len(result.errors), 2)