Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ Whilst signals are available, they may not be the most maintainable approach.

- `django_tasks.signals.task_enqueued`: Called when a task is enqueued. The sender is the backend class. Also called with the enqueued `task_result`.
- `django_tasks.signals.task_finished`: Called when a task finishes (`SUCCEEDED` or `FAILED`). The sender is the backend class. Also called with the finished `task_result`.
- `django_tasks.signals.task_started`: Called immediately before a task starts executing. The sender is the backend class. Also called with the started `task_result`.

## RQ

Expand Down
14 changes: 6 additions & 8 deletions django_tasks/backends/database/management/commands/db_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from django_tasks.backends.database.models import DBTaskResult
from django_tasks.backends.database.utils import exclusive_transaction
from django_tasks.exceptions import InvalidTaskBackendError
from django_tasks.signals import task_finished
from django_tasks.signals import task_finished, task_started
from django_tasks.task import DEFAULT_QUEUE_NAME

package_logger = logging.getLogger("django_tasks")
Expand Down Expand Up @@ -131,19 +131,17 @@ def run_task(self, db_task_result: DBTaskResult) -> None:
task = db_task_result.task
task_result = db_task_result.task_result

logger.info(
"Task id=%s path=%s state=%s",
db_task_result.id,
db_task_result.task_path,
task_result.status,
)
backend_type = task.get_backend()

task_started.send(sender=backend_type, task_result=task_result)

return_value = task.call(*task_result.args, **task_result.kwargs)

# Setting the return and success value inside the error handling,
# So errors setting it (eg JSON encode) can still be recorded
db_task_result.set_succeeded(return_value)
task_finished.send(
sender=type(task.get_backend()), task_result=db_task_result.task_result
sender=backend_type, task_result=db_task_result.task_result
)
except BaseException as e:
db_task_result.set_failed(e)
Expand Down
5 changes: 4 additions & 1 deletion django_tasks/backends/immediate.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from django.utils import timezone
from typing_extensions import ParamSpec

from django_tasks.signals import task_enqueued, task_finished
from django_tasks.signals import task_enqueued, task_finished, task_started
from django_tasks.task import ResultStatus, Task, TaskResult
from django_tasks.utils import get_exception_traceback, get_random_id, json_normalize

Expand Down Expand Up @@ -37,7 +37,10 @@ def _execute_task(self, task_result: TaskResult) -> None:
async_to_sync(task.func) if iscoroutinefunction(task.func) else task.func
)

object.__setattr__(task_result, "status", ResultStatus.RUNNING)
object.__setattr__(task_result, "started_at", timezone.now())
task_started.send(sender=type(self), task_result=task_result)

try:
object.__setattr__(
task_result,
Expand Down
17 changes: 14 additions & 3 deletions django_tasks/backends/rq.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from django_tasks.backends.base import BaseTaskBackend
from django_tasks.exceptions import ResultDoesNotExist
from django_tasks.signals import task_enqueued, task_finished
from django_tasks.signals import task_enqueued, task_finished, task_started
from django_tasks.task import DEFAULT_PRIORITY, MAX_PRIORITY, ResultStatus, Task
from django_tasks.task import TaskResult as BaseTaskResult
from django_tasks.utils import get_module_path, get_random_id
Expand All @@ -44,6 +44,13 @@ class TaskResult(BaseTaskResult[T]):


class Job(BaseJob):
def perform(self) -> Any:
task_result = self.into_task_result()

task_started.send(type(task_result.task.get_backend()), task_result=task_result)

return super().perform()

def _execute(self) -> Any:
"""
Shim RQ's `Job` to call the underlying `Task` function.
Expand Down Expand Up @@ -112,18 +119,22 @@ def failed_callback(
exception_value: Exception,
traceback: TracebackType,
) -> None:
task_result = job.into_task_result()

# Smuggle the exception class through meta
job.meta["exception_class"] = get_module_path(exception_class)
job.save_meta() # type: ignore[no-untyped-call]

task_result = job.into_task_result()

object.__setattr__(task_result, "status", ResultStatus.FAILED)

task_finished.send(type(task_result.task.get_backend()), task_result=task_result)


def success_callback(job: Job, connection: Optional[Redis], result: Any) -> None:
task_result = job.into_task_result()

object.__setattr__(task_result, "status", ResultStatus.SUCCEEDED)

task_finished.send(type(task_result.task.get_backend()), task_result=task_result)


Expand Down
14 changes: 13 additions & 1 deletion django_tasks/signal_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from django_tasks import BaseTaskBackend, ResultStatus, TaskResult

from .signals import task_enqueued, task_finished
from .signals import task_enqueued, task_finished, task_started

logger = logging.getLogger("django_tasks")

Expand Down Expand Up @@ -35,6 +35,18 @@ def log_task_enqueued(
)


@receiver(task_started)
def log_task_started(
sender: type[BaseTaskBackend], task_result: TaskResult, **kwargs: dict
) -> None:
logger.info(
"Task id=%s path=%s state=%s",
task_result.id,
task_result.task.module_path,
task_result.status,
)


@receiver(task_finished)
def log_task_finished(
sender: type[BaseTaskBackend], task_result: TaskResult, **kwargs: dict
Expand Down
1 change: 1 addition & 0 deletions django_tasks/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@

task_enqueued = Signal()
task_finished = Signal()
task_started = Signal()
19 changes: 19 additions & 0 deletions tests/tests/test_immediate_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,28 @@ def test_enqueue_logs(self) -> None:
with self.assertLogs("django_tasks", level="DEBUG") as captured_logs:
result = test_tasks.noop_task.enqueue()

self.assertEqual(len(captured_logs.output), 3)

self.assertIn("enqueued", captured_logs.output[0])
self.assertIn(result.id, captured_logs.output[0])

self.assertIn("state=RUNNING", captured_logs.output[1])
self.assertIn(result.id, captured_logs.output[1])

self.assertIn("state=SUCCEEDED", captured_logs.output[2])
self.assertIn(result.id, captured_logs.output[2])

def test_failed_logs(self) -> None:
with self.assertLogs("django_tasks", level="DEBUG") as captured_logs:
result = test_tasks.failing_task_value_error.enqueue()

self.assertEqual(len(captured_logs.output), 3)
self.assertIn("state=RUNNING", captured_logs.output[1])
self.assertIn(result.id, captured_logs.output[1])

self.assertIn("state=FAILED", captured_logs.output[2])
self.assertIn(result.id, captured_logs.output[2])

def test_check(self) -> None:
errors = list(default_task_backend.check())

Expand Down
33 changes: 31 additions & 2 deletions tests/tests/test_rq_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ def test_catches_exception(self) -> None:
):
result = task.enqueue()

self.run_worker()
with self.assertLogs("django_tasks", "DEBUG"):
self.run_worker()

result.refresh()

# assert result
Expand All @@ -166,7 +168,8 @@ def test_catches_exception(self) -> None:
def test_complex_exception(self) -> None:
result = test_tasks.complex_exception.enqueue()

self.run_worker()
with self.assertLogs("django_tasks", "DEBUG"):
self.run_worker()

result.refresh()

Expand Down Expand Up @@ -399,6 +402,32 @@ def test_enqueue_logs(self) -> None:
self.assertIn("enqueued", captured_logs.output[0])
self.assertIn(result.id, captured_logs.output[0])

def test_started_finished_logs(self) -> None:
result = test_tasks.noop_task.enqueue()

with self.assertLogs("django_tasks", level="DEBUG") as captured_logs:
self.run_worker()

self.assertEqual(len(captured_logs.output), 2)
self.assertIn("state=RUNNING", captured_logs.output[0])
self.assertIn(result.id, captured_logs.output[0])

self.assertIn("state=SUCCEEDED", captured_logs.output[1])
self.assertIn(result.id, captured_logs.output[1])

def test_failed_logs(self) -> None:
result = test_tasks.failing_task_value_error.enqueue()

with self.assertLogs("django_tasks", level="DEBUG") as captured_logs:
self.run_worker()

self.assertEqual(len(captured_logs.output), 2)
self.assertIn("state=RUNNING", captured_logs.output[0])
self.assertIn(result.id, captured_logs.output[0])

self.assertIn("state=FAILED", captured_logs.output[1])
self.assertIn(result.id, captured_logs.output[1])

def test_enqueue_priority(self) -> None:
task_1 = test_tasks.noop_task.enqueue()
task_2 = test_tasks.noop_task.using(priority=100).enqueue()
Expand Down