使用Django,芹菜和RabbitMQ了解任务队列
#python #django #celery #rabbitmq

概括

  • 介绍£或任务队列
  • 了解什么是芹菜和兔子
  • django的案例研究
  • 使用DRF创建API,而无需任务队列
  • Usando task queue na API
  • 参考

任务队列简介

当我们创建项目时,某些任务需要一些(或太多)进行处理,一个例子是一生的报告和对某些API的访问,而某些API可能会因航空河流而下降,例如当访问政府的API以检查某人的CPF时,以验证承载者是否有任何法律问题。通过这种方式,用户必须不等待完成某些任务,以从发生或可能发生的事情中获得反馈。因此,任务队列或也称为任务行,是推迟某些任务的一种方式,因此这些任务将以异步的方式进行,以相对于应用程序的流量£O。

来完成。

taskqueue

在上面的图像中,我们看到了这个概念的工作原理,因此有一个生产者(生产者)创建要处理的任务,这些任务存储在经纪人的队列中,后来将这些tassks在消费者中消费(消费者)要处理的是,他们可以或不对数据库进行呼叫。

了解什么是芹菜和兔子

o 芹菜 是一种python应用>,这依赖于A 消息代理的存在 工作。芹菜消费者被称为 工人 ,他们一直在处理他们总是通过新 任务向经纪人提出要求 被处理。 兔子 是支持的 Brokers ,其目的是存储某些工作将被工作所消费的任务。与芹菜一起,可以创建一个优先级的最后一行,以便将某些任务处理在他人面前。生产商和经纪人之间的通信协议称为AMQP。

Django的案例研究

更好地了解芹菜任务队列如何用于问题,请想象我们有一个管理服装店的应用程序,我们可以在其中管理销售,我们拥有的终点之一就是在XLSX中生成报告(Excel )我们的销售格式。因此我们可以拥有以下图表实体关系:

models
总而言之,在上面的图表实体关系中,我们有一个推销员进行大量销售和销售,有很多衣服。

使用无任务队列的DRF创建API

依赖

首先,创建一个虚拟环境以安装应用程序的依赖性。在虚拟环境中安装以下依赖性:

pip install django djangorestframework model_bakery openpyxl

我们安装了 django django rest框架 使我们的应用程序的创建£o;我们还安装了 型号面包店 openpyxml ,第一个为我们进行某些假货,以便我们进行我们的测试,第二个创建我们的报告。

在虚拟环境文件夹中键入命令以创建一个项目和我们的应用程序:

django-admin startproject store_app .
django-admin startapp core

楷模

让我们映射模型的图表

from django.db import models
from django.utils import timezone
from django.core.exceptions import ValidationError
# Create your models here.

class Seller(models.Model):
    name = models.CharField(max_length=30)

    def __str__(self) -> str:
        return self.name

class Sale(models.Model):
    total = models.FloatField(default=0)
    created_at = models.DateTimeField(default=timezone.now)

    saller = models.ForeignKey(Seller, on_delete=models.DO_NOTHING)

SIZE_CLOTHES = (
    ('p','P'),
    ('m','M'),
    ('g','G')
)

class Clothe(models.Model):
    name = models.CharField(max_length=50)
    size = models.CharField(max_length=1, choices=SIZE_CLOTHES)
    amount = models.PositiveIntegerField()

    def __str__(self) -> str:
        return f"{self.name}({self.size})"

class ClotheItem(models.Model):
    price_unit = models.FloatField()
    amount_items = models.PositiveIntegerField()

    sale = models.ForeignKey(Sale, on_delete=models.CASCADE, related_name="items", null=False)
    clothe = models.ForeignKey(Clothe, on_delete=models.CASCADE, null=False)
    def clean(self) -> None:
        if self.amount_items < self.clothe.amount:
            raise ValidationError("Amount Item more than Amount Clothes ")
        return super(ClotheItem, self).clean()

    def __str__(self) -> str:
        return f"{self.clothe}"

创建迁移和银行:

python manage.py makemigrations
python manage.py migrate

任务

我们的目标是创建一条路由,该路由生成有关我们商店销售的报告,该报告可以由Excel读取,我们将在Applied Application 任务中创建一个文件。PY < < /strong>包含一个有趣的功能

from openpyxl import Workbook
from core.models import Sale

def create_report_of_sale():
    wb = Workbook()
    ws = wb.active
    ws.append(["ID", "Criado Em", "Total","Vendedor","Roupas", "Quantidade de Roupas"])

    sales = Sale.objects.prefetch_related("items")

    for sale in sales:
        date = sale.created_at.strftime('%Y-%m-%d')
        items = []
        for item in sale.items.all():
            items.append(item.clothe.name)

        ws.append([sale.pk,date,sale.total, sale.saller.name, str(items), len(items)])

    wb.save("store_sales.xlsx")

以上仅创建一个包含一些销售数据的表的store_sales.xlsx文件。

URL

from django.contrib import admin
from django.urls import path
from core.views import CreateReportToSale

urlpatterns = [
    path('admin/', admin.site.urls),
    path('report/sales/', CreateReportToSale.as_view()),
]

视图

让我们创建以管理路线的视图,添加以下内容:

from rest_framework.views import APIView
from core.tasks import create_report_of_sale
from rest_framework.response import Response
import time
# Create your views here.
class CreateReportToSale(APIView):

    def get(self, request):
        tb = time.time()
        create_report_of_sale()
        ta = time.time()
        print(f"Tempo Processando:{ta-tb}s")
        return Response(data={"msg":f"Relatório criado com sucesso !"},status=200)

上面的创建创建了一种视图,可以使用所有HTTP获取并创建我们的报告来管理路线。我们添加了一个打印,以知道花时间。

实现它

对于我们生成报告,我们需要数据,让我们使用库 model_bakery。

python manage.py shell

内部执行以下命令:

>>> from model_bakery import baker
>>> baker.make("core.ClotheItem", _quantity=3000)

以上将创建3000件衣服,3000件衣服和3000个销售,因为我们的型号 clotheitem 与这些型号有关,我们有 null = false。 代码用于模拟3000销售的创建,每件衣服。

然后输入报告的创建路线,并运行项目,让我们看看有一个延迟要处理,我们还可以看到我们去终端时花费时间的时间。

使用任务队列NA API

依赖

这是一个时间需要花费时间来处理的示例,我们还有其他示例,例如访问API的访问权限,发送电子邮件等。因此,要解决我们的问题,我们将使用 任务队列 。首先让我们安装芹菜库。

pip install celery

后来,我们必须使用以下Docker命令来攀登我们的消息经纪(RabbitMQ):

docker run -d -p 5672:5672 rabbitmq

芹菜

有了这两个依赖,我们可以继续进行项目。首先创建一个称为 celery.py 的文件

from celery import Celery
import django
import os

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'store_app.settings')
django.setup()
app = Celery("store_app", broker='pyamqp://guest@localhost//')

app.autodiscover_tasks()

以上是使用settings.py文件的配置AMQP协议。

以下行是为了使所有放置在settings.py上的应用程序都可以使用此实例。

我们应该做的另一种配置是在项目的初始。pyâ,以便在Django初始化项目时初始化芹菜的实例,因此我们的应用程序可以从函数中访问 shared_task

from celery import app

__all__ = ('app',)

任务

tasks.py file 中,我们可以修改以下方式:

from openpyxl import Workbook
from core.models import Sale
from celery import shared_task #<-new
@shared_task(
        bind=True,
        max_retries=5,
        default_retry_delay=30) #<-new
def create_report_of_sale(self): #<-new
    try: #<-new
        wb = Workbook()
        ws = wb.active
        ws.append(["ID", "Criado Em", "Total","Vendedor","Roupas", "Quantidade de Roupas"])

        sales = Sale.objects.prefetch_related("items")

        for sale in sales:
            date = sale.created_at.strftime('%Y-%m-%d')
            items = []
            for item in sale.items.all():
                items.append(item.clothe.name)

            ws.append([sale.pk,date,sale.total, sale.saller.name, str(items), len(items)])

        wb.save("sample.xlsx")
    except: #<-new
        raise self.retry() #<-new

在上面包含的我们正在使用Decorator shared_task, 使用此装饰器我们正在创建一个可以通过任何代码调用的新任务,我们仍在给他一些论点,例如:

  • bind - 使我们能够访问特定任务的某些属性,例如请求对象或重试函数,这使我们能够如果某事走了,可以再次执行任务错误;
  • max_retries - 添加了许多动议的会计师试图回到重试;
  • default_retry_dely_dely - 一个呼叫重试和另一个呼叫之间的时间。 ###查看

在我们看来,我们可以修改以下方式:

from rest_framework.views import APIView
from core.tasks import create_report_of_sale
from rest_framework.response import Response
import time
# Create your views here.
class CreateReportToSale(APIView):

    def get(self, request):
        tb = time.time()
        create_report_of_sale.delay() #<-new
        ta = time.time()
        print(f"Tempo de Processamento:{ta-tb}")
        return Response(data={"msg":f"Relatório criado com sucesso !"},status=200)

有了对母亲的简单呼吁,可能会发生,但为此,创建我们的任务消费者很重要,输入另一个终端并添加以下内容:

celery -A store_app worker --loglevel=INFO

注意:请记住,我们的过程工作没有热行,即每次修改任务运行以上时。

输入您的项目并访问我们报告的创建路线。正如我们可以看到的那样,事情发生了更加勇敢,因为当我们访问路线时,我们创建了一个添加到经纪人并被可展示的工人消费的任务,而将任务添加到经纪人的时间比处理要多得多。它。

repositã³rio:
Código Desenvolvido

反思

https://docs.celeryq.dev/en/stable/index.html

https://denibertovic.com/posts/celery-best-practices/

https://www.django-rest-framework.org

https://docs.djangoproject.com/en/4.1/