芹菜是基于分布式消息传递的异步任务队列系统。它允许开发人员同时执行背景作业,与主要应用程序流分开,从而确保诸如发送电子邮件,生成报告或处理数据之类的任务不会阻止或延迟用户的请求。
通过将这些重量级任务卸载到芹菜中,诸如Django之类的Web框架可以快速响应用户并允许该任务在后台运行,从而确保用户不在等待。
使用单个队列可能足以满足较小的应用程序,随着项目的增长和任务的变化,您可能会发现需要优先考虑某些任务或在专业工人上运行它们。在本文中,我们将研究如何在本地开发和部署到生产时与Django一起使用多个队列。
- 1. Integrating Celery with Django
- 2. Local Development with Django and Celery
- 3. Production Deployment with Django and Celery
- 4. Multiple Celery Queues Example App
集成芹菜与django
我们首先需要设置我们的Django项目以与芹菜合作,然后再添加多个队列。
将芹菜添加到Django的第一步是确保安装了必要的依赖项。我们可以在命令行上安装它们,但是我们将使用 sumplions.txt 文件使过程可重复。
Django==4.1.7
celery==5.2.7
redis==4.5.5
gunicorn==20.1.0
现在,在包含您 settings.py 文件的同一目录中创建一个名为 celery.py 的文件。该文件将充当每个芹菜工人的入口点。配置设置在此处加载,并告知工人在哪里找到任务定义。 celery.py 文件也可以选择包含计划的任务配置。
import os
from datetime import timedelta
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myapp.settings")
app = Celery("myapp")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
app.conf.beat_schedule = {
"run-every-10-seconds": {
"task": "myapp.tasks.foo",
"schedule": timedelta(seconds=10),
},
"run-every-1800-seconds": {
"task": "myapp.tasks.bar",
"schedule": timedelta(seconds=1800),
},
}
最后,我们需要对设置进行少量更新。代理URL将包含一个指向REDIS实例的连接字符串。此REDIS实例将负责管理包含待处理任务数据的队列。
CELERY_BROKER_URL = os.environ.get("CELERY_BROKER_URL")
使用环境变量将允许本地和生产部署无需更改代码。
随着最后的调整,我们的Django项目几乎应该准备与芹菜一起工作。接下来,我们将查看如何容忍此设置并使用Docker在本地运行。
local开发与Django和芹菜
我们可以使用Docker并共同组成,以使Django和芹菜更加容易地使本地开发。此方法还使与其他开发人员在项目上进行协作变得更加容易。
第一步是将DJANGO和芹菜工作过程集合化。为此,我们将编写一个 dockerfile ,其中包含具有必要依赖性图像的说明。
FROM python:3.9-slim-buster
ENV PYTHONUNBUFFERED 1
ENV PYTHONDONTWRITEBYTECODE 1
RUN apt-get update && apt-get install -y \
libpq-dev \
&& apt-get -y clean
WORKDIR /usr/src/app
COPY ./requirements.txt /usr/src/app
RUN pip install --upgrade pip && pip install -r requirements.txt
COPY . /usr/src/app/
如果您熟悉 dockerfile 格式,则可能会注意到最后没有 cmd 参数。这是有意的,因为我们将使用 docker-compose 稍后指定命令。这将允许一个 dockerfile 为django和芹菜容器都起作用。
使用Docker组成以管理容器
这是一个简单的 docker-compose.yml 文件,它将配置Django Dev Server,Celery Worker和Redis实例。我们在这里使用开发服务器,因为我们想利用热重加载功能。生产部署将使用枪支为应用程序服务。
version: '3.8'
services:
web:
build:
context: .
dockerfile: Dockerfile
container_name: django_dev
command: python manage.py runserver 0.0.0.0:8000
volumes:
- ${PWD}:/usr/src/app
environment:
- CELERY_BROKER_URL=redis://redis:6379/0
ports:
- "8000:8000"
depends_on:
- redis
celery:
build:
context: .
dockerfile: Dockerfile
container_name: celery_worker
command: celery -A myapp worker --loglevel=info
volumes:
- ${PWD}:/usr/src/app
environment:
- CELERY_BROKER_URL=redis://redis:6379/0
depends_on:
- redis
redis:
image: redis:latest
container_name: redis
ports:
- "6379:6379"
启动和运行此容器集合只需要一个命令,该命令还将构建图像并下载任何远程图像,例如Redis容器。
docker-compose up -d
要完成一个简单的芹菜任务
现在我们已经运行所有容器,我们应该花一点时间来实现概念验证任务。
我们将向 protem_queues_app/目录添加一个名为 tasks.py 的新文件,并实现一个简单的异步任务。该任务将计算fibonacci编号,尽管以非常效率的递归方式。
from celery import shared_task
@shared_task
def fibonacci(n):
if n <= 1:
return n
else:
return fibonacci(n - 1) + fibonacci(n - 2)
现在,我们需要一种观点,可以调用以调用此任务并在后台进行处理。在 proture_queues_app/views.py 文件中添加以下代码。
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from .tasks import fibonacci
@csrf_exempt
def calculate_fibonacci(request):
if request.method == "POST":
try:
N = int(request.POST.get("N"))
result = fibonacci.delay(N)
return JsonResponse(
{"task_id": result.task_id, "status": "Task submitted successfully!"},
status=200,
)
except ValueError:
return JsonResponse({"error": "Invalid N value provided."}, status=400)
else:
return JsonResponse({"error": "Only POST method is allowed."}, status=405)
当然,我们几乎在那里,但是我们需要在应用程序中添加 urls.py ,并更新主要项目 urls.py 此观点将作出回应。
from django.urls import path
from . import views
urlpatterns = [
path("fibonacci/", views.calculate_fibonacci, name="calculate_fibonacci"),
]
from django.contrib import admin
from django.urls import include, path
urlpatterns = [
path("admin/", admin.site.urls),
path("", include("multiple_queues_app.urls")),
]
有了视图和任务,我们现在可以尝试所有这些。第一个问题a docker-compose重新启动,以便更新芹菜工人代码。现在,我们可以使用我们选择的任何工具来测试视图,但是我将使用卷曲来提出请求。
curl -X POST -d "N=25" http://localhost:8000/fibonacci/
观看芹菜工人的日志,我们应该看到该任务被重新加入,并需要3秒钟才能执行。
使用芹菜工人热装的看门狗
您可能已经注意到,我们必须在尝试新任务之前手动重新启动芹菜工人。手动重新启动工人将很快变得笨拙。理想情况下,我们可以具有类似于Django Dev服务器中的热重载的功能。
幸运的是,看门狗实用程序提供了一种简单的方法,可以在目录更改时重新启动过程。让我们修改 docker-compose.yml 文件以使用 watchMedo watchdog 实用程序的脚本,在芹菜的命令部分中。
celery:
build:
context: .
dockerfile: Dockerfile
container_name: celery_worker
command: watchmedo auto-restart --directory=multiple_queues_app --pattern=*.py --recursive -- celery -A myapp worker --loglevel=info
volumes:
- ${PWD}:/usr/src/app
environment:
- CELERY_BROKER_URL=redis://redis:6379/0
depends_on:
- redis
但是,在更新容器之前,我们首先需要添加一个 insuert-dev.txt 文件并修改 dockerfile 以从中安装。
FROM python:3.9-slim-buster
ENV PYTHONUNBUFFERED 1
ENV PYTHONDONTWRITEBYTECODE 1
RUN apt-get update && apt-get install -y \
libpq-dev \
&& apt-get -y clean
WORKDIR /usr/src/app
COPY ./requirements.txt /usr/src/app
COPY ./requirements-dev.txt /usr/src/app
RUN pip install --upgrade pip && pip install -r requirements.txt && pip install -r requirements-dev.txt
COPY . /usr/src/app/
通过重建和重新启动容器,您现在应该能够修改Python文件,并自动查看芹菜工人。
在本地开发中添加多个队列
如何更新此本地开发设置以包括多个芹菜队列?
该过程就像在 docker-compose.yml 文件中添加另一个服务一样简单。但是,这里的问题在于,您的撰写文件可能会开始长时间重复。我们可以使用一些模板魔术来保持干燥。
,我们可以不断地复制整个芹菜服务块。以下是更新的 docker-compose.yml ,添加了一个新的芹菜工人。
version: '3.8'
x-worker-opts: &worker-opts
build:
context: .
dockerfile: Dockerfile
volumes:
- ${PWD}:/usr/src/app
environment:
- CELERY_BROKER_URL=redis://redis:6379/0
depends_on:
- redis
services:
web:
build:
context: .
dockerfile: Dockerfile
container_name: django_dev
command: python manage.py runserver 0.0.0.0:8000
volumes:
- ${PWD}:/usr/src/app
environment:
- CELERY_BROKER_URL=redis://redis:6379/0
ports:
- "8000:8000"
depends_on:
- redis
fibonacci-worker:
command: tools/start_celery.sh -Q fibonacci --concurrency=1
<<: *worker-opts
prime-worker:
command: tools/start_celery.sh -Q prime --concurrency=1
<<: *worker-opts
redis:
image: redis:latest
container_name: redis
ports:
- "6379:6379"
我们还需要调整Python任务代码。以前,所有任务都被发送给处理所有工作的单个工人。现在,我们将工作分为单独的队列,我们需要指定哪些队列应处理任务。
from celery import shared_task
@shared_task(queue="fibonacci")
def fibonacci(n):
if n <= 1:
return n
else:
return fibonacci(n - 1) + fibonacci(n - 2)
@shared_task(queue="prime")
def nth_prime(n):
count = 0
num = 1
while count < n:
num += 1
if num < 2:
continue
if any(num % i == 0 for i in range(2, int(num**0.5) + 1)):
continue
count += 1
return num
与Django和芹菜一起部署
有很多方法可以将芹菜工人部署到生产环境中,但是在本文中,我将重点介绍一种相对简单的技术。在本文中,我们将通过GitHub动作和Ansible组合进行部署的过程。
Ansible是一个可靠且经过时间测试的框架,用于将配置应用于SSH的远程计算机。用户定义 playbooks 在远程计算机上配置应用程序和服务。 Ansible采用这些剧本,并检查目标机与所需的配置匹配,安装和更新使机器最新的必要内容。
您可能会问为什么不使用Docker并为生产服务器组成?这可能是一个合法的选择,但是像Ansible这样的工具使您可以通过多台计算机部署应用程序。使用Docker做同样的事情很可能意味着使用Kubernetes或Nomad等工具,这大大提高了复杂性水平。
由于Ansible通过SSH起作用,我们首先需要授予对目标机器的访问。
在真实的生产方案中,您可能需要使用具有已配置的特定可抗用户的虚拟机映像。出于测试目的,我只是使用根用户开始了数字海洋液滴。我们将在本地生成一个新的SSH密钥对,然后将公共密钥复制到目标计算机。私钥将在github上作为秘密。
ssh-keygen -f ansible-key
ssh-copy-id -i ansible-key.pub root@24.199.126.163
您当然应该将IP地址更改为您控制的机器的地址。 ssh-copy-id 程序假设您的本地用户可以使用有效的SSH键或密码登录。这里的目的是设置一个新生成的密钥对,只有Ansible才能使用。
现在,公共密钥已上传到目标机器,我们将在github上存储私钥,以便Ansible在操作管道内运行时可以登录。从上面的命令中,我们将密钥对命名为 ansible-key 和 ansible-key.pub - 我们需要上传私人(即non .pub)键到github。
使用GitHub操作工作流程以触发Ansible
在潜入Ansible本身之前,我们需要一些东西来触发和执行剧本。 GitHub动作是为此目的的便捷工具。对于这个简单的教程,我们将在每次推动下运行Ansible Playbook to Main 分支。更健壮的设置可以使用分支名称来确定是部署到分期环境还是生产。
目前,这是一个简单的 decloy.yml ,可以启动Ansible Playbook。它还演示了如何将github的秘密传递到Ansible。
name: Deploy
on:
push:
branches:
- main
jobs:
update_celery:
runs-on: ubuntu-latest
environment: production
container: willhallonline/ansible:alpine
steps:
- name: Checkout repository
uses: actions/checkout@v2
with:
fetch-depth: 0
- name: Set up SSH key
working-directory: ansible
env:
ANSIBLE_SSH_PRIVATE_KEY: ${{ secrets.ANSIBLE_SSH_PRIVATE_KEY }}
run: |
echo "$ANSIBLE_SSH_PRIVATE_KEY" > id_rsa
chmod 600 id_rsa
- name: Run Ansible playbook
working-directory: ansible
run: ansible-playbook -i inventory.ini playbook.yml --tags "app,redis,celery" --extra-vars "redis_password=${{ secrets.REDIS_PASSWORD }}"
使用Systemd管理芹菜工人
在我们的Ansible Playbook中,我们将有一些任务负责设置芹菜工人并保持代码更新。我发现创建一个 Ansible 目录以存储所有这些配置很有用。
这是目录结构的概述,以了解与我们项目中的Ansible有关的内容。
ansible
├── ansible.cfg
├── inventory.ini
├── playbook.yml
└── roles
├── app
│ ├── tasks
│ │ └── main.yml
│ └── templates
│ ├── gunicorn.conf.py.j2
│ ├── gunicorn-run-dir.service.j2
│ └── gunicorn.service.j2
├── celery
│ ├── tasks
│ │ └── main.yml
│ └── templates
│ ├── celery-beat.service.j2
│ └── celery.service.j2
└── redis
└── tasks
└── main.yml
有两个主要任务,其中一项是进行了初始设置和Redis的安装,另一个是为了配置芹菜工人。由于这种方法仅将Docker用于本地开发,因此我们将使用 Systemd 服务来编排芹菜。如果您从未使用过 SystemD ,请放心,它不会太复杂。使用 SystemD 是确保在VM重新启动或工人死亡的情况下恢复芹菜的一种简单方法。
我们将为每个工人提供一个单独的 SystemD 服务,用于 celery-beat 实例,用于计划任务。我不会显示两个配置文件,因为它们非常相似,但是下面是芹菜工作人员的 Systemd 单元文件。
[Unit]
Description=Celery Worker {{ item.name }} for Multiple Queues App
After=network.target
[Service]
Environment="CELERY_BROKER_URL=redis://:{{ redis_password }}@localhost:6379/0"
Environment="PYTHONUNBUFFERED=1"
Type=simple
User=root
Group=root
WorkingDirectory={{ app_path }}
ExecStart={{ app_path }}/.venv/bin/celery -A multiple_queues_app worker -Q {{ item.name }} --loglevel=info
Restart=always
RestartSec=10
StandardOutput=syslog
StandardError=syslog
SyslogIdentifier=celery-{{ item.name }}
[Install]
WantedBy=multi-user.target
您会注意到该模板包含Jinja标签。这将使我们能够为每个单独的芹菜队列编写一个单元文件,在我们扩大队列数量时将凌乱的重复保持在最低限度。
在剧本级别上定义一个新的芹菜队列。
- hosts: production
become: yes
vars:
app_repo: git@github.com:zchtodd/multiple_queues_app.git
app_path: /opt/multiple_queues_app
venv_path: /opt/multiple_queues_app/.venv
gunicorn_config_path: /etc/systemd/system/gunicorn.service
run_dir_maker_config_path: /etc/systemd/system/gunicorn-run-dir.service
celery_workers:
- name: fibonacci
要创建一种新类型的队列,您可以将名称添加到 celery_workers list和Ansible将通过模板系统来处理配置文件。
脱离芹菜与Ansible
下一步是使用Ansible来创建芹菜工人服务。这个过程非常简单,只是涉及写出我们之前看到的 SystemD 配置文件。单独的Ansible角色处理克隆应用程序代码并启动Web服务器进程。您可以在project GitHub repository中找到所有细节。
- name: Create Celery logs directories
ansible.builtin.file:
path: "/var/log/celery/{{ item.name }}"
state: directory
loop: "{{ celery_workers }}"
- name: Create Celery Beat Systemd service file
ansible.builtin.template:
src: "{{ playbook_dir }}/roles/celery/templates/celery-beat.service.j2"
dest: "/etc/systemd/system/celery-beat.service"
mode: "0644"
- name: Create Celery Systemd service files
ansible.builtin.template:
src: "{{ playbook_dir }}/roles/celery/templates/celery.service.j2"
dest: "/etc/systemd/system/celery-{{ item.name }}.service"
mode: "0644"
loop: "{{ celery_workers }}"
- name: Reload Systemd configuration
ansible.builtin.systemd:
daemon_reload: yes
- name: Enable and start Celery services
ansible.builtin.systemd:
name: "celery-{{ item.name }}"
state: started
enabled: yes
loop: "{{ celery_workers }}"
- name: Enable and start Celery beat
ansible.builtin.systemd:
name: "celery-beat"
state: started
enabled: yes
- name: Restart Celery service
ansible.builtin.systemd:
name: "celery-{{ item.name }}"
state: restarted
daemon_reload: yes
loop: "{{ celery_workers }}"
- name: Restart Celery beat
ansible.builtin.systemd:
name: "celery-beat"
state: restarted
daemon_reload: yes
一旦建立了芹菜工人,您就可以使用 Journal 和 Systemctl 来监视和管理流程。
例如,要拖延斐波那契工人的日志,您将执行以下命令。
journalctl -u celery-fibonacci.service -f
Multiple芹菜队列示例应用程序
本教程并未显示所有代码,因此,如果您想查看完整的实现,example app is hosted on GitHub.
完整的应用程序应该是一个可以扩展的完整模板。
快乐黑客!