AWS步骤功能回调模式
#aws #教程 #serverless #database

系统函数中的某些操作异步。很多时候,相同的操作还必须碰巧负责协调外部工作流程,以提供主工作流的执行的总体状态。 AWS的自然拟合是使用步骤功能并使用回调模式。在本文中,我将介绍回调模式的示例,同时使用AWS的HealthLake及其出口功能作为异步工作的骨干。欢迎来到AWS步骤函数回调模式。

建筑学

让我们首先从总体体系结构图开始。该解决方案的总体前提是AWS的Healthlake允许“自上次自上次”出口所有资源。通过使用步骤功能,Lambdas,SQS,DynamoDB,S3,分布式映射和EventBridge,我将构建最终的无服务器回调工作流。我觉得自己在kinesis和sn之外,我在这一个中都触摸了它们。

AWS Step Functions Callback Pattern Architecture

这里有很多事情发生,所以我将其分解为段:

  1. 触发状态机
  2. 记录保存和运行状态
  3. 运行导出并启动回调
  4. 轮询出口并重新启动状态机
  5. 工作结果
  6. 总结
  7. 处理失败

紧紧挂起,会有很多代码和许多细节。如果您想跳到代码,它在底部的here

工作流程

在深入研究工作流程的步骤之前,本文中的所有代码都将使用CDK(Typescript),Golang,并由AWS的HealthLake支持,这可能是我最喜欢的DynamoDB外部我最喜欢的无服务器数据库。我还在生产中运行大量卷,所以我所展示的内容很容易被加固以在您的环境中进行生产。

触发状态机器

HealthLake对可以在任何给定时间运行的“出口”数量有一个紧张的配额。该数字是 1 ,它使触发器的时机和“锁定”状态机的管理非常重要。对于触发器,我使用的是每5分钟运行一次的EventBridge时间表。该时间表看起来像是指示频率的cron表达式。

const rule = new Rule(scope, "ExportRule", {
    description: "Runs the CDC Export Process",
    schedule: Schedule.expression("cron(0/" + 5 + " * * * ? *)"),
});

构建规则,我将添加一个目标,一个死字母队列,以便我可以捕获状态机器的失败启动,并使该规则的角色掌握。

const dlq = new Queue(this, "RuleDeadLetterQueue", {
    queueName: "healthlake-cdc-trigger-dlq",
});

const role = new Role(this, "Role", {
    assumedBy: new ServicePrincipal("events.amazonaws.com"),
});

rule.addTarget(
    new SfnStateMachine(props.stateMachine, {
        deadLetterQueue: dlq,
        role: role,
    })
);

随着所有部署的一切,控制台中的规则将看起来像以下图像。

Event Bridge Schedule

记录保存和运行状态

还记得我提到的文章开头,一次有一个很难的配额在运行的出口数量吗?对于这款状态机,我有一个DynamoDB表,该表在记录中保存了一些信息。

我保留的第一张唱片是最后一步知道两件事。一个,目前有工作吗?第二,最后一次执行作业是什么时候以便我可以将这段时间包括在导出的过滤器参数中?唱片看起来像这样。

{
    "runStatus": "COMPLETED",
    "lastRunTime": "2023-08-24T15:45:34.265Z",
    "id": "RUN"
}

其次,我持有当前运行的时间触发,以便当状态机成功完成时,我可以在此时间更新上述记录,以便我的数学差距没有任何空白。

{
    "id": "CURRENT_RUN",
    "triggerTime": "2023-08-24T15:45:34.265Z"
}

工作流程的这一部分的好处是,我使用的是带有步骤函数的本机SDK调用,因此我不需要任何额外的计算,只为DynamoDB支付读/写单元。

工作流分支

请注意,“最后一个运行状态”是从“查找最后运行”步骤中发现的数据工作的选择。如果作业当前正在运行,则状态机将跳过此运行并获得成功。但是,如果作业当前未运行,则将其设置为运行时间,并将“运行”记录设置为“运行”,以便可以开始导出。

运行导出并启动回调

到目前为止,没有什么应该有所不同。仅使用正在运行某些本机SDK步骤的状态计算机的步骤功能。但这就是要转弯的地方。首先,如果您打算使用AWS步骤功能的回调模式,则需要确保状态机是标准机器而不是明确的工作流程。 Express不支持回调,因为Express的15分钟超时窗口可能还不够。有了标准的工作流程,您最多可以在State Machine Times Out之前一年,因此那里有很多空间。另外,以这种方式考虑。当标准是通过过渡时,表示持续时间的快速流量会充电。当您有可能等待一段时间的东西时,这更有意义。

内部机器内部

对于此工作流程,我将在SQS队列上发布一条消息,以供lambda拾起并阅读并做某事。此外,我将传递任务令牌,这是我的其他工作流程将成功,失败或心跳回到状态机时需要使用的回调票。心跳?那是什么?这是回调模式的一个不错的功能,如果您的其他工作流程在设置的任何时间内都没有入住,则状态机将放弃并将该步骤标记为故障,并在其余工作流程中掉落。方便,对吗?就我的示例而言,您可以看到心跳设置为120秒。

ASL JSON(亚马逊州语言)中的定义具有这种形状。

{
    "Next": "Map",
    "Type": "Task",
    "HeartbeatSeconds": 120,
    "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
    "Parameters": {
        "QueueUrl": "${StartExportQueueUrl}",
        "MessageBody": {
            "taskToken.$": "$$.Task.Token",
            "lastRunTime.$": "$.context.lastRunTime",
            "runStatus.$": "$.context.runStatus"
        }
    },
    "Catch": [
        {
            "ErrorEquals": ["States.ALL"],
            "Next": "Set Failed"
        }
    ]
}

启动出口

现在,要使用AWS HealthLake启动实际导出,我有一个lambda功能,该功能正在从状态机张贴的队列中读取。对我来说,这是AWS步骤函数回调模式的位置,因为我还有另一个工作流程,可以访问一种机制来更新其进度的主要工作流程。另一个工作流程可能会做您需要的任何事情,但是在本文中,这是HealthLake及其出口过程,设计是一种异步操作。

该函数的内部词除了从队列中读取,拉动最后一个运行时间并构造执行导出所需的有效负载。该有效载荷将其传递给下面的邮政请求。在查看有效载荷之前,请注意URL上的_since参数。这是从状态机和DynamoDB表中的队列进入的,该表告诉导出要回到有多远的资源。

https://healthlake.us-west-2.amazonaws.com/datastore/{{HL_STORE_ID}}/r4/$export?_since=2023-07-18T00:00:00.461Z

{
    "JobName": "2023-07-19 15:06:49 +0000 UTC",
    "OutputDataConfig": {
        "S3Configuration": {
            "S3Uri": "s3://<the s3 uri>",
            "KmsKeyId": "arn:aws:kms:us-west-2:<account id>:key/<key id>"
        }
    },
    "DataAccessRoleArn": "arn:aws:iam::<account id>:role/<key id>"
}

代码执行此帖子后,它将在另一个lambda监视的另一个队列上输入一条消息。

messageOutput, err := sqsClient.SendReCheckMessage(ctx, &models.ExportStatusWithTaskToken{
    ExportStatus: models.ExportStatus{
        DatastoreId: exportOutput.DatastoreId,
        JobStatus:   exportOutput.JobStatus,
        JobId:       exportOutput.JobId,
    }, TaskToken: body.TaskToken,
})

该消息中的详细信息将为工作状态功能提供足够的详细信息来完成其工作,这简单地检查状态,如果完成,请抓住输出并告诉状态机以重新工作。或者,如果工作失败,则同样的事情,嘿,状态机,重新工作!

轮询出口并重新启动状态机器

好吧,导出正在运行。需要多长时间取决于其需要导出多少数据。如果您将其作为示例项目运行,则可以在不到30秒的时间内完成,但是如果您自上次运行以来的时期内进行了数千个变化,则可能需要60-120秒。这是相当有效的。

您可能不经常使用SQS的东西是延迟的交付功能。在缓存中,它的作用像是ttl,因为该消息被隐藏在可见度之前。

message := sqs.SendMessageInput{
    QueueUrl:     &s.reCheckQueueUrl,
    DelaySeconds: 30,
    MessageBody:  &sb,
}

当该消息可用时,Lambda功能将读取有效负载,然后使用JobID和数据存储来向HealthLake提出请求,以询问该作业的状态。 Healthlake将返回出口状态。如果作业的启动或运行,我们的功能将在队列中再将另一个消息放回队列中,但是如果失败或完成,它将首先构建一个清单文件(我们将尽快讨论),然后将状态计算机通知状态计算机完成工作。

那么这个清单文件是什么?我不想对此进行领导,但是该示例代码还展示了如何使用去年在Re Invent上介绍的分布式地图步骤。该地图状态将使用包含将要迭代并用来传播HealthLake更改的导出文件键的文件。清单是根据描述API调用的输出构建的。我承诺在一开始会发生很多事情,并有很多细节。目前,我们在杂草深处就这种工作流程融合在一起。

最后,此侧工作流程最终将状态发送回状态机。这是Golang代码中的3种响应类型

心跳

input := &sfn.SendTaskHeartbeatInput{
    TaskToken: &exd.TaskToken,
}

_, err := sfnClient.SendTaskHeartbeat(ctx, input)

成功

strOutput := fmt.Sprintf("{\"bucket\": \"%s\", \"manifest\": \"%s\"}", bucketName, *file)
input := &sfn.SendTaskSuccessInput{
    TaskToken: &exd.TaskToken,
    Output:    &strOutput,
}

失败

input := &sfn.SendTaskFailureInput{
    TaskToken: &exd.TaskToken,
}

_, _ = sfnClient.SendTaskFailure(ctx, input)

另外一个代码段,我们不能忘记给这些操作的函数访问。

f.describeExportFunction.addToRolePolicy(
    new PolicyStatement({
        actions: [
            "states:SendTaskFailure",
            "states:SendTaskHeartbeat",
            "states:SendTaskSuccess",
        ],
        effect: Effect.ALLOW,
        resources: [cdc.sf.stateMachineArn],
    })
);

工作结果

现在我们已经到达了工作流的重新启动。让我们首先假设失败。在这种情况下,我正在向DynamoDB进行本机SDK呼叫,以更新运行记录,以表明该作业失败了。我不会更新上一次运行时间,以便我可以在5分钟内再次挑选此期间,然后重试。

在成功的情况下,lambda将S3 URI发送到JSON数组的清单文件。当构建AWS步骤使用回调模式功能时,您可能正在处理大量结果。直列地图一次可以处理最大40。对于导出,可能会生成100或1000个文件。导出正在使用NEWLINE划定JSON的NDJSON。因此,我可能在每个导出文件中都有多个记录。

使用此分布式映射将一个文件发送给每个迭代,然后由lambda挑选,该lambda将NDJSON分解为单独的记录,以进行标准的在线映射,该记录可以进行更改。

总体亚图

Sub-Map

子图的执行输出

Sub-Map Execution Stats
Sub-Map Execution

发布结果

我跳过了子图流的开头的准备更改功能,因为它不在本文的范围之内。但是,当您选择AWS HealthLake时,您将注册FHIR符合ABIAOQIAN的数据存储。 FHIR代表快速的医疗保健可互操作资源,这是在域边界之间在域边界之间交换患者和其他医疗保健数据的首选格式。准备功能会破坏NDJSON,并使下游发送到生态系统的几乎没有fhir有效载荷。

EventBridge PuteVents SDK调用将这些单独的FHIR对象放在Eventbridge自定义总线上。然后,该自定义总线打开了可以是内部和外部的目的地世界。

Custom Bus

包起来

让我们从杂草中抬起一点,假设一切都按预期进行。工作流程中要做的事情是:

  1. 将作业状态更新为“已完成”
  2. 将最后一次运行时间更新为工作流程开始时登录的时间。

这两件事将为成功而设置下一个。

处理失败

在另一方面,如果有任何步骤遇到故障,我们只是做一件事。

  1. 将最后一个运行的作业状态更新为失败。

这将使工作流程的传入运行表明上一个工作流程没有干净完成,因此只需从上次开始并运行即可。下一个运行将在我强调上面的许多段落的_since查询参数中使用该非突变时间,以便HealthLake可以抓住变化的内容。

代码

这已经深入研究了几个概念。该文章的起源是基于AWS步骤功能回调模式构建的,但是为此,我们需要HealthLake的出口功能以及最近发布的STEP功能中最近发布的分布式MAP功能。我试图没有过度编码文章,但是有一个完全可用和可部署的accompanying repository。但是,请小心,Healthlake可以有点昂贵运行,所以请注意堆叠多长时间。

包起来

我希望您仍然与我同在,因为这是一个相当深刻地研究了几个不同的概念和无服务器组件。当使用AWS步骤功能和回调模式构建时,您在处理工作流程的方式方面具有很大的灵活性,但通过TaskToken的成功,失败和心跳的非常简单的方法提供了动力。

这是我第一次接触回调模式和AWS步骤功能,而我的感知是,这将是复杂且难以完成的。这是对表面上似乎很困难的事情的一种毫无根据和情感的反应,但我对我能够轻松而能够将其融合在一起感到惊喜。此外,我也没有使用分布式地图状态,而且再次,像分布式地图一样复杂的东西大部分都从我身上抽象出来,这样我就可以专注于构建所需的逻辑,而不是基础架构或未分化的繁重提升必需的。那就是无服务器的美丽和AWS的美丽。

直到下次开心建筑!