• 6.1
  • Версия документации: 3.1

Фреймворк задач Django

New in Django 6.0.

Для веб-приложения часто требуется нечто большее, чем просто преобразование HTTP-запросов в HTTP-ответы. Для некоторых функций может быть полезно запускать код вне цикла запрос-ответ.

Вот тут-то и приходят на помощь фоновые задачи.

Фоновые задачи могут разгрузить работу и выполнить ее вне цикла запрос-ответ, чтобы выполнить ее в другом месте, возможно, позже. Это обеспечивает быстроту запросов, уменьшает задержку и улучшает взаимодействие с пользователем. Например, пользователю не нужно ждать отправки электронного письма до завершения загрузки страницы.

Фреймворк Tasks Django позволяет легко определять и ставить в очередь такую ​​работу. Он не предоставляет рабочий механизм для запуска задач. Фактическое выполнение должно обрабатываться инфраструктурой вне Django, например отдельным процессом или службой. Учитывая это, необходимо оценить и настроить серверную часть задачи, способную выполнять задачи в этой службе.

Основы фоновой задачи

Когда работу необходимо выполнить в фоновом режиме, Django создает «Задачу», которая сохраняется в хранилище очередей. Эта Задача содержит все метаданные, необходимые для ее выполнения, а также уникальный идентификатор, позволяющий Django получить результат позже.

Работник будет искать в хранилище очередей новые задачи для запуска. Когда добавляется новая задача, работник запрашивает задачу, выполняет ее и сохраняет статус и результат обратно в хранилище очередей. Эти исполнители работают вне жизненного цикла запроса-ответа.

Настройка серверной части задачи

Серверная часть задач определяет, как и где задачи сохраняются для выполнения и как они выполняются. Различные серверные части задач имеют разные характеристики и параметры конфигурации, которые могут повлиять на производительность и надежность вашего приложения. В состав Django входят встроенные бэкэнды, но они предназначены только для разработки и тестирования.

Django занимается определением задач, их проверкой, постановкой в ​​очередь и обработкой результатов, а не их выполнением, поэтому для производственных установок необходим серверный или рабочий процесс, который фактически выполняет работу в очереди. Соответствующие параметры перечислены на странице «Экосистема сообщества <https://www.djangoproject.com/community/ecosystem/>`__».

Серверные функции задач настраиваются с помощью параметра TASKS в вашем файле настроек. Хотя большинству приложений потребуется только один бэкэнд, поддерживаются несколько.

Немедленное исполнение

Это серверная часть по умолчанию, если в вашем файле настроек не указан другой. ImmediateBackend запускает поставленные в очередь задачи немедленно, а не в фоновом режиме. Это позволяет постепенно добавлять в приложение функциональные возможности фоновых задач до того, как будет доступна необходимая инфраструктура.

Чтобы использовать его, установите для BACKEND значение "django.tasks.backends.immediate.ImmediateBackend":

TASKS = {"default": {"BACKEND": "django.tasks.backends.immediate.ImmediateBackend"}}

ImmediateBackend также может быть полезен в тестах, чтобы обойти необходимость запуска реального фонового работника в ваших тестах.

Dummy бэкенд

DummyBackend вообще не выполняет задачи, поставленные в очередь, вместо этого сохраняет результаты для последующего использования. Результаты задачи навсегда останутся в состоянии READY.

Этот бэкэнд не предназначен для использования в производстве — он предоставляется для удобства, которое можно использовать во время разработки и тестирования.

Чтобы использовать его, установите для BACKEND значение "django.tasks.backends.dummy.DummyBackend":

TASKS = {"default": {"BACKEND": "django.tasks.backends.dummy.DummyBackend"}}

Результаты задач, поставленных в очередь, можно получить из атрибута results бэкэнда:

>>> from django.tasks import default_task_backend
>>> my_task.enqueue()
>>> len(default_task_backend.results)
1

Сохраненные результаты можно очистить с помощью метода clear():

>>> default_task_backend.clear()
>>> len(default_task_backend.results)
0

Сторонние бэкэнды

Как упоминалось в начале этого раздела, Django включает в себя бэкэнды, подходящие только для разработки и тестирования. Производственные системы должны полагаться на серверные части, которые обеспечивают рабочий процесс и реализацию устойчивой очереди. Чтобы использовать внешний бэкэнд Task с Django, используйте путь импорта Python в качестве BACKEND параметра TASKS, например:

TASKS = {
    "default": {
        "BACKEND": "path.to.backend",
    }
}

Серверная часть Task — это класс, который наследует BaseTaskBackend. Как минимум, он должен реализовать BaseTaskBackend.enqueue(). Если вы создаете собственную серверную часть, вы можете использовать встроенные серверные части задач в качестве эталонных реализаций. Вы найдете код в каталоге django/tasks/backends/ исходного кода Django.

Асинхронная поддержка

В Django разрабатывается поддержка асинхронных бэкэндов задач.

BaseTaskBackend имеет асинхронные варианты всех базовых методов. По соглашению асинхронные версии всех методов имеют префикс a. Аргументы в пользу обоих вариантов одни и те же.

Получение бэкэндов

Бэкэнды можно получить с помощью обработчика соединения Task_backends:

from django.tasks import task_backends

task_backends["default"]  # The default backend
task_backends["reserve"]  # Another backend

Бэкэнд «по умолчанию» доступен как default_task_backend:

from django.tasks import default_task_backend

Определение задач

Задачи определяются с помощью декоратора django.tasks.task() в функции уровня модуля:

from django.core.mail import send_mail
from django.tasks import task


@task
def email_users(emails, subject, message):
    return send_mail(
        subject=subject, message=message, from_email=None, recipient_list=emails
    )

Возвращаемое значение декоратора — это экземпляр Task.

Атрибуты Task можно настроить с помощью аргументов декоратора @task:

from django.core.mail import send_mail
from django.tasks import task


@task(priority=2, queue_name="emails")
def email_users(emails, subject, message):
    return send_mail(
        subject=subject, message=message, from_email=None, recipient_list=emails
    )

По соглашению задачи определяются в файле Tasks.py, однако это не является обязательным.

Контекст задачи

Иногда выполняемой задаче может потребоваться информация о том, как она была поставлена ​​в очередь и как она выполняется. Доступ к нему можно получить, приняв аргумент context, который является экземпляром TaskContext.

Чтобы получить контекст задачи в качестве аргумента функции Task, передайте takes_context при ее определении:

import logging
from django.core.mail import send_mail
from django.tasks import task


logger = logging.getLogger(__name__)


@task(takes_context=True)
def email_users(context, emails, subject, message):
    logger.debug(
        f"Attempt {context.attempt} to send user email. Task result id: {context.task_result.id}."
    )
    return send_mail(
        subject=subject, message=message, from_email=None, recipient_list=emails
    )

Изменение задач

Перед постановкой Задачи в очередь может потребоваться изменить определенные параметры Задачи. Например, чтобы придать ему более высокий приоритет, чем обычно.

Экземпляр Task нельзя изменить напрямую. Вместо этого можно создать модифицированный экземпляр с помощью метода using(), оставив исходный вариант как есть. Например:

>>> email_users.priority
0
>>> email_users.using(priority=10).priority
10

Постановка задач в очередь

Чтобы добавить задачу в хранилище очереди и она будет выполнена, вызовите для нее метод enqueue(). Если Задача принимает аргументы, их можно передать как есть. Например:

result = email_users.enqueue(
    emails=["user@example.com"],
    subject="You have a message",
    message="Hello there!",
)

Это возвращает TaskResult, который можно использовать для получения результата задачи после ее завершения.

Чтобы поставить задачи в асинхронный контекст, aenqueue() доступен как асинхронный вариант enqueue().

Поскольку и аргументы задачи, и возвращаемые значения сериализуются в JSON, они должны быть сериализуемы в формате JSON:

>>> process_data.enqueue(datetime.now())
Traceback (most recent call last):
...
TypeError: Object of type datetime is not JSON serializable

Аргументы также должны иметь возможность проходить цикл json.dumps()/ json.loads() без изменения типа. Например, рассмотрим эту задачу:

@task()
def double_dictionary(key):
    return {key: key * 2}

Если ImmediateBackend настроен как серверная часть по умолчанию:

>>> result = double_dictionary.enqueue((1, 2, 3))
>>> result.status
FAILED
>>> result.errors[0].traceback
Traceback (most recent call last):
...
TypeError: unhashable type: 'list'

Задача double_dictionary завершается неудачно, поскольку после обратного прохождения JSON кортеж (1, 2, 3) становится списком [1, 2, 3], который нельзя использовать в качестве ключа словаря.

Как правило, сложные объекты, такие как экземпляры модели или встроенные типы, такие как datetime и кортеж, нельзя использовать в Задачах без дополнительного преобразования.

Транзакции

Для большинства серверных частей задачи выполняются в отдельном процессе с использованием другого подключения к базе данных. При использовании транзакции, не дожидаясь ее фиксации, работники могут начать обработку задачи, которая использует объекты, к которым у них пока нет доступа.

Например, рассмотрим этот упрощенный пример:

@task
def my_task(thing_num):
    Thing.objects.get(num=thing_num)


with transaction.atomic():
    Thing.objects.create(num=1)
    my_task.enqueue(thing_num=1)

Чтобы предотвратить сценарий, в котором my_task запускается до того, как Thing будет зафиксирован в базе данных, используйте transaction.on_commit(), привязывая все аргументы к enqueue() через functools.partial():

from functools import partial

from django.db import transaction


with transaction.atomic():
    Thing.objects.create(num=1)
    transaction.on_commit(partial(my_task.enqueue, thing_num=1))

Результаты задачи

При постановке в очередь Task вы получаете TaskResult, однако, вероятно, будет полезно получить результат откуда-то еще (например, из другого запроса или другой задачи).

Каждый TaskResult имеет уникальный id, который можно использовать для идентификации и получения результата после завершения кода, поставившего задачу в очередь.

Метод get_result() может получить результат на основе его id:

# Later, somewhere else...
result = email_users.get_result(result_id)

Чтобы получить TaskResult, независимо от того, из какого типа Task оно было, используйте метод get_result() на бэкэнде:

from django.tasks import default_task_backend

result = default_task_backend.get_result(result_id)

Для получения результатов в асинхронном контексте доступен aget_result() как асинхронный вариант get_result() как на серверной части, так и на Task.

Некоторые бэкэнды, такие как встроенный ImmediateBackend, не поддерживают get_result(). Вызов get_result() на этих бэкэндах вызовет NotImplementedError.

Обновление результатов

TaskResult содержит статус выполнения задачи на момент ее получения. Если Задача завершается после вызова get_result(), она не будет обновляться.

Чтобы обновить значения, вызовите метод django.tasks.TaskResult.refresh():

>>> result.status
RUNNING
>>> result.refresh()  # or await result.arefresh()
>>> result.status
SUCCESSFUL

Возвращаемые значения

Если ваша функция Task что-то возвращает, это можно получить из атрибута django.tasks.TaskResult.return_value:

>>> result.status
SUCCESSFUL
>>> result.return_value
42

Если Задача не завершила выполнение или произошла ошибка, возникает ValueError.

>>> result.status
RUNNING
>>> result.return_value
Traceback (most recent call last):
...
ValueError: Task has not finished yet

Ошибки

Если Задача не завершается успешно и вместо этого вызывает исключение либо как часть Задачи, либо как часть ее запуска, исключение и обратная трассировка сохраняются в списке django.tasks.TaskResult.errors.

Каждая запись в errors представляет собой TaskError, содержащий информацию об ошибке, возникшей во время выполнения:

>>> result.errors[0].exception_class
<class 'ValueError'>

Обратите внимание, что это всего лишь тип исключения и не содержит других значений. Информация трассировки сводится к строке, которую можно использовать для отладки:

>>> result.errors[0].traceback
Traceback (most recent call last):
...
TypeError: Object of type datetime is not JSON serializable
Back to Top