概述
在变压器服务中,我们正在进行从客户事件到目标事件的数据转换,我们希望将转换过程组织到逻辑分开的步骤中,以更好地理解和可维护性,因此我们创建了一个工作流引擎以满足以下要求。 P>
- 验证
- 事件验证
- 目标配置验证
- 目的地数据映射的来源
- 用目标API调用丰富数据
- 处理不同类型的事件
- 跟踪,标识,页面等
- 自定义类别:
- 观看的产品
- 购买的产品
- 多路复用
- 批处理
- 响应建筑。
当前,大多数步骤是使用JavaScript代码实现的,该代码提供了最大的灵活性。尽管如此,以标准化的方式维护,理解,调试,测试和开发仍然很难。为了实现标准化,我们正在构建一个工作流引擎,该引擎旨在提供提高的可读性,可检验性,可重复性和开发速度。
由于我们想使用易于读写和写模板的语言来表达逻辑的转换。我们支持以下模板语言:
- JSONata
- JsonTemplate.
- 易于扩展到more template languages。 使用jsonata的工作流示例:
templateType: Jsonata
steps:
- name: unsupported
condition: $not(op in ["+", "-", "*", "/"])
template: |
$doThrow("unsupported operation")
- name: add
description: Do addition
condition: op = "+"
template: |
( a + b )
- name: subtract
description: Do subtraction
condition: op = "-"
template: |
( a - b )
- name: multiply
description: Do multiplication
condition: op = "*"
template: |
( a * b )
- name: divide
description: Do division
condition: op = "/"
template: |
(
$assert( b != 0, "division by zero is not allowed");
a / b
)
入门
-
npm install rudder-workflow-engine
const workflowEngine = WorkflowEngineFactory.createFromFilePath("workflow.yaml", options);
workflowEngine.execute(input);
特征
配置驱动
用户应该能够将目标转换逻辑作为YAML文件中的一系列步骤表示作为工作流程。步骤可以写为模板基本语言。
绑定
支持使用绑定进口外部功能和数据。
工作流绑定
- 绑定类似于导入,该导入允许将外部定义的功能和数据导入工作流程。
-
类型
- 类型1:从文件导入特定字段。
name: EventType path: ./config
-
Eventype 在/config 文件中定义
- 类型2:从文件中导入所有内容。
name: MappingData path: ./mapping exportAll: true
- 的所有内容 文件将导入变量 mappingdata
- 如果 something1 和 something 2 是在 ./映射中定义的,那么我们需要使用 $ mappingdata.something1 strong>和 $ mappingdata.something2
- 类型3:导入文件中定义的所有内容
path: ./utils
- /utils 文件的所有内容都将以相同的名称导入。
- 如果 something1 和 something 2 在 ./ utils 中定义和 $ someings2
-
完整示例:
bindings:
- name: EventType
path: ./config
- name: MappingData
path: ./mapping
exportAll: true
- path: ./utils
- 在定义工作流程时,这些是用户指定的绑定。 ####平台绑定
- 该平台提供了这些绑定,可以直接使用它们,而无需在绑定中定义它们块。
- 我们将很快发布有关这些绑定的详细文档。 ####执行绑定
- $输出:提供了对当前步骤之前执行的先前步骤的输出的访问。
steps:
- name: step1
template: |
{
"a": something
}
- name: step2
template: |
{
"b": $doSomething($outputs.step1.a)
}
- step2 使用 step1的输出。
- 工作流引擎会自动将步骤输出绑定到 $ uptufs 变量。
- $ setContext:它是将任何数据存储在$上下文中并以后使用的功能。 $输出是只读变量,供用户参考上一个步骤输出,因此我们可以使用它们来通过可修改的结果。因此,如果我们想以多个步骤更新相同的变量,则应使用 $ setContext 。
-
示例:
steps: - name: setAForCase1 condition: $isCase1(message) template: | $setContext("a", something1) - name: setAForCase2 condition: $isCase2(message) template: | $setContext("a", something2) - name: updateA template: | $setContext("a", $updateA($context.a)) - name: useA template: | $doSomething($context.a)
- 在此示例中,我们以多个步骤重复更新变量,因此不可能使用 $ uptufs。
- 此功能的一个实际情况是:我们希望根据某些条件以不同的方式填充一个对象,然后使用它。
-
$上下文:使用 $ setContext 函数访问变量。请参阅上面的示例。
脚步
-
步骤是工作流的主要执行块。
-
步骤必须包含一个名称才能跟踪输出。
-
步骤可以包含一个可选的描述字段来描述细节。
-
该步骤只能在满足条件时才包含可选的条件字段才能执行。
-
该步骤可以包含一个可选的 InputTemplate 字段来自定义输入,该输入将在执行步骤时通过。
-
支持两种不同类型的步骤:
- Simplestep
- WorkFlowStep
状况
- 工作流程中的一步可以提及可选条件,以便仅在满足条件时才能执行。
-
条件也是Jsonata代码。
steps: - name: commonValidation template: | ( common validations for events ) - name: ValidateInputOfTrackEvent condition: message.type = EventType.Track template: | ( some validations specific to track events)
InputTemplate **
- 默认情况下,所有步骤都会收到与工作流输入相同的输入,但是当我们要在执行步骤之前修改输入时,我们可以使用此功能。
steps:
- name: step1
(some logic ...)
- name: step2
inputTemplate: |
(customize the input)
- name: step3
(some logic ...)
- 在上面的示例中:STEP1和Step3将使用工作流的输入执行,但是STEP2接收到 inputTemplate中定义的自定义输入
### ContextTemplate
- 默认情况下,所有步骤都会接收当前上下文,但是在执行步骤之前要修改上下文时,我们可以使用此功能。当使用外部工作流程,工作流步骤或模板路径时,这很有用。
steps:
- name: step1
(some logic to prepareContext)
- name: step2
contextTemplate: |
(customize the context for step2)
(some logic ...)
- name: step3
(some logic ...)
- 在上面的示例中:步骤3将使用步骤1中准备的上下文执行,但是步骤2接收到 context -template中定义的自定义上下文。
### loopoverInput
- 当输入为数组时,我们可以使用此功能,我们希望独立执行每个元素的步骤逻辑。
- 这主要用于批处理处理,如果在处理特定步骤时发生错误,我们报告失败并成功执行而不会失败。
name: executeForEach
loopOverInput: true
template: |
( do something )
-
如果该步骤的输入为[E1,E2,E3],则该步骤将独立执行所有元素,并想象它失败了E1,并且对于E2和E3的成功,然后整体步骤输出将为以下内容:
[ { "error": someErrorForE1 }, { "output": someOutputForE2 }, { "output": someOutputForE3 } ]
oncomplete
- 当步骤完成后,下一步将执行,但是如果我们要以特定步骤的输出退出工作流,则可以使用此步骤。
- 此功能仅在有条件的步骤中使用。
- 示例1:避免重新处理,因此返回而无需修改输入消息。
steps:
- name: checkIfProcessed
condition: message.processed = true
template: |
message
**onComplete: return**
- name: processMessage
template: |
(...)
- 在上面的示例中,我们不想重新处理消息,因此,如果它们已经处理过,我们需要立即退还它们。
- 示例2:处理输入消息后提早返回。
steps:
- name: step1
template: |
(doSomeProcessing)
- name: **step2**
condition: someCondition
template: |
(doSomeProcessing)
onComplete: return
- name: step3
template: |
(doSomeProcessing)
- 在此示例中,我们希望成功地处理 step2 的消息后,由于此步骤是有条件的,如果不满足条件,则 Step3 将被执行。
### Onerror
- 默认情况下,如果任何步骤失败,则整个工作流程失败,但是如果步骤使用 onerror:继续设置,则工作流将忽略错误并继续执行。
steps:
- name: step1
template: |
(doSomeProcessing)
- name: **step2**
template: |
(doSomeProcessing)
onError: continue
- name: step3
template: |
(doSomeProcessing)
- 在上面的示例中,如果在step1或step3中发生任何错误,则工作流将立即退出,但是当步骤2失败时,工作流程会忽略错误并继续执行step3。
脚步
简单的步骤
- 简单的步骤是工作流程中执行的基本单位。
- 一个简单的步骤可以是函数,它在 bindings中定义。
bindings:
- name: **processTrackEvent
path: ./transform # actual file name is transform.js**
steps:
- name: processTrackEvent
functionName: processTrackEvent
- 我们可以在定义绑定时省略 .js 扩展。
- ProcessTrackevent 必须具有以下定义。
(input: any, bindings: Record<string, any>) => {
error?: any,
output?: any
}
- 一个简单的步骤可以是JSONATA模板。
name: processTrackEvent
template: |
(JSONata template to process track events)**
-
模板也可以从文件路径导入。
name: processTrackEvent templatePath: ./trackTemplate.yaml
- 我们可以在一个简单的步骤中使用外部工作流程。
steps:
- name: prepareContext
template: $setContext("batchMode", true)
- name: transform
**externalWorkflow:
path: ./pinterest_tag_single_workflow.yaml**
loopOverInput: true
- 我们正在重复批处理事件转换工作流中的单个事件工作流程。
- 外部工作流程将作为黑匣子执行,因此我们只能访问工作流的最终输出,而不能访问步骤的各个输出。
- 用步骤输入和 context 执行外部工作流程。
- 父级工作流的上下文传递给子女工作流程( externalworkflow ),而不是反之亦然。这有助于根据使用的位置自定义子工作流执行。
-
外部工作流程无法访问父工作流程输出。
工作流步骤
- 系列简单步骤。
steps: - name: category template: | (compute category) - name: ecom condition: $outputs.category = "ecom" steps: - name: validateInput description: Common validation for all ECom pages template: | (assert everything is fine) - name: page template: | (compute page using $outputs.category) - name: processSearchPage condition: $outputs.ecom.page = "search" template: | (search page template) - name: processDetailPage condition: $outputs.ecom.page = "detail" template: | (detail page template) - name: processCartPage condition: $outputs.ecom.page = "cart" template: | (cart page template)
- 我们可以访问输出通常类似于 $ outputs.category。
- 要访问WorkFlowStep的子步骤的输出,我们需要使用 $ utputss.workflowstepname.childstepname ,例如:$ utputs.ecom.page。
- 儿童步骤的输出在工作流步骤之外不可用。
- 最后一个成功执行的儿童步骤的输出将成为工作流程的输出,我们只能在工作流步骤之外访问 $ utput 。 workflowstepname,强>例如,$ output.ecom。
- 目前,我们不支持嵌套工作流程。
- 可以从文件导入工作流程步骤。
steps: - name: processECommerace workflowStepPath: ./ecomWorkflow.yaml
- 支持其他绑定
bindings: - name: commonBinding path: ./bindings steps: - name: processECommerace bindings: - name: stepBinding path: ./workflow_step_bindings steps: - name: validateInput description: Common validation for all ECom pages template: | (assert with $commonBinding) - name: page template: | (compute page using $workflowBinding) - name: processSearchPage condition: $outputs.ecom.page = "search" template: | (search page template)
- 在上面的示例中: ProcessEcommerace 步骤是工作流程步骤,并导入其他绑定。工作流程( commonBinding )和步骤s( stepbinding )绑定都可以使用工作流程。
测试
使用开玩笑的测试场景
npm run jest:scenarios -- --scenarios=<comma separate scenarios>
- 示例:
npm run jest:scenarios -- --scenarios=basic_workflow,to_array
手动测试场景
npm run test:scenario -- -s <scenario_folder> -i <test_case_index_from_data.json>
- 示例:
npm run test:scenario -- -s outputs -i 1
- 注意:它只是运行测试案例并产生结果,但不会对结果进行任何验证。
结论
我们已经构建了这个workflow engine来解决我们的问题,我们相信该框架也足以用于其他用例,因此请随时拍摄它。