您是否曾经想为您服务中的大量用户添加时间表,并发现celery-beat
有限吗?然后,您偶然发现了django-celery-beat
,但是您正在使用fastapi
或flask
?这篇文章只是为您制作!
tldr:使用rdbbeat库在您的rdb中持续动态时间表。
pip install rdbbeat
- 使用自定义调度程序运行
celery-beat
:-
python -m celery --app=server.tasks beat --loglevel=info --scheduler=rdbbeat.schedulers:DatabaseScheduler
.
-
- 运行芹菜工人并使用RDBBEAT模型和控制器添加/获取时间表。
介绍
celery-beat从运行时间之前定义的时间表运行定期任务。我们想要一个可以在操作过程中动态添加到系统的工具。
rdbbeat
以多种方式扩展了celery-beat
调度系统,以通用模式解决上述问题。首先,该库是使用sqlalchemy
型号构建的,并保留了您的RDB中的时间表。该库在数据库模型上实现侦听器,以捕获运行时间期间时间表的加法/删除和/或修改。
用法示例
我们可以查看此示例:
构建公司的管理工具,该工具通过电子邮件向员工发送“生日快乐”消息。
我们可以使用rdbbeat
在登机期间动态地为每个员工的生日添加时间表。时间表持续存在在数据库中,可以随时修改或删除。
可以在Github中找到此示例的完整代码。
1.使用flask
和celery
的基本服务设置12
mkdir rdbbeat-flask-example
cd rdbbeat-flask-example
python -m venv venv
source venv/bin/activate
pip install flask celery # view requirements.txt for other dependencies
2.基本型号和数据库设置
- 创建并运行数据库服务器(我使用Postgres)
- 在Sqlalchemy中创建员工模型
# server/models.py
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
class Employee(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(80), nullable=False)
surname = db.Column(db.String(80), nullable=False)
date_of_birth = db.Column(db.Date, nullable=False)
def to_dict(self):
return {
"id": self.id,
"name": self.name,
"surname": self.surname,
"date_of_birth": self.date_of_birth.strftime("%Y-%m-%d"),
}
3.基本烧瓶应用程序设置
- 一个带有蓝图的简单烧瓶应用程序用于员工路线
# server/app.py
import os
from dotenv import load_dotenv
from flask import Flask
from flask_cors import CORS
from flask_migrate import Migrate
from server.db_connection import DATABASE_URL
from server.models import db
load_dotenv()
app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = DATABASE_URL
app.config["SECRET_KEY"] = os.getenv("SECRET_KEY")
# Celery configuration
app.config["CELERY_BROKER_URL"] = "redis://localhost:6379/0"
app.config["CELERY_RESULT_BACKEND"] = "database"
app.config["CELERY_RESULT_DBURI"] = DATABASE_URL
app.config["CELERY_TRACK_STARTED"] = True
app.config["CELERY_SEND_EVENTS"] = True
app.config["BROKER_TRANSPORT_OPTIONS"] = {"visibility_timeout": 3600}
app.config["CELERY_DEFAULT_QUEUE"] = "default"
migrate = Migrate(app, db, directory="server/migrations", compare_type=True)
CORS(app)
db.init_app(app)
from server.views import employee_router # noqa isort:skip
app.register_blueprint(employee_router)
@app.route("/")
def index():
return "Learn to use the celery-rdbbeat scheduler!"
nb:仅添加芹菜配置,我们将稍后使用
-
db_connection.py
文件保存使用会话管理的DB连接 -
views.py
文件保存员工路线
from dateutil.parser import parse
from flask import Blueprint, jsonify, request
from rdbbeat.controller import Schedule, ScheduledTask, schedule_task
from server.db_connection import session_scope
from server.models import Employee
employee_router = Blueprint("employee_router", __name__, url_prefix="/api/v1")
@employee_router.post("/employees/")
def create_employee():
employee_data = request.get_json()
date_of_birth = parse(employee_data["date_of_birth"]).date()
employee = Employee(
name=employee_data["name"],
surname=employee_data["surname"],
date_of_birth=date_of_birth,
)
with session_scope() as session:
session.add(employee)
session.commit()
return jsonify(db_employee.to_dict()), 201
@employee_router.get("/employees/<int:employee_id>")
def get_employee(employee_id):
with session_scope() as session:
employee = session.query(Employee).get(employee_id)
if not employee:
return jsonify({"error": "Employee not found"}), 404
return jsonify(employee.to_dict())
测试点
- 使用烧瓶迁移或纯净的Alembic创建迁移并运行它们
# with flask-migrate
flask db init
flask db migrate -m "create employee table" # create migrations
flask db upgrade # create the table in the DB
- 检查您的表是否是在DB中创建的(我使用
TablePlus
) - 运行烧瓶应用程序
export FLASK_APP=server/app.py
flask run # (or python -m flask run)
- 在这一点上,您应该能够使用上面创建的路线来创建员工并从数据库中获取员工。您可以在此处使用Postman或失眠。我只是使用卷发。
# Create an employee
curl -X POST -H "Content-Type: application/json" -d '{"name": "John", "surname": "Doe", "date_of_birth": "1990-01-01"}' http://localhost:5000/api/v1/employees/
# {"date_of_birth":"1990-01-01","id":1,"name":"John","surname":"Doe"}
# Get an employee with id=1
curl http://localhost:5000/api/v1/employees/1
# {"date_of_birth":"1990-01-01","id":1,"name":"John","surname":"Doe"}
现在,让调度乐趣开始!
4.用rdbbeat
作为自定义调度程序运行芹菜束
- 运行迁移以创建
rdbbeat
表(请注意,它们生活在scheduler
模式中,而不是public
模式)
python -m alembic -n scheduler upgrade head
- 然后运行芹菜束(在单独的终端中)
python -m celery --app=server.tasks beat --loglevel=info --scheduler=rdbbeat.schedulers:DatabaseScheduler
5.创建一个简单的芹菜应用并将其链接到烧瓶应用程序
# server/celery_worker.py
from celery import Celery
from server.app import app as flask_app
from server.db_connection import session_scope
REDIS_URL = "redis://localhost:6379/0"
def create_celery(flask_app=flask_app):
celery_app = Celery("flask", broker=REDIS_URL)
celery_app.conf.task_default_queue = "default"
celery_app.conf.broker_transport_options = {
"max_retries": 3,
"interval_start": 0,
"interval_step": 0.2,
"interval_max": 0.2,
}
# Provide session scope to `rdbbeat`
celery_app.conf.session_scope = session_scope
celery_app.conf.update(flask_app.config)
TaskBase = celery_app.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with flask_app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery_app.Task = ContextTask
return celery_app
app = create_celery(flask_app=flask_app)
nb:请确保将会话范围提供给
rdbbeat
,以便它可以访问db。
- 创建一个简单的任务
# server/tasks.py
from server.celery_worker import app
from server.db_connection import session_scope
from server.models import Employee, db
@app.task(name="birthday_greeting")
def birthday_greeting(employee_id):
with session_scope() as session:
employee = session.query(Employee).get(employee_id)
print(f"Happy birthday, {employee.name} {employee.surname}!")
# Send email to employee
# email_service.send_email(template="birthday_greeting", to=employee.email, context={"employee": employee.to_dict()})
# Remind his manager too, in case they forgot :)
- 现在,修改
views.py
文件以添加每个员工生日的时间表 信息
# server/views.py
...
from rdbbeat.controller import Schedule, ScheduledTask, schedule_task
@employee_router.post("/employees/")
def create_employee():
employee_data = request.get_json()
date_of_birth = parse(employee_data["date_of_birth"]).date()
employee = Employee(
name=employee_data["name"],
surname=employee_data["surname"],
date_of_birth=date_of_birth,
)
with session_scope() as session:
session.add(employee)
session.commit()
# Create birthday greeting task
db_employee = session.query(Employee).get(employee.id)
schedule = Schedule(
minute="*",
hour="*",
day_of_week="*",
day_of_month=str(date_of_birth.day),
month_of_year=str(date_of_birth.month),
timezone="UTC" # FIX-ME: get timezone from employee
)
task_to_schedule = ScheduledTask(
name=f"{db_employee.id}_birthday_greeting", # All tasks must have a unique name
task="birthday_greeting",
schedule=schedule,
)
# Provide task kwargs for when the task is executed
task_kwargs = {"employee_id": db_employee.id}
schedule_task(session=session, scheduled_task=task_to_schedule, **task_kwargs)
return jsonify(db_employee.to_dict()), 201
...
6.运行所有内容并测试!
- 为了测试理智,让我们修改时间表以每分钟运行
# server/views.py
...
schedule = Schedule(
minute="*",
hour="*",
day_of_week="*",
day_of_month="*",
month_of_year="*",
timezone="UTC"
)
...
- 检查在1端子中运行的烧瓶应用程序 是否运行
- 检查芹菜束在另一个终端中是否运行
- 然后在另一个终端运行芹菜工人
python -m celery --app=server.tasks worker --loglevel=info
- 再次运行这些卷曲命令以创建员工。
7.它有效!!
- 您应该在2终端中看到类似的日志:
[2021-03-01 16:00:00,000: INFO/MainProcess] Scheduler: Sending due task birthday_greeting (birthday_greeting)
[2021-03-01 16:00:00,000: INFO/MainProcess] Scheduler: Sending due task birthday_greeting (birthday_greeting)
- 和芹菜工人的原木应该很高兴:
[2021-03-01 16:00:00,000: INFO/MainProcess] Received task: birthday_greeting[3a3b1b1b-1b1b-3a3a-1b1b-1b1b1b1b1b1b]
[2021-03-01 16:00:00,000: INFO/MainProcess] Received task: birthday_greeting[3a3b1b1b-1b1b-3a3a-1b1b-1b1b1b1b1b1b]
[2021-03-01 16:00:00,000: INFO/MainProcess] Task birthday_greeting[3a3b1b1b-1b1b-3a3a-1b1b-1b1b1b1b1b1b] succeeded in 0.000s: None
[2021-03-01 16:00:00,000: INFO/MainProcess] Task birthday_greeting[3a3b1b1b-1b1b-3a3a-1b1b-1b1b1b1b1b1b] succeeded in 0.000s: None
8.高级东西
8.1。单池
您可以使用single-beat在一个过程中运行芹菜束。它具有其好处 - 基本上可以安全的防护人员抵抗同时运行的多个芹菜过程。
8.2。用K8S
部署如果您正在Kubernetes运行系统,则可以在Pod上启动芹菜beat,在其他豆荚中的芹菜工作人员,并在其他pod中运行服务器应用程序。只要他们都可以访问同一DB,他们就应该能够相互交流。
9.替代方案。
9.1。 redbeat
根据其Github的文档,RedBeat是芹菜节目调度程序,可在Redis中存储计划的任务和运行时元数据。这里的权衡是您的任务时间表是Redis的商店,您必须确保您的Redis实例高度可用以避免丢失时间表。
9.2。 Django芹菜节拍
rdbbeat
做了Django Celery Beat为Django做的事情,但对于Sqlalchemy。如果您正在为服务器应用程序使用Django框架,则可以使用Django Celery Beat
安排任务。