检查芹菜任务是否已经在运行
#python #django #celery

曾经尝试检查芹菜任务是否已经在运行?
这是我为检查任务是否按其名称运行的代码。

from celery import current_app


class CeleryHelper:
    """Contains helper functionalities to be used while interacting with Celery."""
    @staticmethod
    def is_being_executed(task_name: str) -> bool:
        """Returns whether the task with given task_name is already being executed.

        Args:
            task_name: Name of the task to check if it is running currently.
        Returns: A boolean indicating whether the task with the given task name is
            running currently.
        """
        active_tasks = current_app.control.inspect().active()
        for worker, running_tasks in active_tasks.items():
            for task in running_tasks:
                if task["name"] == task_name:
                    return True

        return False

我不久前搜索了这一点,我找不到一种直接的方法来在Django项目中进行此操作,但是上述代码经过测试并在基于Django的项目中起作用。