Django的工作队列和工人
#python #django #celery

本文最初是由Muhammed AliHoneybadger Developer Blog上撰写的。

构建Django应用程序时,将有一段时间您想与某些功能异步或自动运行一些功能。例如,您可能需要运行一个脚本,每次登录时都会向用户发送电子邮件。手动执行此操作是无效的。此外,也许您想每天执行计算。每天手动运行代码是没有意义的;如果您生病或太忙会发生什么?

您知道如何在使用Django应用程序时解决此类问题吗?如果没有,请坚持并遵循本文提供的教程。本文的存储库可以在GitHub上找到。

先决条件

要遵循本教程,需要以下内容:

  • 对Django的基本理解

什么是工作队列和工人?

在到达排队是什么之前,让我们首先了解工作是什么。作业是要在特定时间点执行的任务/功能/类设置。可以根据需要顺序运行多个作业;这些称为工作队列。

我们可以将工人视为一次可用于处理一项作业的服务器或处理器。如果我们有4名工人,则意味着可以一次处理4个工作/职能。工作过程从工作/任务队列中删除任务。

Django应用程序中的用例工作队列和工人

以下是一些用例,在构建Django应用程序时使用工作队列和工人是有意义的。

  1. 假设您已经设计了一个系统来关注传入的文件,例如潜在员工的réum -sumâs。在有人提交Râ©Sumâ©之后,该系统会提醒您组织的人力资源部门。这是工作队列和工人的很好的用例。
  2. 假设您正在建立一个电子商务网站。您将需要一个自动化系统,该系统在运输产品时向用户发送电子邮件。您可以设置一个检测处理程序何时指示产品的作业,然后将自动化的电子邮件发送给特定用户。没有设置工作队列,产品的处理程序将必须直接发送电子邮件,这是效率低下的。
  3. 开发人员可以使用工作队列和工人来设置自动警报并为即将到来的数据库漏洞建立动态解决方案。
  4. 价格跟踪器将需要一个系统,该系统每天(或您喜欢的任何时间表)获取数据,并向用户发送有关跟踪商品的消息。我们可以自动化此过程,并防止用户在想跟踪价格时必须积极运行代码。
  5. 卸载任务需要很长时间才能运行。

建立基础项目

在本节中,我们将设置一个带有view的简单Django应用程序,该应用程序将view中的其他操作从运行中延迟。使用此应用程序,我将说明工作队列和工人如何卸载view花费太长时间的view

通过卸载上述view,其他views可以正确运行而无需延长等待时间。

首先使用以下命令安装django:

$ pip install django

接下来,运行下面的命令开始一个新的Django项目:

$ django-admin startproject project .

然后,运行下面的命令以创建应用程序:

$ python3 manage.py startapp sleep_app

接下来,将您刚创建的应用程序添加到settings.py文件:

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'sleep_app' #new
]

现在,在sleep_app的views.py中,粘贴以下代码:

from django.http import JsonResponse
from time import sleep

def slow_view(request):
    data = {
        "name": "Muhammed Ali"
    }
    sleep(7)
    return JsonResponse(data)

粘贴以下代码以更新项目的urls.py文件以在浏览器中显示视图。

from django.contrib import admin
from django.urls import path
from sleep_app.views import slow_view

urlpatterns = [
    path('admin/', admin.site.urls),
    path("sleep/", slow_view),

]

现在,当您使用python manage.py runserver进行项目运行时,并在浏览器上打开http://127.0.0.1:8000/sleep/时,您会发现需要7分钟才能看到网站的内容。这表明应该卸载缓慢的过程,以便用户可以尽快访问站点内容。

如何为Django应用程序设置工作队列和工人

在本节中,您将学习如何使用Django Q和芹菜来设置工作队列和工人来卸载我们上面创建的慢速过程。

如何使用Django Q

Django Q是一种用于管理Django应用程序中的工作/任务队列和工人的应用程序。它通过使用Python多处理来做到这一点。

要设置作业队列,您将需要一个消息代理,该系统将发件人的正式消息协议(Django)转换为接收器的正式消息协议(Django Q)。我们将在本文中使用的消息经纪人是Redis,但是您可以选择others。 REDIS将获取有关要执行的功能的信息,并将其放入Django Q的队列中。

我们将使用Heroku提供的REDIS服务作为我们的消息经纪。

在Django项目中设置Heroku Redis

在此步骤中,请确保您已经在本地计算机中安装了Heroku CLI并在Heroku上创建了一个帐户。然后,转到您的终端,导航到项目的根,并运行以下命令。

git init
heroku create --region eu --addons=heroku-redis

第一个命令初始化了该项目的git仓库,以便我们的项目可以连接到Heroku,第二个命令使用REDIS附加组件在Heroku上创建了一个新应用。 --region标志指定您要创建应用程序的region

接下来,使用pip install django-q redis==3.5.3安装django q和redis,然后将"django_q"添加到django设置文件中的INSTALLED_APPS

然后,运行heroku config,Heroku将提示REDIS凭据。我们将稍后需要REDIS_URL

settings.py文件中,粘贴以下代码以初始化django q的设置。

Q_CLUSTER = {
    'name': 'myproject',
    'workers': 4, # number of workers. It Defaults to number of CPU of your local machine.
    'recycle': 500,
    'timeout': 60,
    'compress': True,
    'save_limit': 250,
    'queue_limit': 500,
    'cpu_affinity': 1,
    'label': 'Django Q',
    'redis': {
        'host': 'ec2-54-171-19-145.eu-west-1.compute.amazonaws.com', #Gotten from REDIS_URL
        'password': 'p78f7b20091d2710e27400dda07d097aff472f7d6a5947bd3232683d268fa4c0f',  #Gotten from REDIS_URL
        'port': 31449, #Gotten from REDIS_URL
        'db': 0, }
}

Redis URL for Q

使用您的REDIS_URL详细信息更新从REDIS_URL检索的代码部分。

注意:出于安全目的,在现实生活项目中,不要将所有凭据留在项目文件中。

您可以在documentation中了解有关其他配置设置的更多信息。

接下来,运行python manage.py migrate,以便django q可以应用其迁移。

将睡眠功能卸载到Django Q

要卸载sleep(7)功能,我们将需要导入async_task功能。它需要3个argument;

  1. 您要卸载的功能,在这种情况下为sleep()
  2. 您要传递到卸载功能的arguments,在这种情况下为10
  3. 您要在工作队列中执行作业后要运行的功能为hook();此函数以task作为参数。

将下面的代码粘贴到您的views.py文件中。

from django.http import JsonResponse
from django_q.tasks import async_task

def Slow_view(request):
    data = {
        "name": "Muhammed Ali"
    }
    async_task("sleep_app.q_services.py.sleepy_func", 7, hook="sleep_app.q_services.py.hook_funcs")
    return JsonResponse(data)

接下来,在sleep_app文件夹中,创建一个名为q_services.py的新文件(文件名是任意),该文件将包含async_task()所要求的所有功能。

q_services.py中,粘贴以下代码。

from time import sleep

def hook_funcs(task):
    print("The task result is: ", task.result)

def sleepy_func(sec):
    # This function will be taken to the job queue
    sleep(sec)
    print ("sleepy function ran")

接下来,请确保您的项目正在运行,然后在另一个终端运行python manage.py qcluster,以将Django Q cluster旋转用于您的应用程序。

打开[http://127.0.0.1:8000/sleep](http://127.0.0.1:8000/sleep),您会看到页面立即出现,表明django q。

正在处理sleep(7)

在django服务器上,您会看到一项工作已被晋升并正在由工人处理。

Django enqueue

然后,在7秒后,在Django Q群集上,您应该看到以下图像,显示作业已完成。

Django Q cluster

如何使用芹菜

Celery是一个分布式任务队列,可让您将作业发送到队列以执行。芹菜为注意在作业队列执行的新任务提供专门的工人。

像django Q一样,我们还需要一个消息代理才能正常工作。我们将需要在Heroku上创建另一个应用程序并生成新的凭据。

为了说明这一点,我将像以前一样创建另一个sleep_app

$ python3 manage.py startapp sleep_app2

接下来,将"sleep_app2"添加到INSTALLED_APPS

然后,在sleep_app2views.py中,粘贴以下代码:

from django.http import JsonResponse
from time import sleep

def slow_view2(request):
    data = {
        "name": "Muhammed Ali"
    }
    sleep(7)
    return JsonResponse(data)

粘贴以下代码以更新项目的urls.py文件以在浏览器中显示视图。

...
from sleep_app.views2 import slow_view2 #new

urlpatterns = [
...
    path("sleep2/", slow_view2), #new
]

首先,通过运行pip install celery安装芹菜。

接下来,在您的项目文件夹中,创建一个带有名称celery.py的文件并粘贴以下代码:

import os

from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project.settings') #path to your settings file

app = Celery('slow_view2')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django apps.
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

然后,在project/__init__.py中,粘贴以下代码,以导入我们刚刚创建为django的芹菜应用程序。

from .celery import app as celery_app

__all__ = ('celery_app',)

通过运行heroku create --region eu --addons=heroku-redis <name-of-project>在Heroku中创建另一个项目,然后运行heroku config -a <name-of-project>以获取您的REDIS_URL

Image of Redis URL

接下来,转到您的settings.py文件并将以下代码粘贴到文件的按钮上。

CELERY_BROKER_URL = 'redis://:p056c31e91f51f95aff5f976758425ff651afe815e3f8f14fd00dfca932b679bb@ec2-34-252-112-18.eu-west-1.compute.amazonaws.com:24059' #the REDIS_URL you just got

CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERRIALIZER = 'json'

sleep_app2文件夹中,创建一个带有名称celery_services.py(文件名是任意)并粘贴以下代码的新文件。

from celery import shared_task #used to create task 
from time import sleep

@shared_task
def sleepy_func(sec):
    # This function will be taken to the job queue
    sleep(sec)

接下来,使用以下代码更新views.py

from django.http import JsonResponse
from .celery_services import sleepy_func

def slow_view2(request):
    data = {
        "name": "Muhammed Ali"
    }
    sleepy_func.delay(7)
    return JsonResponse(data)

然后,运行您的django服务器,在另一个终端窗口上,使用celery -A project worker -l info运行芹菜服务器。

打开sleep_app2的URL,您会发现页面出现更快。最后,在7秒后转到芹菜服务器,您会发现我们的工作已成功执行。

Celery Server

生产的工作队列

工作队列在生产工作时的工作方式有所不同。这是因为根据您用于部署的平台,在云上运行作业队列服务器的一些额外设置和配置。

如果您试图用芹菜部署Django项目到Heroku,Heroku为此提供了solution

除了上面的选项外,一些云服务提供商提供服务服务的服务。例如,AWS具有AWS Batch,Heroku具有Heroku Scheduler。您可以在云上工作时研究这些。

结论

本文解释了工作队列和工人的含义以及他们的工作方式和功能。我们还构建了一个简单的应用程序,该应用程序延迟了网页的显示,以提供一个实用的示例,说明了工作队列的工作方式。

我们使用Django Q和芹菜从视图中删除缓慢的过程,因此它可以在背景中工作并不能放慢视图。

希望有了这些信息,您将开始考虑这些技术以使您的项目尽可能快地运行。