- 6.1
- Версия документации: 3.1
Фреймворк задач Django¶
Для веб-приложения часто требуется нечто большее, чем просто преобразование HTTP-запросов в HTTP-ответы. Для некоторых функций может быть полезно запускать код вне цикла запрос-ответ.
Вот тут-то и приходят на помощь фоновые задачи.
Фоновые задачи могут разгрузить работу и выполнить ее вне цикла запрос-ответ, чтобы выполнить ее в другом месте, возможно, позже. Это обеспечивает быстроту запросов, уменьшает задержку и улучшает взаимодействие с пользователем. Например, пользователю не нужно ждать отправки электронного письма до завершения загрузки страницы.
Фреймворк Tasks Django позволяет легко определять и ставить в очередь такую работу. Он не предоставляет рабочий механизм для запуска задач. Фактическое выполнение должно обрабатываться инфраструктурой вне Django, например отдельным процессом или службой. Учитывая это, необходимо оценить и настроить серверную часть задачи, способную выполнять задачи в этой службе.
Основы фоновой задачи¶
Когда работу необходимо выполнить в фоновом режиме, Django создает «Задачу», которая сохраняется в хранилище очередей. Эта Задача содержит все метаданные, необходимые для ее выполнения, а также уникальный идентификатор, позволяющий Django получить результат позже.
Работник будет искать в хранилище очередей новые задачи для запуска. Когда добавляется новая задача, работник запрашивает задачу, выполняет ее и сохраняет статус и результат обратно в хранилище очередей. Эти исполнители работают вне жизненного цикла запроса-ответа.
Настройка серверной части задачи¶
Серверная часть задач определяет, как и где задачи сохраняются для выполнения и как они выполняются. Различные серверные части задач имеют разные характеристики и параметры конфигурации, которые могут повлиять на производительность и надежность вашего приложения. В состав Django входят встроенные бэкэнды, но они предназначены только для разработки и тестирования.
Django занимается определением задач, их проверкой, постановкой в очередь и обработкой результатов, а не их выполнением, поэтому для производственных установок необходим серверный или рабочий процесс, который фактически выполняет работу в очереди. Соответствующие параметры перечислены на странице «Экосистема сообщества <https://www.djangoproject.com/community/ecosystem/>`__».
Серверные функции задач настраиваются с помощью параметра TASKS в вашем файле настроек. Хотя большинству приложений потребуется только один бэкэнд, поддерживаются несколько.
Немедленное исполнение¶
Это серверная часть по умолчанию, если в вашем файле настроек не указан другой. ImmediateBackend запускает поставленные в очередь задачи немедленно, а не в фоновом режиме. Это позволяет постепенно добавлять в приложение функциональные возможности фоновых задач до того, как будет доступна необходимая инфраструктура.
Чтобы использовать его, установите для BACKEND значение "django.tasks.backends.immediate.ImmediateBackend":
TASKS = {"default": {"BACKEND": "django.tasks.backends.immediate.ImmediateBackend"}}
ImmediateBackend также может быть полезен в тестах, чтобы обойти необходимость запуска реального фонового работника в ваших тестах.
Dummy бэкенд¶
DummyBackend вообще не выполняет задачи, поставленные в очередь, вместо этого сохраняет результаты для последующего использования. Результаты задачи навсегда останутся в состоянии READY.
Этот бэкэнд не предназначен для использования в производстве — он предоставляется для удобства, которое можно использовать во время разработки и тестирования.
Чтобы использовать его, установите для BACKEND значение "django.tasks.backends.dummy.DummyBackend":
TASKS = {"default": {"BACKEND": "django.tasks.backends.dummy.DummyBackend"}}
Результаты задач, поставленных в очередь, можно получить из атрибута results бэкэнда:
>>> from django.tasks import default_task_backend
>>> my_task.enqueue()
>>> len(default_task_backend.results)
1
Сохраненные результаты можно очистить с помощью метода clear():
>>> default_task_backend.clear()
>>> len(default_task_backend.results)
0
Сторонние бэкэнды¶
Как упоминалось в начале этого раздела, Django включает в себя бэкэнды, подходящие только для разработки и тестирования. Производственные системы должны полагаться на серверные части, которые обеспечивают рабочий процесс и реализацию устойчивой очереди. Чтобы использовать внешний бэкэнд Task с Django, используйте путь импорта Python в качестве BACKEND параметра TASKS, например:
TASKS = {
"default": {
"BACKEND": "path.to.backend",
}
}
Серверная часть Task — это класс, который наследует BaseTaskBackend. Как минимум, он должен реализовать BaseTaskBackend.enqueue(). Если вы создаете собственную серверную часть, вы можете использовать встроенные серверные части задач в качестве эталонных реализаций. Вы найдете код в каталоге django/tasks/backends/ исходного кода Django.
Асинхронная поддержка¶
В Django разрабатывается поддержка асинхронных бэкэндов задач.
BaseTaskBackend имеет асинхронные варианты всех базовых методов. По соглашению асинхронные версии всех методов имеют префикс a. Аргументы в пользу обоих вариантов одни и те же.
Получение бэкэндов¶
Бэкэнды можно получить с помощью обработчика соединения Task_backends:
from django.tasks import task_backends
task_backends["default"] # The default backend
task_backends["reserve"] # Another backend
Бэкэнд «по умолчанию» доступен как default_task_backend:
from django.tasks import default_task_backend
Определение задач¶
Задачи определяются с помощью декоратора django.tasks.task() в функции уровня модуля:
from django.core.mail import send_mail
from django.tasks import task
@task
def email_users(emails, subject, message):
return send_mail(
subject=subject, message=message, from_email=None, recipient_list=emails
)
Возвращаемое значение декоратора — это экземпляр Task.
Атрибуты Task можно настроить с помощью аргументов декоратора @task:
from django.core.mail import send_mail
from django.tasks import task
@task(priority=2, queue_name="emails")
def email_users(emails, subject, message):
return send_mail(
subject=subject, message=message, from_email=None, recipient_list=emails
)
По соглашению задачи определяются в файле Tasks.py, однако это не является обязательным.
Контекст задачи¶
Иногда выполняемой задаче может потребоваться информация о том, как она была поставлена в очередь и как она выполняется. Доступ к нему можно получить, приняв аргумент context, который является экземпляром TaskContext.
Чтобы получить контекст задачи в качестве аргумента функции Task, передайте takes_context при ее определении:
import logging
from django.core.mail import send_mail
from django.tasks import task
logger = logging.getLogger(__name__)
@task(takes_context=True)
def email_users(context, emails, subject, message):
logger.debug(
f"Attempt {context.attempt} to send user email. Task result id: {context.task_result.id}."
)
return send_mail(
subject=subject, message=message, from_email=None, recipient_list=emails
)
Изменение задач¶
Перед постановкой Задачи в очередь может потребоваться изменить определенные параметры Задачи. Например, чтобы придать ему более высокий приоритет, чем обычно.
Экземпляр Task нельзя изменить напрямую. Вместо этого можно создать модифицированный экземпляр с помощью метода using(), оставив исходный вариант как есть. Например:
>>> email_users.priority
0
>>> email_users.using(priority=10).priority
10
Постановка задач в очередь¶
Чтобы добавить задачу в хранилище очереди и она будет выполнена, вызовите для нее метод enqueue(). Если Задача принимает аргументы, их можно передать как есть. Например:
result = email_users.enqueue(
emails=["user@example.com"],
subject="You have a message",
message="Hello there!",
)
Это возвращает TaskResult, который можно использовать для получения результата задачи после ее завершения.
Чтобы поставить задачи в асинхронный контекст, aenqueue() доступен как асинхронный вариант enqueue().
Поскольку и аргументы задачи, и возвращаемые значения сериализуются в JSON, они должны быть сериализуемы в формате JSON:
>>> process_data.enqueue(datetime.now())
Traceback (most recent call last):
...
TypeError: Object of type datetime is not JSON serializable
Аргументы также должны иметь возможность проходить цикл json.dumps()/ json.loads() без изменения типа. Например, рассмотрим эту задачу:
@task()
def double_dictionary(key):
return {key: key * 2}
Если ImmediateBackend настроен как серверная часть по умолчанию:
>>> result = double_dictionary.enqueue((1, 2, 3))
>>> result.status
FAILED
>>> result.errors[0].traceback
Traceback (most recent call last):
...
TypeError: unhashable type: 'list'
Задача double_dictionary завершается неудачно, поскольку после обратного прохождения JSON кортеж (1, 2, 3) становится списком [1, 2, 3], который нельзя использовать в качестве ключа словаря.
Как правило, сложные объекты, такие как экземпляры модели или встроенные типы, такие как datetime и кортеж, нельзя использовать в Задачах без дополнительного преобразования.
Транзакции¶
Для большинства серверных частей задачи выполняются в отдельном процессе с использованием другого подключения к базе данных. При использовании транзакции, не дожидаясь ее фиксации, работники могут начать обработку задачи, которая использует объекты, к которым у них пока нет доступа.
Например, рассмотрим этот упрощенный пример:
@task
def my_task(thing_num):
Thing.objects.get(num=thing_num)
with transaction.atomic():
Thing.objects.create(num=1)
my_task.enqueue(thing_num=1)
Чтобы предотвратить сценарий, в котором my_task запускается до того, как Thing будет зафиксирован в базе данных, используйте transaction.on_commit(), привязывая все аргументы к enqueue() через functools.partial():
from functools import partial
from django.db import transaction
with transaction.atomic():
Thing.objects.create(num=1)
transaction.on_commit(partial(my_task.enqueue, thing_num=1))
Результаты задачи¶
При постановке в очередь Task вы получаете TaskResult, однако, вероятно, будет полезно получить результат откуда-то еще (например, из другого запроса или другой задачи).
Каждый TaskResult имеет уникальный id, который можно использовать для идентификации и получения результата после завершения кода, поставившего задачу в очередь.
Метод get_result() может получить результат на основе его id:
# Later, somewhere else...
result = email_users.get_result(result_id)
Чтобы получить TaskResult, независимо от того, из какого типа Task оно было, используйте метод get_result() на бэкэнде:
from django.tasks import default_task_backend
result = default_task_backend.get_result(result_id)
Для получения результатов в асинхронном контексте доступен aget_result() как асинхронный вариант get_result() как на серверной части, так и на Task.
Некоторые бэкэнды, такие как встроенный ImmediateBackend, не поддерживают get_result(). Вызов get_result() на этих бэкэндах вызовет NotImplementedError.
Обновление результатов¶
TaskResult содержит статус выполнения задачи на момент ее получения. Если Задача завершается после вызова get_result(), она не будет обновляться.
Чтобы обновить значения, вызовите метод django.tasks.TaskResult.refresh():
>>> result.status
RUNNING
>>> result.refresh() # or await result.arefresh()
>>> result.status
SUCCESSFUL
Возвращаемые значения¶
Если ваша функция Task что-то возвращает, это можно получить из атрибута django.tasks.TaskResult.return_value:
>>> result.status
SUCCESSFUL
>>> result.return_value
42
Если Задача не завершила выполнение или произошла ошибка, возникает ValueError.
>>> result.status
RUNNING
>>> result.return_value
Traceback (most recent call last):
...
ValueError: Task has not finished yet
Ошибки¶
Если Задача не завершается успешно и вместо этого вызывает исключение либо как часть Задачи, либо как часть ее запуска, исключение и обратная трассировка сохраняются в списке django.tasks.TaskResult.errors.
Каждая запись в errors представляет собой TaskError, содержащий информацию об ошибке, возникшей во время выполнения:
>>> result.errors[0].exception_class
<class 'ValueError'>
Обратите внимание, что это всего лишь тип исключения и не содержит других значений. Информация трассировки сводится к строке, которую можно использовать для отладки:
>>> result.errors[0].traceback
Traceback (most recent call last):
...
TypeError: Object of type datetime is not JSON serializable