带有自定义调度程序的增压芹菜节拍-RDBBEAT
#python #celery #sqlalchemy #celerybeat

您是否曾经想为您服务中的大量用户添加时间表,并发现celery-beat有限吗?然后,您偶然发现了django-celery-beat,但是您正在使用fastapiflask?这篇文章只是为您制作!

tldr:使用rdbbeat库在您的rdb中持续动态时间表。

  • pip install rdbbeat
  • 使用自定义调度程序运行celery-beat
    • python -m celery --app=server.tasks beat --loglevel=info --scheduler=rdbbeat.schedulers:DatabaseScheduler.
  • 运行芹菜工人并使用RDBBEAT模型和控制器添加/获取时间表。

介绍

celery-beat从运行时间之前定义的时间表运行定期任务。我们想要一个可以在操作过程中动态添加到系统的工具。

rdbbeat以多种方式扩展了celery-beat调度系统,以通用模式解决上述问题。首先,该库是使用sqlalchemy型号构建的,并保留了您的RDB中的时间表。该库在数据库模型上实现侦听器,以捕获运行时间期间时间表的加法/删除和/或修改。

用法示例

我们可以查看此示例:

构建公司的管理工具,该工具通过电子邮件向员工发送“生日快乐”消息。

我们可以使用rdbbeat在登机期间动态地为每个员工的生日添加时间表。时间表持续存在在数据库中,可以随时修改或删除。

可以在Github中找到此示例的完整代码。

1.使用flaskcelery的基本服务设置12

mkdir rdbbeat-flask-example
cd rdbbeat-flask-example
python -m venv venv
source venv/bin/activate
pip install flask celery # view requirements.txt for other dependencies

2.基本型号和数据库设置

  • 创建并运行数据库服务器(我使用Postgres)
  • 在Sqlalchemy中创建员工模型
# server/models.py
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()


class Employee(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(80), nullable=False)
    surname = db.Column(db.String(80), nullable=False)
    date_of_birth = db.Column(db.Date, nullable=False)

    def to_dict(self):
        return {
            "id": self.id,
            "name": self.name,
            "surname": self.surname,
            "date_of_birth": self.date_of_birth.strftime("%Y-%m-%d"),
        }

3.基本烧瓶应用程序设置

  • 一个带有蓝图的简单烧瓶应用程序用于员工路线
# server/app.py
import os

from dotenv import load_dotenv
from flask import Flask
from flask_cors import CORS
from flask_migrate import Migrate

from server.db_connection import DATABASE_URL
from server.models import db

load_dotenv()


app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = DATABASE_URL
app.config["SECRET_KEY"] = os.getenv("SECRET_KEY")
# Celery configuration
app.config["CELERY_BROKER_URL"] = "redis://localhost:6379/0"
app.config["CELERY_RESULT_BACKEND"] = "database"
app.config["CELERY_RESULT_DBURI"] = DATABASE_URL
app.config["CELERY_TRACK_STARTED"] = True
app.config["CELERY_SEND_EVENTS"] = True
app.config["BROKER_TRANSPORT_OPTIONS"] = {"visibility_timeout": 3600}
app.config["CELERY_DEFAULT_QUEUE"] = "default"

migrate = Migrate(app, db, directory="server/migrations", compare_type=True)


CORS(app)
db.init_app(app)

from server.views import employee_router  # noqa isort:skip

app.register_blueprint(employee_router)


@app.route("/")
def index():
    return "Learn to use the celery-rdbbeat scheduler!"

nb:仅添加芹菜配置,我们将稍后使用

  • db_connection.py文件保存使用会话管理的DB连接

  • views.py文件保存员工路线

from dateutil.parser import parse
from flask import Blueprint, jsonify, request
from rdbbeat.controller import Schedule, ScheduledTask, schedule_task

from server.db_connection import session_scope
from server.models import Employee

employee_router = Blueprint("employee_router", __name__, url_prefix="/api/v1")


@employee_router.post("/employees/")
def create_employee():
    employee_data = request.get_json()
    date_of_birth = parse(employee_data["date_of_birth"]).date()

    employee = Employee(
        name=employee_data["name"],
        surname=employee_data["surname"],
        date_of_birth=date_of_birth,
    )
    with session_scope() as session:
        session.add(employee)
        session.commit()

    return jsonify(db_employee.to_dict()), 201

@employee_router.get("/employees/<int:employee_id>")
def get_employee(employee_id):
    with session_scope() as session:
        employee = session.query(Employee).get(employee_id)
        if not employee:
            return jsonify({"error": "Employee not found"}), 404

        return jsonify(employee.to_dict())

测试点

  • 使用烧瓶迁移或纯净的Alembic创建迁移并运行它们
# with flask-migrate
flask db init
flask db migrate -m "create employee table" # create migrations
flask db upgrade # create the table in the DB
  • 检查您的表是否是在DB中创建的(我使用TablePlus
  • 运行烧瓶应用程序
export FLASK_APP=server/app.py
flask run # (or python -m flask run)
  • 在这一点上,您应该能够使用上面创建的路线来创建员工并从数据库中获取员工。您可以在此处使用Postman或失眠。我只是使用卷发。
# Create an employee
curl -X POST -H "Content-Type: application/json" -d '{"name": "John", "surname": "Doe", "date_of_birth": "1990-01-01"}' http://localhost:5000/api/v1/employees/
# {"date_of_birth":"1990-01-01","id":1,"name":"John","surname":"Doe"}
# Get an employee with id=1
curl http://localhost:5000/api/v1/employees/1
# {"date_of_birth":"1990-01-01","id":1,"name":"John","surname":"Doe"}

现在,让调度乐趣开始!

4.用rdbbeat作为自定义调度程序运行芹菜束

  • 运行迁移以创建rdbbeat表(请注意,它们生活在scheduler模式中,而不是public模式)
python -m alembic -n scheduler upgrade head
  • 然后运行芹菜束(在单独的终端中)
python -m celery --app=server.tasks beat --loglevel=info --scheduler=rdbbeat.schedulers:DatabaseScheduler

5.创建一个简单的芹菜应用并将其链接到烧瓶应用程序

# server/celery_worker.py
from celery import Celery

from server.app import app as flask_app
from server.db_connection import session_scope

REDIS_URL = "redis://localhost:6379/0"


def create_celery(flask_app=flask_app):
    celery_app = Celery("flask", broker=REDIS_URL)

    celery_app.conf.task_default_queue = "default"

    celery_app.conf.broker_transport_options = {
        "max_retries": 3,
        "interval_start": 0,
        "interval_step": 0.2,
        "interval_max": 0.2,
    }

    # Provide session scope to `rdbbeat`
    celery_app.conf.session_scope = session_scope
    celery_app.conf.update(flask_app.config)

    TaskBase = celery_app.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with flask_app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery_app.Task = ContextTask

    return celery_app


app = create_celery(flask_app=flask_app)

nb:请确保将会话范围提供给rdbbeat,以便它可以访问db。

  • 创建一个简单的任务
# server/tasks.py
from server.celery_worker import app
from server.db_connection import session_scope
from server.models import Employee, db


@app.task(name="birthday_greeting")
def birthday_greeting(employee_id):
    with session_scope() as session:
        employee = session.query(Employee).get(employee_id)
        print(f"Happy birthday, {employee.name} {employee.surname}!")

        # Send email to employee
        # email_service.send_email(template="birthday_greeting", to=employee.email, context={"employee": employee.to_dict()})
        # Remind his manager too, in case they forgot :)
  • 现在,修改views.py文件以添加每个员工生日的时间表 信息
# server/views.py
...
from rdbbeat.controller import Schedule, ScheduledTask, schedule_task

@employee_router.post("/employees/")
def create_employee():
    employee_data = request.get_json()
    date_of_birth = parse(employee_data["date_of_birth"]).date()

    employee = Employee(
        name=employee_data["name"],
        surname=employee_data["surname"],
        date_of_birth=date_of_birth,
    )
    with session_scope() as session:
        session.add(employee)
        session.commit()

        # Create birthday greeting task
        db_employee = session.query(Employee).get(employee.id)
        schedule = Schedule(
            minute="*",
            hour="*",
            day_of_week="*",
            day_of_month=str(date_of_birth.day),
            month_of_year=str(date_of_birth.month),
            timezone="UTC" # FIX-ME: get timezone from employee
        )

        task_to_schedule = ScheduledTask(
            name=f"{db_employee.id}_birthday_greeting",  # All tasks must have a unique name
            task="birthday_greeting",
            schedule=schedule,
        )
        # Provide task kwargs for when the task is executed
        task_kwargs = {"employee_id": db_employee.id}
        schedule_task(session=session, scheduled_task=task_to_schedule, **task_kwargs)

    return jsonify(db_employee.to_dict()), 201
...

6.运行所有内容并测试!

  • 为了测试理智,让我们修改时间表以每分钟运行
# server/views.py
...
schedule = Schedule(
    minute="*",
    hour="*",
    day_of_week="*",
    day_of_month="*",
    month_of_year="*",
    timezone="UTC"
)
...
  • 检查在1端子中运行的烧瓶应用程序
  • 是否运行
  • 检查芹菜束在另一个终端中是否运行
  • 然后在另一个终端运行芹菜工人
python -m celery --app=server.tasks worker --loglevel=info
  • 再次运行这些卷曲命令以创建员工。

7.它有效!!

  • 您应该在2终端中看到类似的日志:
[2021-03-01 16:00:00,000: INFO/MainProcess] Scheduler: Sending due task birthday_greeting (birthday_greeting)
[2021-03-01 16:00:00,000: INFO/MainProcess] Scheduler: Sending due task birthday_greeting (birthday_greeting)

  • 和芹菜工人的原木应该很高兴:
[2021-03-01 16:00:00,000: INFO/MainProcess] Received task: birthday_greeting[3a3b1b1b-1b1b-3a3a-1b1b-1b1b1b1b1b1b]  
[2021-03-01 16:00:00,000: INFO/MainProcess] Received task: birthday_greeting[3a3b1b1b-1b1b-3a3a-1b1b-1b1b1b1b1b1b]  
[2021-03-01 16:00:00,000: INFO/MainProcess] Task birthday_greeting[3a3b1b1b-1b1b-3a3a-1b1b-1b1b1b1b1b1b] succeeded in 0.000s: None
[2021-03-01 16:00:00,000: INFO/MainProcess] Task birthday_greeting[3a3b1b1b-1b1b-3a3a-1b1b-1b1b1b1b1b1b] succeeded in 0.000s: None

8.高级东西

8.1。单池

您可以使用single-beat在一个过程中运行芹菜束。它具有其好处 - 基本上可以安全的防护人员抵抗同时运行的多个芹菜过程。

8.2。用K8S

部署

如果您正在Kubernetes运行系统,则可以在Pod上启动芹菜beat,在其他豆荚中的芹菜工作人员,并在其他pod中运行服务器应用程序。只要他们都可以访问同一DB,他们就应该能够相互交流。

9.替代方案。

9.1。 redbeat

根据其Github的文档,RedBeat是芹菜节目调度程序,可在Redis中存储计划的任务和运行时元数据。这里的权衡是您的任务时间表是Redis的商店,您必须确保您的Redis实例高度可用以避免丢失时间表。

9.2。 Django芹菜节拍

rdbbeat做了Django Celery Beat为Django做的事情,但对于Sqlalchemy。如果您正在为服务器应用程序使用Django框架,则可以使用Django Celery Beat安排任务。