NODEJS工作流引擎用于JSON数据转换
#node #json #workflow #datatransformation

概述

在变压器服务中,我们正在进行从客户事件到目标事件的数据转换,我们希望将转换过程组织到逻辑分开的步骤中,以更好地理解和可维护性,因此我们创建了一个工作流引擎以满足以下要求。

  • 验证
    • 事件验证
    • 目标配置验证
  • 目的地数据映射的来源
  • 用目标API调用丰富数据
  • 处理不同类型的事件
    • 跟踪,标识,页面等
    • 自定义类别:
      • 观看的产品
      • 购买的产品
  • 多路复用
  • 批处理
  • 响应建筑。

当前,大多数步骤是使用JavaScript代码实现的,该代码提供了最大的灵活性。尽管如此,以标准化的方式维护,理解,调试,测试和开发仍然很难。为了实现标准化,我们正在构建一个工作流引擎,该引擎旨在提供提高的可读性,可检验性,可重复性和开发速度。

由于我们想使用易于读写和写模板的语言来表达逻辑的转换。我们支持以下模板语言:

templateType: Jsonata
steps:
  - name: unsupported
    condition: $not(op in ["+", "-", "*", "/"])
    template: |
      $doThrow("unsupported operation")
  - name: add
    description: Do addition
    condition: op = "+"
    template: |
      ( a + b )
  - name: subtract
    description: Do subtraction
    condition: op = "-"
    template: |
      ( a - b )
  - name: multiply
    description: Do multiplication
    condition: op = "*"
    template: |
      ( a * b )
  - name: divide
    description: Do division
    condition: op = "/"
    template: |
      ( 
        $assert( b != 0, "division by zero is not allowed");
        a / b 
      )

入门

  • npm install rudder-workflow-engine
const workflowEngine = WorkflowEngineFactory.createFromFilePath("workflow.yaml", options);
workflowEngine.execute(input);

特征

配置驱动

用户应该能够将目标转换逻辑作为YAML文件中的一系列步骤表示作为工作流程。步骤可以写为模板基本语言。

绑定

支持使用绑定进口外部功能和数据。

工作流绑定

  • 绑定类似于导入,该导入允许将外部定义的功能和数据导入工作流程。
  • 类型

    • 类型1:从文件导入特定字段。
      name: EventType 
      path: ./config
    
    • Eventype /config 文件中定义
      • 类型2:从文件中导入所有内容。
      name: MappingData 
      path: ./mapping
      exportAll: true
    
    • 的所有内容 文件将导入变量 mappingdata
    • 如果 something1 something 2 是在 ./映射中定义的,那么我们需要使用 $ mappingdata.something1 strong>和 $ mappingdata.something2
      • 类型3:导入文件中定义的所有内容
      path: ./utils
    
    • /utils 文件的所有内容都将以相同的名称导入。
    • 如果 something1 something 2 ./ utils 中定义和 $ someings2
  • 完整示例:

  bindings:
    - name: EventType 
      path: ./config
    - name: MappingData 
      path: ./mapping
      exportAll: true
    - path: ./utils 
  • 在定义工作流程时,这些是用户指定的绑定。 ####平台绑定
  • 该平台提供了这些绑定,可以直接使用它们,而无需在绑定中定义它们块。
  • 我们将很快发布有关这些绑定的详细文档。 ####执行绑定
  • $输出:提供了对当前步骤之前执行的先前步骤的输出的访问。
  steps:
    - name: step1
      template: |
        {
          "a": something
        }
    - name: step2
      template: |
      {
        "b": $doSomething($outputs.step1.a)
      }
  • step2 使用 step1的输出。
  • 工作流引擎会自动将步骤输出绑定到 $ uptufs 变量。
    • $ setContext:它是将任何数据存储在$上下文中并以后使用的功能。 $输出是只读变量,供用户参考上一个步骤输出,因此我们可以使用它们来通过可修改的结果。因此,如果我们想以多个步骤更新相同的变量,则应使用 $ setContext
  • 示例:

    steps:
      - name: setAForCase1
        condition: $isCase1(message)
        template: |
          $setContext("a", something1)
      - name: setAForCase2
        condition: $isCase2(message)
        template: |
          $setContext("a", something2)
      - name: updateA
        template: |
          $setContext("a", $updateA($context.a))
      - name: useA
        template: |
          $doSomething($context.a)
    
    • 在此示例中,我们以多个步骤重复更新变量,因此不可能使用 $ uptufs。
    • 此功能的一个实际情况是:我们希望根据某些条件以不同的方式填充一个对象,然后使用它。
  • $上下文:使用 $ setContext 函数访问变量。请参阅上面的示例。

    脚步

  • 步骤是工作流的主要执行块。

  • 步骤必须包含一个名称才能跟踪输出。

  • 步骤可以包含一个可选的描述字段来描述细节。

  • 该步骤只能在满足条件时才包含可选的条件字段才能执行。

  • 该步骤可以包含一个可选的 InputTemplate 字段来自定义输入,该输入将在执行步骤时通过。

  • 支持两种不同类型的步骤:

    • Simplestep
    • WorkFlowStep

状况

  • 工作流程中的一步可以提及可选条件,以便仅在满足条件时才能执行。
  • 条件也是Jsonata代码。

    steps:
      - name: commonValidation
        template: |
          ( common validations for events )
      - name: ValidateInputOfTrackEvent
        condition: message.type = EventType.Track
        template: |
          ( some validations specific to track events)
    

InputTemplate **

  • 默认情况下,所有步骤都会收到与工作流输入相同的输入,但是当我们要在执行步骤之前修改输入时,我们可以使用此功能。
  steps:
    - name: step1
      (some logic ...)
    - name: step2
      inputTemplate: |
        (customize the input)
    - name: step3
      (some logic ...)
  • 在上面的示例中:STEP1和Step3将使用工作流的输入执行,但是STEP2接收到 inputTemplate中定义的自定义输入 ### ContextTemplate
    • 默认情况下,所有步骤都会接收当前上下文,但是在执行步骤之前要修改上下文时,我们可以使用此功能。当使用外部工作流程,工作流步骤或模板路径时,这很有用。
  steps:
    - name: step1
      (some logic to prepareContext)
    - name: step2
      contextTemplate: |
        (customize the context for step2)
      (some logic ...)
    - name: step3
      (some logic ...)
  • 在上面的示例中:步骤3将使用步骤1中准备的上下文执行,但是步骤2接收到 context -template中定义的自定义上下文。 ### loopoverInput
    • 当输入为数组时,我们可以使用此功能,我们希望独立执行每个元素的步骤逻辑。
    • 这主要用于批处理处理,如果在处理特定步骤时发生错误,我们报告失败并成功执行而不会失败。
  name: executeForEach
  loopOverInput: true
  template: |
    ( do something )
  • 如果该步骤的输入为[E1,E2,E3],则该步骤将独立执行所有元素,并想象它失败了E1,并且对于E2和E3的成功,然后整体步骤输出将为以下内容:

    [
      {
        "error": someErrorForE1
      },
      {
        "output": someOutputForE2
      },
      {
        "output": someOutputForE3
      }
    ]
    

oncomplete

  • 当步骤完成后,下一步将执行,但是如果我们要以特定步骤的输出退出工作流,则可以使用此步骤。
  • 此功能仅在有条件的步骤中使用。
  • 示例1:避免重新处理,因此返回而无需修改输入消息。
    steps:
      - name: checkIfProcessed
        condition: message.processed = true
        template: |
          message
        **onComplete: return**
      - name: processMessage
        template: |
          (...)
  • 在上面的示例中,我们不想重新处理消息,因此,如果它们已经处理过,我们需要立即退还它们。
    • 示例2:处理输入消息后提早返回。
  steps:
    - name: step1
      template: |
        (doSomeProcessing)
    - name: **step2**
      condition: someCondition
      template: |
        (doSomeProcessing)
      onComplete: return
    - name: step3
      template: |
        (doSomeProcessing)
  • 在此示例中,我们希望成功地处理 step2 的消息后,由于此步骤是有条件的,如果不满足条件,则 Step3 将被执行。 ### Onerror
    • 默认情况下,如果任何步骤失败,则整个工作流程失败,但是如果步骤使用 onerror:继续设置,则工作流将忽略错误并继续执行。
  steps:
    - name: step1
      template: |
        (doSomeProcessing)
    - name: **step2**
      template: |
        (doSomeProcessing)
      onError: continue
    - name: step3
      template: |
        (doSomeProcessing)
  • 在上面的示例中,如果在step1或step3中发生任何错误,则工作流将立即退出,但是当步骤2失败时,工作流程会忽略错误并继续执行step3。

脚步

简单的步骤

  • 简单的步骤是工作流程中执行的基本单位。
  • 一个简单的步骤可以是函数,它在 bindings中定义
  bindings:
    - name: **processTrackEvent
      path: ./transform # actual file name is transform.js**
  steps:
    - name: processTrackEvent
      functionName: processTrackEvent
  • 我们可以在定义绑定时省略 .js 扩展。
  • ProcessTrackevent 必须具有以下定义。
  (input: any, bindings: Record<string, any>) => { 
    error?: any, 
    output?: any 
  }
  • 一个简单的步骤可以是JSONATA模板。
  name: processTrackEvent
  template: |
    (JSONata template to process track events)**
  • 模板也可以从文件路径导入。

    name: processTrackEvent
    templatePath: ./trackTemplate.yaml
    
    • 我们可以在一个简单的步骤中使用外部工作流程
  steps:
    - name: prepareContext
      template: $setContext("batchMode", true)
    - name: transform
      **externalWorkflow:
        path: ./pinterest_tag_single_workflow.yaml**
      loopOverInput: true
  • 我们正在重复批处理事件转换工作流中的单个事件工作流程。
  • 外部工作流程将作为黑匣子执行,因此我们只能访问工作流的最终输出,而不能访问步骤的各个输出。
  • 步骤输入 context 执行外部工作流程。
  • 父级工作流的上下文传递给子女工作流程( externalworkflow ),而不是反之亦然。这有助于根据使用的位置自定义子工作流执行。
  • 外部工作流程无法访问父工作流程输出。

    工作流步骤

    • 系列简单步骤。
    steps:
      - name: category
        template: |
          (compute category)
      - name: ecom
        condition: $outputs.category = "ecom"
        steps:
          - name: validateInput
            description: Common validation for all ECom pages
            template: |
              (assert everything is fine)
          - name: page
            template: |
              (compute page using $outputs.category)
          - name: processSearchPage
            condition: $outputs.ecom.page = "search"
            template: |
              (search page template)
          - name: processDetailPage
            condition: $outputs.ecom.page = "detail"
            template: |
              (detail page template)
          - name: processCartPage
            condition: $outputs.ecom.page = "cart"
            template: |
              (cart page template)
    
    • 我们可以访问输出通常类似于 $ outputs.category。
    • 要访问WorkFlowStep的子步骤的输出,我们需要使用 $ utputss.workflowstepname.childstepname ,例如:$ utputs.ecom.page。
      • 儿童步骤的输出在工作流步骤之外不可用。
      • 最后一个成功执行的儿童步骤的输出将成为工作流程的输出,我们只能在工作流步骤之外访问 $ utput workflowstepname,例如,$ output.ecom。
    • 目前,我们不支持嵌套工作流程。
    • 可以从文件导入工作流程步骤。
    steps:
      - name: processECommerace
        workflowStepPath: ./ecomWorkflow.yaml
    
    • 支持其他绑定
    bindings:
      - name: commonBinding
        path: ./bindings
    steps:
      - name: processECommerace
        bindings:
          - name: stepBinding
            path: ./workflow_step_bindings
        steps:
          - name: validateInput
            description: Common validation for all ECom pages
            template: |
              (assert with $commonBinding)
          - name: page
            template: |
              (compute page using $workflowBinding)
          - name: processSearchPage
            condition: $outputs.ecom.page = "search"
            template: |
              (search page template)
    
    • 在上面的示例中: ProcessEcommerace 步骤是工作流程步骤,并导入其他绑定。工作流程( commonBinding )和步骤s( stepbinding )绑定都可以使用工作流程。

测试

使用开玩笑的测试场景

  • npm run jest:scenarios -- --scenarios=<comma separate scenarios>
  • 示例:npm run jest:scenarios -- --scenarios=basic_workflow,to_array

手动测试场景

  • npm run test:scenario -- -s <scenario_folder> -i <test_case_index_from_data.json>
  • 示例:npm run test:scenario -- -s outputs -i 1
  • 注意:它只是运行测试案例并产生结果,但不会对结果进行任何验证。

结论

我们已经构建了这个workflow engine来解决我们的问题,我们相信该框架也足以用于其他用例,因此请随时拍摄它。