使用Netflix导体中的代码动态工作流
#python #java #microservices #clojure

Netflix Conductor是一个流行的平台,可以通过创建跨越服务的工作流来构建有弹性的状态应用程序。您可以在Playground上尝试本文的工作流程,这是指挥的免费托管版本。

什么是工作流程?

在研究如何使用导体创建工作流程之前,让我们首先定义工作流程是什么。简而言之,工作流程是一系列以特定顺序执行的任务或步骤,以实现目标。工作流程用于自动化复杂过程,并确保以逻辑序列完成所有必要的步骤。导体工作流由任务和操作员组成。

workflow = {任务 +运算符}

任务是封装在导体服务器外运行的业务逻辑并作为微服务,lambda或工人实现的服务。工人在导体服务器之外运行,可以用任何支持的语言实现。工作流可以包含以不同语言编写的多个工人。

运算符是用于控制工作流程中执行流程的编程语言的原始内容。导体支持运算符,例如开关,循环,叉/联接和子工作流程,从而使您可以定义复杂的工作流程。

将工作流程与可重复使用的服务分开

导体促进了应用工作流程和用作工作流程的构件的服务之间的明确分离。这种分离可确保任务遵循单一责任原则,并且通常是无国籍的。

此模型确保了两件事:

  1. 在许多工作流程中的服务可重复使用,促进了整个团队的更大协作。
  2. 无状态的任务工作者可以根据卷来快速扩展/向上扩展,同时仅在导体服务器上维护状态。

工作流定义 - json或代码?

为什么不同时?

导体服务器将工作流定义存储为服务器端的JSON。但是,这并不限制用户单独将其工作流表示为JSON。导体支持使用代码创建工作流程并执行使用代码表达的预注册和动态工作流程。

简单示例

让我们举一个简单的两任务工作流程:

Two-Task workflow in Conductor

JSON

{
  "name": "simple_two_task_workflow",
  "version": 1,
  "schemaVersion": 2,
  "tasks": [
    {
      "name": "task1",
      "taskReferenceName": "task1",
      "inputParameters": {},
      "type": "SIMPLE"
    },
    {
      "name": "task2",
      "taskReferenceName": "task2",
      "inputParameters": {},
      "type": "SIMPLE"
    }
  ]
}

不同语言的相同工作流程:

爪哇

ConductorWorkflow workflow = new ConductorWorkflow(workflowExecutor);
workflow.setName("simple_two_task_workflow");
workflow.setVersion(1);

//Add tasks
workflow.add(new SimpleTask("task1", "task1"));
workflow.add(new SimpleTask("task2", "task2"));

//Execute
workflow.executeDynamic(//input);

conductorWorkflow := workflow.NewConductorWorkflow(executor.NewWorkflowExecutor(client.NewAPIClient(nil, nil)))
conductorWorkflow.Name("simple_two_task_workflow").Version(1)

//Add Tasks
conductorWorkflow.
  Add(workflow.NewSimpleTask("task1", "task1")).
  Add(workflow.NewSimpleTask("task2", "task2"))

//Execute
conductorWorkflow.StartWorkflow(//input)

Python

workflow = ConductorWorkflow(
  executor=workflow_executor,
  name=simple_two_task_workflow,
  version=1,
)

#Two Tasks
task1 = SimpleTask('task1', 'task1')
task2 = SimpleTask('task2', 'task2')

#Add tasks to the workflow using >> operator
workflow = workflow >> task1 >> task2

# Execute the workflow
workflow.start_workflow(#input)

使用代码创建工作流可以打开可能无法使用静态定义定义工作流的用例 - 这可能是当任务数量及其流量取决于取决于其他因素的数据时。

复杂示例

让我们以基于用户数据动态创建的假设工作流程的示例:

ConductorWorkflow workflow = new ConductorWorkflow(workflowExecutor);
workflow.setName("complex_dynamic_workflow");
workflow.setVersion(1);

//Get the list of users to send notification to
List<UserInfo> users = getUsers();
Task<?>[] tasks = new Task[users.size()];

int counter = 0;
for (UserInfo user : users) {
  if(user.sendEmail) {
    SimpleTask task = new SimpleTask("send_email", "send_email_" + counter);
    task.input("email", user.email);
    tasks[counter++] = task;
  } else {
    SimpleTask task = new SimpleTask("send_sms", "send_sms_" + counter);
    task.input("phone", user.phone);
  tasks[counter++] = task;
  }
}

//Run all the tasks in parallel
workflow.add(new ForkJoin("run_in_parallel", tasks));

//Execute workflow and get the future to wait for completion.
CompletableFuture executionFuture = workflow.execute(new HashMap<>());

//Alternatively, kick off the workflow if it's going to be a long-running workflow
String workflowId = workflowClient.startWorkflow(new StartWorkflowRequest().withWorkflowDef(workflow.toWorkflowDef()));

在上面的示例中,我们从后端商店加载用户列表,对于每个用户,创建一个任务以发送SMS或电子邮件。通过为每个用户添加适当的通知任务创建工作流程,然后执行。

取决于工作流程完成需要多长时间,导体提供了一种使用“期货”或启动工作流程完成工作流程的方法执行。 (或可以在UI中搜索和查看)

您可以在Java,Golang,Python,Csharp,JavaScript,甚至Clojure中使用代码编写工作流程。

访问Conductor SDK on GitHub获取具有完全工作示例应用的最新SDK。

动态工作流程和导体

使用代码动态创建工作流的能力使开发人员可以在无法预定工作流程的情况下解决非常复杂的用例。借助导体,您仍然可以使用导体可视化的全部功能来实现此操作,从而使您可以可视化UI中的整个执行。就像吃蛋糕也一样!

概括

Netflix导体是一个功能强大的平台,可让您创建最复杂的工作流程,同时使用强大的调试和可视化工具,可以轻松地处理运行时方案,从而减少了在生产环境中检测和解决问题的平均时间。

请务必在GitHub上查看Netflix Conductor。我们的Orkes团队确实提供了Conductor Playground —  a free version of Conductor

Join our Slack Community