Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -216,13 +216,13 @@ Note that this is just the type of exception, and contains no other values. The
assert isinstance(result.errors[0].traceback, str)
```

Note that currently, whilst `.errors` is a list, it will only ever contain a single element.

#### Attempts

The number of times a task has been run is stored as the `.attempts` attribute. This will currently only ever be 0 or 1.
The number of times a task has been run is stored as the `.attempts` attribute. The date of the last attempt is stored as `.last_attempted_at`.

#### Retries

The date of the last attempt is stored as `.last_attempted_at`.
A task result can be retried by calling `.retry()` (or `.aretry`). This adds the task back to the queue, retaining the `id`.

### Backend introspecting

Expand All @@ -231,6 +231,7 @@ Because `django-tasks` enables support for multiple different backends, those ba
- `supports_defer`: Can tasks be enqueued with the `run_after` attribute?
- `supports_async_task`: Can coroutines be enqueued?
- `supports_get_result`: Can results be retrieved after the fact (from **any** thread / process)?
- `supports_retries`: Can results be retried?

```python
from django_tasks import default_task_backend
Expand Down
17 changes: 17 additions & 0 deletions django_tasks/backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ class BaseTaskBackend(metaclass=ABCMeta):
supports_get_result = False
"""Can results be retrieved after the fact (from **any** thread / process)"""

supports_retries = False
"""Can results be retried"""

def __init__(self, alias: str, params: dict) -> None:
from django_tasks import DEFAULT_QUEUE_NAME

Expand Down Expand Up @@ -129,3 +132,17 @@ def check(self, **kwargs: Any) -> Iterable[messages.CheckMessage]:
"`ENQUEUE_ON_COMMIT` cannot be used when no databases are configured",
hint="Set `ENQUEUE_ON_COMMIT` to False",
)

def retry(self, task_result: TaskResult) -> None:
"""
Retry the task by putting it back into the queue store.
"""
raise NotImplementedError("This backend does not support retries.")

async def aretry(self, task_result: TaskResult) -> None:
"""
Retry the task by putting it back into the queue store.
"""
return await sync_to_async(self.retry, thread_sensitive=True)(
task_result=task_result
)
1 change: 1 addition & 0 deletions django_tasks/backends/database/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class DatabaseBackend(BaseTaskBackend):
supports_async_task = True
supports_get_result = True
supports_defer = True
supports_retries = False

def _task_to_db_task(
self,
Expand Down
2 changes: 2 additions & 0 deletions django_tasks/backends/dummy.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
class DummyBackend(BaseTaskBackend):
supports_defer = True
supports_async_task = True
supports_retries = False

results: list[TaskResult]

def __init__(self, alias: str, params: dict) -> None:
Expand Down
8 changes: 6 additions & 2 deletions django_tasks/backends/immediate.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ def _execute_task(self, task_result: TaskResult) -> None:
"""
Execute the task for the given `TaskResult`, mutating it with the outcome
"""
object.__setattr__(task_result, "enqueued_at", timezone.now())
task_enqueued.send(type(self), task_result=task_result)
if task_result.enqueued_at is None:
object.__setattr__(task_result, "enqueued_at", timezone.now())
task_enqueued.send(type(self), task_result=task_result)

task = task_result.task

Expand Down Expand Up @@ -106,3 +107,6 @@ def enqueue(
self._execute_task(task_result)

return task_result

def retry(self, task_result: TaskResult) -> None:
self._execute_task(task_result)
22 changes: 15 additions & 7 deletions django_tasks/backends/rq.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from collections.abc import Iterable
from dataclasses import dataclass
from types import TracebackType
from typing import Any, Optional, TypeVar

Expand All @@ -24,8 +23,8 @@
ResultStatus,
Task,
TaskError,
TaskResult,
)
from django_tasks.task import TaskResult as BaseTaskResult
from django_tasks.utils import get_module_path, get_random_id

T = TypeVar("T")
Expand All @@ -44,11 +43,6 @@
}


@dataclass(frozen=True)
class TaskResult(BaseTaskResult[T]):
pass


class Job(BaseJob):
def perform(self) -> Any:
task_result = self.into_task_result()
Expand Down Expand Up @@ -257,3 +251,17 @@ def check(self, **kwargs: Any) -> Iterable[messages.CheckMessage]:
f"{queue_name!r} is not configured for django-rq",
f"Add {queue_name!r} to RQ_QUEUES",
)

def retry(self, task_result: TaskResult) -> None:
job = self._get_job(task_result.id)

if job is None:
raise ResultDoesNotExist(task_result.id)

if job.retries_left:
queue = django_rq.get_queue(task_result.task.queue_name)

with queue.connection.pipeline() as pipeline:
job.retry(queue=queue, pipeline=pipeline)
else:
job.requeue()
18 changes: 18 additions & 0 deletions django_tasks/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,3 +331,21 @@ async def arefresh(self) -> None:

for attr in TASK_REFRESH_ATTRS:
object.__setattr__(self, attr, getattr(refreshed_task, attr))

def retry(self) -> None:
"""
Retry the task by putting it back into the queue store.
"""
if self.status != ResultStatus.FAILED:
raise ValueError("Only failed tasks can be retried")

self.task.get_backend().retry(self)

async def aretry(self) -> None:
"""
Retry the task by putting it back into the queue store.
"""
if self.status != ResultStatus.FAILED:
raise ValueError("Only failed tasks can be retried")

await self.task.get_backend().aretry(self)
15 changes: 15 additions & 0 deletions tests/tests/test_dummy_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,21 @@ def test_enqueue_on_commit_with_no_databases(self) -> None:
self.assertEqual(len(errors), 1)
self.assertIn("Set `ENQUEUE_ON_COMMIT` to False", errors[0].hint) # type:ignore[arg-type]

async def test_no_retry(self) -> None:
result = test_tasks.noop_task.enqueue()

with self.assertRaises(ValueError):
result.retry()

with self.assertRaises(ValueError):
await result.aretry()

with self.assertRaises(NotImplementedError):
default_task_backend.retry(result)

with self.assertRaises(NotImplementedError):
await default_task_backend.aretry(result)


class DummyBackendTransactionTestCase(TransactionTestCase):
@override_settings(
Expand Down
17 changes: 17 additions & 0 deletions tests/tests/test_immediate_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,23 @@ def test_check(self) -> None:

self.assertEqual(len(errors), 0, errors)

def test_retry(self) -> None:
with self.assertLogs("django_tasks", level="ERROR"):
result = test_tasks.failing_task_value_error.enqueue()

self.assertEqual(result.status, ResultStatus.FAILED)
self.assertEqual(result.attempts, 1)
self.assertEqual(len(result.errors), 1)
original_enqueued = result.enqueued_at

with self.assertLogs("django_tasks", level="ERROR"):
result.retry()

self.assertEqual(result.status, ResultStatus.FAILED)
self.assertEqual(result.attempts, 2)
self.assertEqual(result.enqueued_at, original_enqueued)
self.assertEqual(len(result.errors), 2)


class ImmediateBackendTransactionTestCase(TransactionTestCase):
@override_settings(
Expand Down
22 changes: 21 additions & 1 deletion tests/tests/test_rq_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def get_fake_connection(
},
)
@modify_settings(INSTALLED_APPS={"append": ["django_rq"]})
class DatabaseBackendTestCase(TransactionTestCase):
class RQBackendTestCase(TransactionTestCase):
def setUp(self) -> None:
super().setUp()

Expand Down Expand Up @@ -491,3 +491,23 @@ def test_unknown_queue_name(self) -> None:

self.assertEqual(len(errors), 1)
self.assertIn("Add 'queue-2' to RQ_QUEUES", errors[0].hint) # type:ignore[arg-type]

def test_retry(self) -> None:
result = test_tasks.failing_task_value_error.enqueue()

with self.assertLogs("django_tasks", "DEBUG"):
self.run_worker()
result.refresh()

self.assertEqual(result.status, ResultStatus.FAILED)
self.assertEqual(result.attempts, 1)
self.assertEqual(len(result.errors), 1)

result.retry()
with self.assertLogs("django_tasks", "DEBUG"):
self.run_worker()
result.refresh()

self.assertEqual(result.status, ResultStatus.FAILED)
self.assertEqual(result.attempts, 2)
self.assertEqual(len(result.errors), 2)