与Django的多个芹菜队列一起工作
#python #django #celery

芹菜是基于分布式消息传递的异步任务队列系统。它允许开发人员同时执行背景作业,与主要应用程序流分开,从而确保诸如发送电子邮件,生成报告或处理数据之类的任务不会阻止或延迟用户的请求。

通过将这些重量级任务卸载到芹菜中,诸如Django之类的Web框架可以快速响应用户并允许该任务在后台运行,从而确保用户不在等待。

使用单个队列可能足以满足较小的应用程序,随着项目的增长和任务的变化,您可能会发现需要优先考虑某些任务或在专业工人上运行它们。在本文中,我们将研究如何在本地开发和部署到生产时与Django一起使用多个队列。

集成芹菜与django

我们首先需要设置我们的Django项目以与芹菜合作,然后再添加多个队列。

将芹菜添加到Django的第一步是确保安装了必要的依赖项。我们可以在命令行上安装它们,但是我们将使用 sumplions.txt 文件使过程可重复。

Django==4.1.7
celery==5.2.7
redis==4.5.5
gunicorn==20.1.0

现在,在包含您 settings.py 文件的同一目录中创建一个名为 celery.py 的文件。该文件将充当每个芹菜工人的入口点。配置设置在此处加载,并告知工人在哪里找到任务定义。 celery.py 文件也可以选择包含计划的任务配置。

import os
from datetime import timedelta

from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myapp.settings")

app = Celery("myapp")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

app.conf.beat_schedule = {
    "run-every-10-seconds": {
        "task": "myapp.tasks.foo",
        "schedule": timedelta(seconds=10),
    },
    "run-every-1800-seconds": {
        "task": "myapp.tasks.bar",
        "schedule": timedelta(seconds=1800),
    },
}

最后,我们需要对设置进行少量更新。代理URL将包含一个指向REDIS实例的连接字符串。此REDIS实例将负责管理包含待处理任务数据的队列。

CELERY_BROKER_URL = os.environ.get("CELERY_BROKER_URL")

使用环境变量将允许本地和生产部署无需更改代码。

随着最后的调整,我们的Django项目几乎应该准备与芹菜一起工作。接下来,我们将查看如何容忍此设置并使用Docker在本地运行。

local开发与Django和芹菜

我们可以使用Docker并共同组成,以使Django和芹菜更加容易地使本地开发。此方法还使与其他开发人员在项目上进行协作变得更加容易。

第一步是将DJANGO和芹菜工作过程集合化。为此,我们将编写一个 dockerfile ,其中包含具有必要依赖性图像的说明。

FROM python:3.9-slim-buster

ENV PYTHONUNBUFFERED 1
ENV PYTHONDONTWRITEBYTECODE 1

RUN apt-get update && apt-get install -y \
    libpq-dev \
    && apt-get -y clean

WORKDIR /usr/src/app

COPY ./requirements.txt /usr/src/app

RUN pip install --upgrade pip && pip install -r requirements.txt

COPY . /usr/src/app/

如果您熟悉 dockerfile 格式,则可能会注意到最后没有 cmd 参数。这是有意的,因为我们将使用 docker-compose 稍后指定命令。这将允许一个 dockerfile 为django和芹菜容器都起作用。

使用Docker组成以管理容器

这是一个简单的 docker-compose.yml 文件,它将配置Django Dev Server,Celery Worker和Redis实例。我们在这里使用开发服务器,因为我们想利用热重加载功能。生产部署将使用枪支为应用程序服务。

version: '3.8'

services:
  web:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: django_dev
    command: python manage.py runserver 0.0.0.0:8000
    volumes:
      - ${PWD}:/usr/src/app
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
    ports:
      - "8000:8000"
    depends_on:
      - redis

  celery:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: celery_worker
    command: celery -A myapp worker --loglevel=info
    volumes:
      - ${PWD}:/usr/src/app
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
    depends_on:
      - redis

  redis:
    image: redis:latest
    container_name: redis
    ports:
      - "6379:6379"

启动和运行此容器集合只需要一个命令,该命令还将构建图像并下载任何远程图像,例如Redis容器。

docker-compose up -d

要完成一个简单的芹菜任务

现在我们已经运行所有容器,我们应该花一点时间来实现概念验证任务。

我们将向 protem_queues_app/目录添加一个名为 tasks.py 的新文件,并实现一个简单的异步任务。该任务将计算fibonacci编号,尽管以非常效率的递归方式。

from celery import shared_task

@shared_task
def fibonacci(n):
    if n <= 1:
        return n
    else:
        return fibonacci(n - 1) + fibonacci(n - 2)

现在,我们需要一种观点,可以调用以调用此任务并在后台进行处理。在 proture_queues_app/views.py 文件中添加以下代码。

from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from .tasks import fibonacci

@csrf_exempt
def calculate_fibonacci(request):
    if request.method == "POST":
        try:
            N = int(request.POST.get("N"))
            result = fibonacci.delay(N)
            return JsonResponse(
                {"task_id": result.task_id, "status": "Task submitted successfully!"},
                status=200,
            )
        except ValueError:
            return JsonResponse({"error": "Invalid N value provided."}, status=400)
    else:
        return JsonResponse({"error": "Only POST method is allowed."}, status=405)

当然,我们几乎在那里,但是我们需要在应用程序中添加 urls.py ,并更新主要项目 urls.py 此观点将作出回应。

from django.urls import path
from . import views

urlpatterns = [
    path("fibonacci/", views.calculate_fibonacci, name="calculate_fibonacci"),
]
from django.contrib import admin
from django.urls import include, path

urlpatterns = [
    path("admin/", admin.site.urls),
    path("", include("multiple_queues_app.urls")),
]

有了视图和任务,我们现在可以尝试所有这些。第一个问题a docker-compose重新启动,以便更新芹菜工人代码。现在,我们可以使用我们选择的任何工具来测试视图,但是我将使用卷曲来提出请求。

curl -X POST -d "N=25" http://localhost:8000/fibonacci/

观看芹菜工人的日志,我们应该看到该任务被重新加入,并需要3秒钟才能执行。

使用芹菜工人热装的看门狗

您可能已经注意到,我们必须在尝试新任务之前手动重新启动芹菜工人。手动重新启动工人将很快变得笨拙。理想情况下,我们可以具有类似于Django Dev服务器中的热重载的功能。

幸运的是,看门狗实用程序提供了一种简单的方法,可以在目录更改时重新启动过程。让我们修改 docker-compose.yml 文件以使用 watchMedo watchdog 实用程序的脚本,在芹菜的命令部分中。

  celery:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: celery_worker
    command: watchmedo auto-restart --directory=multiple_queues_app --pattern=*.py --recursive -- celery -A myapp worker --loglevel=info
    volumes:
      - ${PWD}:/usr/src/app
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
    depends_on:
      - redis

但是,在更新容器之前,我们首先需要添加一个 insuert-dev.txt 文件并修改 dockerfile 以从中安装。

FROM python:3.9-slim-buster

ENV PYTHONUNBUFFERED 1
ENV PYTHONDONTWRITEBYTECODE 1

RUN apt-get update && apt-get install -y \
    libpq-dev \
    && apt-get -y clean

WORKDIR /usr/src/app

COPY ./requirements.txt /usr/src/app
COPY ./requirements-dev.txt /usr/src/app

RUN pip install --upgrade pip && pip install -r requirements.txt && pip install -r requirements-dev.txt

COPY . /usr/src/app/

通过重建和重新启动容器,您现在应该能够修改Python文件,并自动查看芹菜工人。

在本地开发中添加多个队列

如何更新此本地开发设置以包括多个芹菜队列?

该过程就像在 docker-compose.yml 文件中添加另一个服务一样简单。但是,这里的问题在于,您的撰写文件可能会开始长时间重复。我们可以使用一些模板魔术来保持干燥。

,我们可以不断地复制整个芹菜服务块。

以下是更新的 docker-compose.yml ,添加了一个新的芹菜工人。

version: '3.8'

x-worker-opts: &worker-opts
  build:
    context: .
    dockerfile: Dockerfile
  volumes:
    - ${PWD}:/usr/src/app
  environment:
    - CELERY_BROKER_URL=redis://redis:6379/0
  depends_on:
    - redis

services:
  web:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: django_dev
    command: python manage.py runserver 0.0.0.0:8000
    volumes:
      - ${PWD}:/usr/src/app
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
    ports:
      - "8000:8000"
    depends_on:
      - redis

  fibonacci-worker:
    command: tools/start_celery.sh -Q fibonacci --concurrency=1
    <<: *worker-opts

  prime-worker:
    command: tools/start_celery.sh -Q prime --concurrency=1
    <<: *worker-opts

  redis:
    image: redis:latest
    container_name: redis
    ports:
      - "6379:6379"

我们还需要调整Python任务代码。以前,所有任务都被发送给处理所有工作的单个工人。现在,我们将工作分为单独的队列,我们​​需要指定哪些队列应处理任务。

from celery import shared_task

@shared_task(queue="fibonacci")
def fibonacci(n):
    if n <= 1:
        return n
    else:
        return fibonacci(n - 1) + fibonacci(n - 2)

@shared_task(queue="prime")
def nth_prime(n):
    count = 0
    num = 1

    while count < n:
        num += 1
        if num < 2:
            continue
        if any(num % i == 0 for i in range(2, int(num**0.5) + 1)):
            continue
        count += 1

    return num

与Django和芹菜一起部署

有很多方法可以将芹菜工人部署到生产环境中,但是在本文中,我将重点介绍一种相对简单的技术。在本文中,我们将通过GitHub动作和Ansible组合进行部署的过程。

Ansible是一个可靠且经过时间测试的框架,用于将配置应用于SSH的远程计算机。用户定义 playbooks 在远程计算机上配置应用程序和服务。 Ansible采用这些剧本,并检查目标机与所需的配置匹配,安装和更新使机器最新的必要内容。

您可能会问为什么不使用Docker并为生产服务器组成?这可能是一个合法的选择,但是像Ansible这样的工具使您可以通过多台计算机部署应用程序。使用Docker做同样的事情很可能意味着使用Kubernetes或Nomad等工具,这大大提高了复杂性水平。

由于Ansible通过SSH起作用,我们首先需要授予对目标机器的访问。

在真实的生产方案中,您可能需要使用具有已配置的特定可抗用户的虚拟机映像。出于测试目的,我只是使用根用户开始了数字海洋液滴。我们将在本地生成一个新的SSH密钥对,然后将公共密钥复制到目标计算机。私钥将在github上作为秘密。

ssh-keygen -f ansible-key
ssh-copy-id -i ansible-key.pub root@24.199.126.163

您当然应该将IP地址更改为您控制的机器的地址。 ssh-copy-id 程序假设您的本地用户可以使用有效的SSH键或密码登录。这里的目的是设置一个新生成的密钥对,只有Ansible才能使用。

现在,公共密钥已上传到目标机器,我们将在github上存储私钥,以便Ansible在操作管道内运行时可以登录。从上面的命令中,我们将密钥对命名为 ansible-key ansible-key.pub - 我们需要上传私人(即non .pub)键到github。

使用GitHub操作工作流程以触发Ansible

在潜入Ansible本身之前,我们需要一些东西来触发和执行剧本。 GitHub动作是为此目的的便捷工具。对于这个简单的教程,我们将在每次推动下运行Ansible Playbook to Main 分支。更健壮的设置可以使用分支名称来确定是部署到分期环境还是生产。

目前,这是一个简单的 decloy.yml ,可以启动Ansible Playbook。它还演示了如何将github的秘密传递到Ansible。

name: Deploy

on:
  push:
    branches:
      - main

jobs:
  update_celery:
    runs-on: ubuntu-latest
    environment: production
    container: willhallonline/ansible:alpine
    steps:
      - name: Checkout repository
        uses: actions/checkout@v2
        with:
          fetch-depth: 0

      - name: Set up SSH key
        working-directory: ansible
        env:
          ANSIBLE_SSH_PRIVATE_KEY: ${{ secrets.ANSIBLE_SSH_PRIVATE_KEY }}
        run: |
          echo "$ANSIBLE_SSH_PRIVATE_KEY" > id_rsa
          chmod 600 id_rsa

      - name: Run Ansible playbook
        working-directory: ansible
        run: ansible-playbook -i inventory.ini playbook.yml --tags "app,redis,celery" --extra-vars "redis_password=${{ secrets.REDIS_PASSWORD }}"

使用Systemd管理芹菜工人

在我们的Ansible Playbook中,我们将有一些任务负责设置芹菜工人并保持代码更新。我发现创建一个 Ansible 目录以存储所有这些配置很有用。

这是目录结构的概述,以了解与我们项目中的Ansible有关的内容。

ansible
├── ansible.cfg
├── inventory.ini
├── playbook.yml
└── roles
    ├── app
    │   ├── tasks
    │   │   └── main.yml
    │   └── templates
    │       ├── gunicorn.conf.py.j2
    │       ├── gunicorn-run-dir.service.j2
    │       └── gunicorn.service.j2
    ├── celery
    │   ├── tasks
    │   │   └── main.yml
    │   └── templates
    │       ├── celery-beat.service.j2
    │       └── celery.service.j2
    └── redis
        └── tasks
            └── main.yml

有两个主要任务,其中一项是进行了初始设置和Redis的安装,另一个是为了配置芹菜工人。由于这种方法仅将Docker用于本地开发,因此我们将使用 Systemd 服务来编排芹菜。如果您从未使用过 SystemD ,请放心,它不会太复杂。使用 SystemD 是确保在VM重新启动或工人死亡的情况下恢复芹菜的一种简单方法。

我们将为每个工人提供一个单独的 SystemD 服务,用于 celery-beat 实例,用于计划任务。我不会显示两个配置文件,因为它们非常相似,但是下面是芹菜工作人员的 Systemd 单元文件。

[Unit]
Description=Celery Worker {{ item.name }} for Multiple Queues App
After=network.target

[Service]
Environment="CELERY_BROKER_URL=redis://:{{ redis_password }}@localhost:6379/0"
Environment="PYTHONUNBUFFERED=1"
Type=simple
User=root
Group=root
WorkingDirectory={{ app_path }}
ExecStart={{ app_path }}/.venv/bin/celery -A multiple_queues_app worker -Q {{ item.name }} --loglevel=info
Restart=always
RestartSec=10
StandardOutput=syslog
StandardError=syslog
SyslogIdentifier=celery-{{ item.name }}

[Install]
WantedBy=multi-user.target

您会注意到该模板包含Jinja标签。这将使我们能够为每个单独的芹菜队列编写一个单元文件,在我们扩大队列数量时将凌乱的重复保持在最低限度。

在剧本级别上定义一个新的芹菜队列。

- hosts: production
  become: yes 
  vars:
    app_repo: git@github.com:zchtodd/multiple_queues_app.git
    app_path: /opt/multiple_queues_app
    venv_path: /opt/multiple_queues_app/.venv
    gunicorn_config_path: /etc/systemd/system/gunicorn.service
    run_dir_maker_config_path: /etc/systemd/system/gunicorn-run-dir.service
    celery_workers:
      - name: fibonacci

要创建一种新类型的队列,您可以将名称添加到 celery_workers list和Ansible将通过模板系统来处理配置文件。

脱离芹菜与Ansible

下一步是使用Ansible来创建芹菜工人服务。这个过程非常简单,只是涉及写出我们之前看到的 SystemD 配置文件。单独的Ansible角色处理克隆应用程序代码并启动Web服务器进程。您可以在project GitHub repository中找到所有细节。

- name: Create Celery logs directories
  ansible.builtin.file:
    path: "/var/log/celery/{{ item.name }}"
    state: directory
  loop: "{{ celery_workers }}"

- name: Create Celery Beat Systemd service file
  ansible.builtin.template:
    src: "{{ playbook_dir }}/roles/celery/templates/celery-beat.service.j2"
    dest: "/etc/systemd/system/celery-beat.service"
    mode: "0644"

- name: Create Celery Systemd service files
  ansible.builtin.template:
    src: "{{ playbook_dir }}/roles/celery/templates/celery.service.j2"
    dest: "/etc/systemd/system/celery-{{ item.name }}.service"
    mode: "0644"
  loop: "{{ celery_workers }}"

- name: Reload Systemd configuration
  ansible.builtin.systemd:
    daemon_reload: yes

- name: Enable and start Celery services
  ansible.builtin.systemd:
    name: "celery-{{ item.name }}"
    state: started
    enabled: yes
  loop: "{{ celery_workers }}"

- name: Enable and start Celery beat
  ansible.builtin.systemd:
    name: "celery-beat"
    state: started
    enabled: yes

- name: Restart Celery service
  ansible.builtin.systemd:
    name: "celery-{{ item.name }}"
    state: restarted
    daemon_reload: yes
  loop: "{{ celery_workers }}"

- name: Restart Celery beat
  ansible.builtin.systemd:
    name: "celery-beat"
    state: restarted
    daemon_reload: yes

一旦建立了芹菜工人,您就可以使用 Journal Systemctl 来监视和管理流程。

例如,要拖延斐波那契工人的日志,您将执行以下命令。

journalctl -u celery-fibonacci.service -f

Multiple芹菜队列示例应用程序

本教程并未显示所有代码,因此,如果您想查看完整的实现,example app is hosted on GitHub.

完整的应用程序应该是一个可以扩展的完整模板。

快乐黑客!