Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 76 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
- uses: actions/cache@v3
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip-${{ hashFiles('pyproject.toml') }}
key: ${{ runner.os }}-${{ matrix.python-version }}-pip-${{ hashFiles('pyproject.toml') }}
- uses: taiki-e/install-action@just
- name: Install dependencies
run: |
Expand All @@ -43,11 +43,85 @@ jobs:
- name: Run tests
run: just test

test-postgres:
runs-on: ubuntu-latest
services:
postgres:
image: postgres:16-alpine
env:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
ports:
- 5432:5432
options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5
strategy:
fail-fast: false
matrix:
django-version: ["4.2", "5.0"]
steps:
- uses: actions/checkout@v3
- name: Set up Python 3.11
uses: actions/setup-python@v4
with:
python-version: 3.11
- uses: actions/cache@v3
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip-${{ hashFiles('pyproject.toml') }}
- uses: taiki-e/install-action@just
- name: Install dependencies
run: |
pip install --upgrade pip
pip install -e '.[dev]'
pip install Django~=${{ matrix.django-version }}
- name: Run tests
run: just test
env:
DATABASE_URL: postgres://postgres:postgres@localhost/postgres

test-mysql:
runs-on: ubuntu-latest
services:
mysql:
image: mysql:8.4
env:
MYSQL_ROOT_PASSWORD: django
MYSQL_DATABASE: django
ports:
- 3306:3306
strategy:
fail-fast: false
matrix:
django-version: ["4.2", "5.0"]
steps:
- uses: actions/checkout@v3
- name: Set up Python 3.11
uses: actions/setup-python@v4
with:
python-version: 3.11
- uses: actions/cache@v3
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip-${{ hashFiles('pyproject.toml') }}
- uses: taiki-e/install-action@just
- name: Install dependencies
run: |
pip install --upgrade pip
pip install -e '.[dev]'
pip install Django~=${{ matrix.django-version }}
- name: Run tests
run: just test
env:
DATABASE_URL: mysql://root:[email protected]/django

build:
permissions:
id-token: write # IMPORTANT: this permission is mandatory for trusted publishing
runs-on: ubuntu-latest
needs: test
needs:
- test
- test-postgres
- test-mysql
steps:
- uses: actions/checkout@v3
with:
Expand Down
19 changes: 18 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ A few backends are included by default:

- `DummyBackend`: Don't execute the tasks, just store them. This is especially useful for testing.
- `ImmediateBackend`: Execute the task immediately in the current thread
- `DatabaseBackend`: Store tasks in the database (via Django's ORM), and retrieve and execute them using the `db_worker` management command

### Defining tasks

Expand Down Expand Up @@ -65,7 +66,7 @@ modified_task = calculate_meaning_of_life.using(priority=10)

In addition to the above attributes, `run_after` can be passed to specify a specific time the task should run. Both a timezone-aware `datetime` or `timedelta` may be passed.

### Executing tasks
### Enqueueing tasks

To execute a task, call the `enqueue` method on it:

Expand All @@ -77,6 +78,22 @@ The returned `TaskResult` can be interrogated to query the current state of the

If the task takes arguments, these can be passed as-is to `enqueue`.

### Executing tasks with the database backend

First, you'll need to add `django_tasks.backends.database` to `INSTALLED_APPS`, and run `manage.py migrate`.

Next, configure the database backend:

```python
TASKS = {
"default": {
"BACKEND": "django_tasks.backends.database.DatabaseBackend"
}
}
```

Finally, you can run `manage.py db_worker` to run tasks as they're created. Check the `--help` for more options.

### Sending emails

To make sending emails simpler, a backend is provided to automatically create tasks for sending emails via SMTP:
Expand Down
4 changes: 2 additions & 2 deletions django_tasks/backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ def validate_task(self, task: Task) -> None:
if not self.supports_async_task and iscoroutinefunction(task.func):
raise InvalidTaskError("Backend does not support async tasks")

if task.priority is not None and task.priority < 1:
raise InvalidTaskError("priority must be positive")
if task.priority < 0:
raise InvalidTaskError("priority must be zero or greater")

if not self.supports_defer and task.run_after is not None:
raise InvalidTaskError("Backend does not support run_after")
Expand Down
3 changes: 3 additions & 0 deletions django_tasks/backends/database/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .backend import DatabaseBackend

__all__ = ["DatabaseBackend"]
6 changes: 6 additions & 0 deletions django_tasks/backends/database/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from django.apps import AppConfig


class TasksAppConfig(AppConfig):
name = "django_tasks.backends.database"
label = "django_tasks_database"
90 changes: 90 additions & 0 deletions django_tasks/backends/database/backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
from dataclasses import asdict, dataclass
from typing import TYPE_CHECKING, TypeVar

from django.core.exceptions import ValidationError
from typing_extensions import ParamSpec

from django_tasks.backends.base import BaseTaskBackend
from django_tasks.exceptions import ResultDoesNotExist
from django_tasks.task import Task
from django_tasks.task import TaskResult as BaseTaskResult
from django_tasks.utils import json_normalize

if TYPE_CHECKING:
from .models import DBTaskResult

T = TypeVar("T")
P = ParamSpec("P")


@dataclass
class TaskResult(BaseTaskResult[T]):
db_result: "DBTaskResult"

def refresh(self) -> None:
self.db_result.refresh_from_db()
for attr, value in asdict(self.db_result.task_result).items():
setattr(self, attr, value)

async def arefresh(self) -> None:
await self.db_result.arefresh_from_db()
for attr, value in asdict(self.db_result.task_result).items():
setattr(self, attr, value)


class DatabaseBackend(BaseTaskBackend):
supports_async_task = True
supports_get_result = True
supports_defer = True

def _task_to_db_task(
self, task: Task[P, T], args: P.args, kwargs: P.kwargs
) -> "DBTaskResult":
from .models import DBTaskResult

return DBTaskResult(
args_kwargs=json_normalize({"args": args, "kwargs": kwargs}),
priority=task.priority,
task_path=task.module_path,
queue_name=task.queue_name,
run_after=task.run_after,
backend_name=self.alias,
)

def enqueue(
self, task: Task[P, T], args: P.args, kwargs: P.kwargs
) -> TaskResult[T]:
self.validate_task(task)

db_result = self._task_to_db_task(task, args, kwargs)

db_result.save()

return db_result.task_result

async def aenqueue(
self, task: Task[P, T], args: P.args, kwargs: P.kwargs
) -> TaskResult[T]:
self.validate_task(task)

db_result = self._task_to_db_task(task, args, kwargs)

await db_result.asave()

return db_result.task_result

def get_result(self, result_id: str) -> TaskResult:
from .models import DBTaskResult

try:
return DBTaskResult.objects.get(id=result_id).task_result
except (DBTaskResult.DoesNotExist, ValidationError) as e:
raise ResultDoesNotExist(result_id) from e

async def aget_result(self, result_id: str) -> TaskResult:
from .models import DBTaskResult

try:
return (await DBTaskResult.objects.aget(id=result_id)).task_result
except (DBTaskResult.DoesNotExist, ValidationError) as e:
raise ResultDoesNotExist(result_id) from e
Loading