diff --git a/README.md b/README.md index 3c56245e..66548bdd 100644 --- a/README.md +++ b/README.md @@ -240,6 +240,7 @@ Whilst signals are available, they may not be the most maintainable approach. - `django_tasks.signals.task_enqueued`: Called when a task is enqueued. The sender is the backend class. Also called with the enqueued `task_result`. - `django_tasks.signals.task_finished`: Called when a task finishes (`SUCCEEDED` or `FAILED`). The sender is the backend class. Also called with the finished `task_result`. +- `django_tasks.signals.task_started`: Called immediately before a task starts executing. The sender is the backend class. Also called with the started `task_result`. ## RQ diff --git a/django_tasks/backends/database/management/commands/db_worker.py b/django_tasks/backends/database/management/commands/db_worker.py index 64386f1a..2f6ae792 100644 --- a/django_tasks/backends/database/management/commands/db_worker.py +++ b/django_tasks/backends/database/management/commands/db_worker.py @@ -21,7 +21,7 @@ from django_tasks.backends.database.models import DBTaskResult from django_tasks.backends.database.utils import exclusive_transaction from django_tasks.exceptions import InvalidTaskBackendError -from django_tasks.signals import task_finished +from django_tasks.signals import task_finished, task_started from django_tasks.task import DEFAULT_QUEUE_NAME package_logger = logging.getLogger("django_tasks") @@ -131,19 +131,17 @@ def run_task(self, db_task_result: DBTaskResult) -> None: task = db_task_result.task task_result = db_task_result.task_result - logger.info( - "Task id=%s path=%s state=%s", - db_task_result.id, - db_task_result.task_path, - task_result.status, - ) + backend_type = task.get_backend() + + task_started.send(sender=backend_type, task_result=task_result) + return_value = task.call(*task_result.args, **task_result.kwargs) # Setting the return and success value inside the error handling, # So errors setting it (eg JSON encode) can still be recorded db_task_result.set_succeeded(return_value) task_finished.send( - sender=type(task.get_backend()), task_result=db_task_result.task_result + sender=backend_type, task_result=db_task_result.task_result ) except BaseException as e: db_task_result.set_failed(e) diff --git a/django_tasks/backends/immediate.py b/django_tasks/backends/immediate.py index 070cc505..cfae387c 100644 --- a/django_tasks/backends/immediate.py +++ b/django_tasks/backends/immediate.py @@ -8,7 +8,7 @@ from django.utils import timezone from typing_extensions import ParamSpec -from django_tasks.signals import task_enqueued, task_finished +from django_tasks.signals import task_enqueued, task_finished, task_started from django_tasks.task import ResultStatus, Task, TaskResult from django_tasks.utils import get_exception_traceback, get_random_id, json_normalize @@ -37,7 +37,10 @@ def _execute_task(self, task_result: TaskResult) -> None: async_to_sync(task.func) if iscoroutinefunction(task.func) else task.func ) + object.__setattr__(task_result, "status", ResultStatus.RUNNING) object.__setattr__(task_result, "started_at", timezone.now()) + task_started.send(sender=type(self), task_result=task_result) + try: object.__setattr__( task_result, diff --git a/django_tasks/backends/rq.py b/django_tasks/backends/rq.py index 8c26d50b..4ecba0c6 100644 --- a/django_tasks/backends/rq.py +++ b/django_tasks/backends/rq.py @@ -17,7 +17,7 @@ from django_tasks.backends.base import BaseTaskBackend from django_tasks.exceptions import ResultDoesNotExist -from django_tasks.signals import task_enqueued, task_finished +from django_tasks.signals import task_enqueued, task_finished, task_started from django_tasks.task import DEFAULT_PRIORITY, MAX_PRIORITY, ResultStatus, Task from django_tasks.task import TaskResult as BaseTaskResult from django_tasks.utils import get_module_path, get_random_id @@ -44,6 +44,13 @@ class TaskResult(BaseTaskResult[T]): class Job(BaseJob): + def perform(self) -> Any: + task_result = self.into_task_result() + + task_started.send(type(task_result.task.get_backend()), task_result=task_result) + + return super().perform() + def _execute(self) -> Any: """ Shim RQ's `Job` to call the underlying `Task` function. @@ -112,18 +119,22 @@ def failed_callback( exception_value: Exception, traceback: TracebackType, ) -> None: - task_result = job.into_task_result() - # Smuggle the exception class through meta job.meta["exception_class"] = get_module_path(exception_class) job.save_meta() # type: ignore[no-untyped-call] + task_result = job.into_task_result() + + object.__setattr__(task_result, "status", ResultStatus.FAILED) + task_finished.send(type(task_result.task.get_backend()), task_result=task_result) def success_callback(job: Job, connection: Optional[Redis], result: Any) -> None: task_result = job.into_task_result() + object.__setattr__(task_result, "status", ResultStatus.SUCCEEDED) + task_finished.send(type(task_result.task.get_backend()), task_result=task_result) diff --git a/django_tasks/signal_handlers.py b/django_tasks/signal_handlers.py index eb4aa8a8..d455beb5 100644 --- a/django_tasks/signal_handlers.py +++ b/django_tasks/signal_handlers.py @@ -6,7 +6,7 @@ from django_tasks import BaseTaskBackend, ResultStatus, TaskResult -from .signals import task_enqueued, task_finished +from .signals import task_enqueued, task_finished, task_started logger = logging.getLogger("django_tasks") @@ -35,6 +35,18 @@ def log_task_enqueued( ) +@receiver(task_started) +def log_task_started( + sender: type[BaseTaskBackend], task_result: TaskResult, **kwargs: dict +) -> None: + logger.info( + "Task id=%s path=%s state=%s", + task_result.id, + task_result.task.module_path, + task_result.status, + ) + + @receiver(task_finished) def log_task_finished( sender: type[BaseTaskBackend], task_result: TaskResult, **kwargs: dict diff --git a/django_tasks/signals.py b/django_tasks/signals.py index be1cf30b..0369ec10 100644 --- a/django_tasks/signals.py +++ b/django_tasks/signals.py @@ -2,3 +2,4 @@ task_enqueued = Signal() task_finished = Signal() +task_started = Signal() diff --git a/tests/tests/test_immediate_backend.py b/tests/tests/test_immediate_backend.py index 5afcaea5..1ee60328 100644 --- a/tests/tests/test_immediate_backend.py +++ b/tests/tests/test_immediate_backend.py @@ -220,9 +220,28 @@ def test_enqueue_logs(self) -> None: with self.assertLogs("django_tasks", level="DEBUG") as captured_logs: result = test_tasks.noop_task.enqueue() + self.assertEqual(len(captured_logs.output), 3) + self.assertIn("enqueued", captured_logs.output[0]) self.assertIn(result.id, captured_logs.output[0]) + self.assertIn("state=RUNNING", captured_logs.output[1]) + self.assertIn(result.id, captured_logs.output[1]) + + self.assertIn("state=SUCCEEDED", captured_logs.output[2]) + self.assertIn(result.id, captured_logs.output[2]) + + def test_failed_logs(self) -> None: + with self.assertLogs("django_tasks", level="DEBUG") as captured_logs: + result = test_tasks.failing_task_value_error.enqueue() + + self.assertEqual(len(captured_logs.output), 3) + self.assertIn("state=RUNNING", captured_logs.output[1]) + self.assertIn(result.id, captured_logs.output[1]) + + self.assertIn("state=FAILED", captured_logs.output[2]) + self.assertIn(result.id, captured_logs.output[2]) + def test_check(self) -> None: errors = list(default_task_backend.check()) diff --git a/tests/tests/test_rq_backend.py b/tests/tests/test_rq_backend.py index dc81a055..8a11898b 100644 --- a/tests/tests/test_rq_backend.py +++ b/tests/tests/test_rq_backend.py @@ -142,7 +142,9 @@ def test_catches_exception(self) -> None: ): result = task.enqueue() - self.run_worker() + with self.assertLogs("django_tasks", "DEBUG"): + self.run_worker() + result.refresh() # assert result @@ -166,7 +168,8 @@ def test_catches_exception(self) -> None: def test_complex_exception(self) -> None: result = test_tasks.complex_exception.enqueue() - self.run_worker() + with self.assertLogs("django_tasks", "DEBUG"): + self.run_worker() result.refresh() @@ -399,6 +402,32 @@ def test_enqueue_logs(self) -> None: self.assertIn("enqueued", captured_logs.output[0]) self.assertIn(result.id, captured_logs.output[0]) + def test_started_finished_logs(self) -> None: + result = test_tasks.noop_task.enqueue() + + with self.assertLogs("django_tasks", level="DEBUG") as captured_logs: + self.run_worker() + + self.assertEqual(len(captured_logs.output), 2) + self.assertIn("state=RUNNING", captured_logs.output[0]) + self.assertIn(result.id, captured_logs.output[0]) + + self.assertIn("state=SUCCEEDED", captured_logs.output[1]) + self.assertIn(result.id, captured_logs.output[1]) + + def test_failed_logs(self) -> None: + result = test_tasks.failing_task_value_error.enqueue() + + with self.assertLogs("django_tasks", level="DEBUG") as captured_logs: + self.run_worker() + + self.assertEqual(len(captured_logs.output), 2) + self.assertIn("state=RUNNING", captured_logs.output[0]) + self.assertIn(result.id, captured_logs.output[0]) + + self.assertIn("state=FAILED", captured_logs.output[1]) + self.assertIn(result.id, captured_logs.output[1]) + def test_enqueue_priority(self) -> None: task_1 = test_tasks.noop_task.enqueue() task_2 = test_tasks.noop_task.using(priority=100).enqueue()