概括
- 介绍£或任务队列
- 了解什么是芹菜和兔子
- django的案例研究
- 使用DRF创建API,而无需任务队列
- Usando task queue na API
- 参考
任务队列简介
当我们创建项目时,某些任务需要一些(或太多)进行处理,一个例子是一生的报告和对某些API的访问,而某些API可能会因航空河流而下降,例如当访问政府的API以检查某人的CPF时,以验证承载者是否有任何法律问题。通过这种方式,用户必须不等待完成某些任务,以从发生或可能发生的事情中获得反馈。因此,任务队列或也称为任务行,是推迟某些任务的一种方式,因此这些任务将以异步的方式进行,以相对于应用程序的流量£O。
来完成。在上面的图像中,我们看到了这个概念的工作原理,因此有一个生产者(生产者)创建要处理的任务,这些任务存储在经纪人的队列中,后来将这些tassks在消费者中消费(消费者)要处理的是,他们可以或不对数据库进行呼叫。
了解什么是芹菜和兔子
o 芹菜 是一种python应用>,这依赖于A 消息代理的存在 工作。芹菜消费者被称为 工人 ,他们一直在处理他们总是通过新 任务向经纪人提出要求 被处理。 兔子 是支持的 Brokers ,其目的是存储某些工作将被工作所消费的任务。与芹菜一起,可以创建一个优先级的最后一行,以便将某些任务处理在他人面前。生产商和经纪人之间的通信协议称为AMQP。
Django的案例研究
更好地了解芹菜任务队列如何用于问题,请想象我们有一个管理服装店的应用程序,我们可以在其中管理销售,我们拥有的终点之一就是在XLSX中生成报告(Excel )我们的销售格式。因此我们可以拥有以下图表实体关系:
总而言之,在上面的图表实体关系中,我们有一个推销员进行大量销售和销售,有很多衣服。
使用无任务队列的DRF创建API
依赖
首先,创建一个虚拟环境以安装应用程序的依赖性。在虚拟环境中安装以下依赖性:
pip install django djangorestframework model_bakery openpyxl
我们安装了 django 和 django rest框架 使我们的应用程序的创建£o;我们还安装了
在虚拟环境文件夹中键入命令以创建一个项目和我们的应用程序:
django-admin startproject store_app .
django-admin startapp core
楷模
让我们映射模型的图表
from django.db import models
from django.utils import timezone
from django.core.exceptions import ValidationError
# Create your models here.
class Seller(models.Model):
name = models.CharField(max_length=30)
def __str__(self) -> str:
return self.name
class Sale(models.Model):
total = models.FloatField(default=0)
created_at = models.DateTimeField(default=timezone.now)
saller = models.ForeignKey(Seller, on_delete=models.DO_NOTHING)
SIZE_CLOTHES = (
('p','P'),
('m','M'),
('g','G')
)
class Clothe(models.Model):
name = models.CharField(max_length=50)
size = models.CharField(max_length=1, choices=SIZE_CLOTHES)
amount = models.PositiveIntegerField()
def __str__(self) -> str:
return f"{self.name}({self.size})"
class ClotheItem(models.Model):
price_unit = models.FloatField()
amount_items = models.PositiveIntegerField()
sale = models.ForeignKey(Sale, on_delete=models.CASCADE, related_name="items", null=False)
clothe = models.ForeignKey(Clothe, on_delete=models.CASCADE, null=False)
def clean(self) -> None:
if self.amount_items < self.clothe.amount:
raise ValidationError("Amount Item more than Amount Clothes ")
return super(ClotheItem, self).clean()
def __str__(self) -> str:
return f"{self.clothe}"
创建迁移和银行:
python manage.py makemigrations
python manage.py migrate
任务
我们的目标是创建一条路由,该路由生成有关我们商店销售的报告,该报告可以由Excel读取,我们将在Applied Application 任务中创建一个文件。PY < < /strong>包含一个有趣的功能
from openpyxl import Workbook
from core.models import Sale
def create_report_of_sale():
wb = Workbook()
ws = wb.active
ws.append(["ID", "Criado Em", "Total","Vendedor","Roupas", "Quantidade de Roupas"])
sales = Sale.objects.prefetch_related("items")
for sale in sales:
date = sale.created_at.strftime('%Y-%m-%d')
items = []
for item in sale.items.all():
items.append(item.clothe.name)
ws.append([sale.pk,date,sale.total, sale.saller.name, str(items), len(items)])
wb.save("store_sales.xlsx")
以上仅创建一个包含一些销售数据的表的store_sales.xlsx文件。
URL
from django.contrib import admin
from django.urls import path
from core.views import CreateReportToSale
urlpatterns = [
path('admin/', admin.site.urls),
path('report/sales/', CreateReportToSale.as_view()),
]
视图
让我们创建以管理路线的视图,添加以下内容:
from rest_framework.views import APIView
from core.tasks import create_report_of_sale
from rest_framework.response import Response
import time
# Create your views here.
class CreateReportToSale(APIView):
def get(self, request):
tb = time.time()
create_report_of_sale()
ta = time.time()
print(f"Tempo Processando:{ta-tb}s")
return Response(data={"msg":f"Relatório criado com sucesso !"},status=200)
上面的创建创建了一种视图,可以使用所有HTTP获取并创建我们的报告来管理路线。我们添加了一个打印,以知道花时间。
实现它
对于我们生成报告,我们需要数据,让我们使用库 model_bakery。
python manage.py shell
内部执行以下命令:
>>> from model_bakery import baker
>>> baker.make("core.ClotheItem", _quantity=3000)
以上将创建3000件衣服,3000件衣服和3000个销售,因为我们的型号 clotheitem 与这些型号有关,我们有 null = false。 代码用于模拟3000销售的创建,每件衣服。
然后输入报告的创建路线,并运行项目,让我们看看有一个延迟要处理,我们还可以看到我们去终端时花费时间的时间。 p>
使用任务队列NA API
依赖
这是一个时间需要花费时间来处理的示例,我们还有其他示例,例如访问API的访问权限,发送电子邮件等。因此,要解决我们的问题,我们将使用 任务队列 。首先让我们安装芹菜库。
pip install celery
后来,我们必须使用以下Docker命令来攀登我们的消息经纪(RabbitMQ):
docker run -d -p 5672:5672 rabbitmq
芹菜
有了这两个依赖,我们可以继续进行项目。首先创建一个称为 celery.py 的文件
from celery import Celery
import django
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'store_app.settings')
django.setup()
app = Celery("store_app", broker='pyamqp://guest@localhost//')
app.autodiscover_tasks()
以上是使用settings.py文件的配置AMQP协议。
以下行是为了使所有放置在settings.py上的应用程序都可以使用此实例。
我们应该做的另一种配置是在项目的初始。pyâ,以便在Django初始化项目时初始化芹菜的实例,因此我们的应用程序可以从函数中访问 shared_task 。
from celery import app
__all__ = ('app',)
任务
在 tasks.py file 中,我们可以修改以下方式:
from openpyxl import Workbook
from core.models import Sale
from celery import shared_task #<-new
@shared_task(
bind=True,
max_retries=5,
default_retry_delay=30) #<-new
def create_report_of_sale(self): #<-new
try: #<-new
wb = Workbook()
ws = wb.active
ws.append(["ID", "Criado Em", "Total","Vendedor","Roupas", "Quantidade de Roupas"])
sales = Sale.objects.prefetch_related("items")
for sale in sales:
date = sale.created_at.strftime('%Y-%m-%d')
items = []
for item in sale.items.all():
items.append(item.clothe.name)
ws.append([sale.pk,date,sale.total, sale.saller.name, str(items), len(items)])
wb.save("sample.xlsx")
except: #<-new
raise self.retry() #<-new
在上面包含的我们正在使用Decorator shared_task, 使用此装饰器我们正在创建一个可以通过任何代码调用的新任务,我们仍在给他一些论点,例如:
- bind - 使我们能够访问特定任务的某些属性,例如请求对象或重试函数,这使我们能够如果某事走了,可以再次执行任务错误;
- max_retries - 添加了许多动议的会计师试图回到重试;
- default_retry_dely_dely - 一个呼叫重试和另一个呼叫之间的时间。 ###查看
在我们看来,我们可以修改以下方式:
from rest_framework.views import APIView
from core.tasks import create_report_of_sale
from rest_framework.response import Response
import time
# Create your views here.
class CreateReportToSale(APIView):
def get(self, request):
tb = time.time()
create_report_of_sale.delay() #<-new
ta = time.time()
print(f"Tempo de Processamento:{ta-tb}")
return Response(data={"msg":f"Relatório criado com sucesso !"},status=200)
有了对母亲的简单呼吁,可能会发生,但为此,创建我们的任务消费者很重要,输入另一个终端并添加以下内容:
celery -A store_app worker --loglevel=INFO
注意:请记住,我们的过程工作没有热行,即每次修改任务运行以上时。
输入您的项目并访问我们报告的创建路线。正如我们可以看到的那样,事情发生了更加勇敢,因为当我们访问路线时,我们创建了一个添加到经纪人并被可展示的工人消费的任务,而将任务添加到经纪人的时间比处理要多得多。它。
repositã³rio:
Código Desenvolvido
反思
https://docs.celeryq.dev/en/stable/index.html
https://denibertovic.com/posts/celery-best-practices/