芹菜与雷迪斯:差分 /异步任务。
#django #celery #redis #tasks

概括

pr't©ambule tl; abiaqian8博士

当我们检测到Web应用程序时,可能会遇到通过Web服务器的超时的任务:大型文件,发送大量电子邮件,用尽机器学习,提供报告, ...

很难由Web应用程序服务器直接制定此类任务的豁免,该服务器具有A timeout 配置,并且必须为(几秒钟)。

我们将看到,grâ¢this thisâDjangoCeleryRedis如何豁免不同的任务,然后它们将不受应用程序服务器的豁免,而是由A Process 一边:A >工人芹菜。

建筑

在这种情况下,使用的不同砖将是:

  • django 用于芹菜模块的Web Applications服务器:它将在 asynchronous 中发送任务以豁免
  • 芹菜工作
  • redis :将被用作message broker。除了是给定和/或NOSQL缓存服务器外,Redis还暗示了Pub/Sub消息的模式:还有生产者/客户(应用程序服务器)发送消息和会读取消息的消息的消费者(芹菜工人):芹菜Django将发送命令免除坦克的命令,其中一名工人将阅读TED施加并执行此操作。任务的启动是 aromic ,也就是说,任务只能由一个 worker
  • 进行

我们可以用以下方式来表示应用程序和过程之间的通信:

Image description

(Surce:https://excalidraw.com/#json=MpX5kp_uYfL17ugUS6a-S,CgyUY0Aqanul0DCksVDzuA

一点代码!

我们将很开心地欺骗一项任务,该任务将等待5秒钟,然后告诉我们它得出,此任务不会施加在应用程序服务器中,而是在 worker celery中,不同:应用程序服务器使手不朽,任务在适当的时间

表示

模块

我们至少需要作为一个模块,对于客户:

  • django&djangostframwork [用于a至¢che的API]
  • 芹菜
  • redis

服务器:

  • 具有启动任务的API的Web应用程序服务器
  • a 工人将收获我们的任务
  • 重新用于改变网络客户与 worker celery
  • 之间的消息

Docker

4张图像启动了4个容器:反向代理主机 - >容器,应用程序服务器 django ,a worker celery 消息经纪 redis

在通过docker-compose up启动这些服务时,我们将拥有

$ docker ps
CONTAINER ID   IMAGE                    COMMAND                  CREATED          STATUS          PORTS                                            NAMES
6caf76fca92e   api-and-worker           "celery -A core.asyn…"   11 minutes ago   Up 11 minutes   8100/tcp                                         celery-worker
9cadb4c717a5   nginx                    "nginx -g 'daemon of…"   11 minutes ago   Up 11 minutes   0.0.0.0:80->80/tcp                               nginx-api-server
b83db3cf05eb   redis:7.0.8              "docker-entrypoint.s…"   11 minutes ago   Up 11 minutes   0.0.0.0:6379->6379/tcp                           redis-broker
654b6d1222de   api-and-worker           "python3 manage.py r…"   11 minutes ago   Up 11 minutes   8100/tcp                                         api-server

使用no-trunc查看可以在docker-conconcose中找到的启动服务的参数:

$ docker ps --no-trunc
CONTAINER ID                                                       IMAGE                    COMMAND                                       CREATED          STATUS          PORTS
                               NAMES
6caf76fca92e5b4e83663019f34f57b5780fd3a9244d07daeb431f5f0e57b255   api-and-worker           "celery -A core.async_tasks worker -l info"   11 minutes ago   Up 11 minutes   8100/tcp
                               celery-worker
9cadb4c717a535d609dbd43ee2bb65667efd1b74f873e6e5b93efd7be365ab3a   nginx                    "nginx -g 'daemon off;'"                      11 minutes ago   Up 11 minutes   0.0.0.0:80->80/tcp                               nginx-api-server
b83db3cf05eb7c9d9aaa4fb3ce120e98f2820f1040434d336ca024e0d1f0a5b6   redis:7.0.8              "docker-entrypoint.sh redis-server"           11 minutes ago   Up 11 minutes   0.0.0.0:6379->6379/tcp                           redis-broker
654b6d1222de36b56f3e3d7bc9d13376ec7e54b527c316bbac7649576a82263f   api-and-worker           "python3 manage.py runserver 0.0.0.0:8110"    11 minutes ago   Up 11 minutes   8100/tcp
                               api-server

代码

在Aura上:

  • 芹菜配置模块:要使用的消息经纪人,芹菜参数(品味要考虑,序列化,超时等)
  • 应用程序常见的“任务”模块, worker 将被带来完成任务,然后称其为“ API”,该API将删除此tâ¢che

脚趾的防御,我们延迟显示输入:

from time import sleep
import logging
logger = logging.getLogger('django')
from core.async_tasks.celery import app

@app.task
def execute_long_task(title: str, delay: int, wordings: list):
    logger.info(f"** On s'endort {delay} secondes avant de lancer la tâche ZZzzzZZzzzzzzzzz **")

    sleep(delay)

    logger.info(f'** On se réveille ! Démarrage de la tâche {title} avec {wordings} **')
    logger.info(f'** Terminée !')

发送和命令的API:

class ExecuteTaskApi(APIView):
    permission_classes = [AllowAny]
    renderer_classes = (JSONRenderer,)

    def get(self, request, *args, **kwargs):
        execute_long_task.delay('Exécute ma longue tâche !', 5,
                                ['Vais attendre', 'le super pouvoir de Celery'])
        return Response({'data': 'je réponds immédiatement sans attendre la fin de la tâche !'})

配置de celery

import os
from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "backend.settings")

app = Celery("tasks")
app.config_from_object("core.async_tasks.celery_config")

# par défaut, il recherche les tâches dans le module "tasks.py" https://docs.celeryq.dev/en/stable/reference/celery.html#celery.Celery.autodiscover_tasks
app.autodiscover_tasks(['api'])

浏览器上的http://plateform/api/execute_task/ excation的日志或通过卷发

$ curl http://plateform/api/execute_task/
{"data":"je réponds immédiatement sans attendre la fin de la tâche !"}

应用程序服务器发送订单以豁免任务,并使浏览器从浏览器中删除, worker re-请参阅呼气可爱者,延迟5秒钟以显示不同的异步方面:

api-server        | INFO basehttp "GET /api/execute_task/ HTTP/1.0" 200 73
celery-worker     | [2023-03-12 11:19:00,851: INFO/MainProcess] Received task: api.tasks.execute_long_task[3f1b8339-4192-45da-9607-23246b532af7]
celery-worker     | INFO tasks ** On s'endort 5 secondes avant de lancer la tâche ZZzzzZZzzzzzzzzz **
celery-worker     | [2023-03-12 11:19:00,854: INFO/ForkPoolWorker-8] ** On s'endort 5 secondes avant de lancer la tâche ZZzzzZZzzzzzzzzz **
celery-worker     | INFO tasks ** On se réveille ! Démarrage de la tâche Exécute ma longue tâche ! avec ['Vais attendre', 'le super pouvoir de Celery'] **
celery-worker     | [2023-03-12 11:19:05,860: INFO/ForkPoolWorker-8] ** On se réveille ! Démarrage de la tâche Exécute ma longue tâche ! avec ['Vais attendre', 'le super pouvoir de Celery'] **
celery-worker     | INFO tasks ** Terminée !
celery-worker     | [2023-03-12 11:19:05,861: INFO/ForkPoolWorker-8] ** Terminée !

测试持续了 5秒,在11:19:00至11:19:05之间,对Web服务器和他的 Timout

我们可以说,当我想象可以持续几分钟进行特殊治疗的任务时。

来源

芹菜允许其他参数,例如在EC的情况下恢复任务或确保任务不会被施加2次,... < /p>

还有一个监视工具可以可视化任务状态:Flower

或,有根和编程的任务的可能性,“在crontab”,grô¢this Celery Beat

在github上找到来源https://github.com/zorky/celery-redis-django-tasks