曾经尝试检查芹菜任务是否已经在运行?
这是我为检查任务是否按其名称运行的代码。
from celery import current_app
class CeleryHelper:
"""Contains helper functionalities to be used while interacting with Celery."""
@staticmethod
def is_being_executed(task_name: str) -> bool:
"""Returns whether the task with given task_name is already being executed.
Args:
task_name: Name of the task to check if it is running currently.
Returns: A boolean indicating whether the task with the given task name is
running currently.
"""
active_tasks = current_app.control.inspect().active()
for worker, running_tasks in active_tasks.items():
for task in running_tasks:
if task["name"] == task_name:
return True
return False
我不久前搜索了这一点,我找不到一种直接的方法来在Django项目中进行此操作,但是上述代码经过测试并在基于Django的项目中起作用。