与AWS Healthlake一起活动驱动的建筑
#serverless #database #go #eventdriven

AWS Healthlake目前缺少的功能之一是一个合适的事件框架。使用DynamoDB,您有溪流。使用RDS,您可以使用DMS。但是使用Healthlake没有天然变化的数据捕获机制。

,由于这些天我仅在事件驱动的体系结构上工作,所以我需要一种能够处理变更的方法。我将在下面向您展示的不是批准,但它是100%AWS本地,并且继续以无服务器为主题。话虽如此,如果您只想向前跳,这里就是Github Repository CDK代码将部署 a healthlake实例 @ $ .27/hr,所以请运行 cdk destroy npx ts-node bin/app.ts`完成

需求

在EDA(事件驱动的体系结构)中,我希望能够除了无服务器之外,还可以触发下游的工作流程。在Healthlake中,我没有对此提供本地支持(但)我冒险弄清楚我是否可以检查某个地方的步道/日志并进行某些事情。以下是本文讨论

的架构

Healthlake CDC architecture架构相当简单。这是如何工作的崩溃

  1. 对HealthLake进行的每个API呼叫均以AWS CloudTrail登录
  2. 设置一个事件桥规则,该规则会倾听这些事件以及我将我转发到可以处理该事件的状态计算机上的任何事件
  3. 然后过滤事件。如您所见,您不会获得突变的事物的ID,只是它被突变的时间。因此,我首先需要找到已更改的所有内容
  4. 然后进行一些辩护。我的客户总是很掌握,但我不想强迫不必要的噪音进入我可以预防的生态系统。
  5. 将其更改为事件桥梁上的自定义事件总线,然后让客户设置自己的规则以与事件合作

通过CloudTrail和审核工作

因此,要捕获CloudTrail的事件,我需要一个EventBridge规则才能实现这一目标。看起来像这个

typescript
const rule = new events.Rule(this, 'rule', {
eventPattern: {
source: ["aws.healthlake"],
detailType: [
"AWS API Call via CloudTrail"
],
detail: {
eventSource: [
"healthlake.amazonaws.com"
],
eventName: [
"CreateResource",
"UpdateResource"
],
requestParameters: {
datastoreId: [hl.attrDatastoreId]
},
responseElements: {
statusCode: [200, 201]
}
}
},
ruleName: "capture-healthlake-events",
});

这是为了在CloudTrail上聆听所有发送给HealthLake的Createresource和Updateresource活动。通过进一步将规则限制在只有201和200的statusCode的规则中,这将是投票事件和事件。

接下来添加该规则为lambda处理程序的目标

typescript
const queue = new sqs.Queue(this, 'Queue', {
queueName:
rule-vent-dlq`
});

rule.addtarget(new lambdafunction(props.func,{
) Deadletterqueue:队列,//可选:添加一个死字母队列
maxeventage:cdk.duration.hours(2),//可选:设置MaxEventage重试策略
retryattempt:2,//可选:设置最大重试尝试
}));
`
事件完成后,启动了执行以下

的状态机器
  • 仅找到更改的Ent
  • 水合或填充数据
  • 发布到EventBridge

状态机工作流程

如果您真的深入研究了用CloudTrail登录的事件,则摩擦将归结为此。具有资源ID的元素将被隐藏起来。您会看到此文本

"resourceId": "HIDDEN_DUE_TO_SECURITY_REASONS"

这一切都很好,但是在试图确定发生变化的原因时,事情变得非常困难。以下是工作流程的形状

CDC Step functions state machinelet更深入地研究患者工作流程。

首先,在工作流程的开头有一个停顿,因为我注意到读取中有一个小滞后,因为我知道它们是一致的。

接下来,患者水力局正在对患者资源进行搜索,以寻找所有更改,因为事件时间戳

golang
url := fmt.Sprintf("https://%s/%s/r4/Patient?_lastUpdated=ge%s", h.HealthLakeEndpoint, h.HealthLakeDataStore, lastUpdated.Format(time.RFC3339))

免责声明,在使用DynamoDB方面,我可以在接下来的两个步骤中使用本机SDK集成,但我选择了lambdas。我在这些方面所做的就是准备出版记录,然后检查是否已经处理过。我有一个简单的DynamoDB表,可以每天触摸事物的日志。

最后,如果记录是第一个计时器,则我将其发送到我之前创建的EventBridge Bus,以便其他人可以订阅

包起来

如果您查看要跟随的存储库,则此解决方案会有很多事情。

  • HealthLake是变更的发起人
  • CloudTrail拥有Healthlake正在发生的事件和操作
  • 您需要构建一个EventBridge规则(必须在默认的总线中)才能收听这些更改
  • 建造处理程序或管道来处理更改
  • 状态机器工作流程可能很复杂
    • 暂停
    • 水力发生 /找到变化的东西< / li>
    • 必须实现多种资源类型
    • Deduping
    • 发布到自定义活动桥梁总线

但老实说,由于天然的FHIR和以患者为中心的数据存储,HealthLake必须成为生态系统的关键部分,因此需要将这些变化传播到更广泛的生态系统中。因此,在AWS构建此功能之前,该解决方案非常适合我需要做的事情。

希望您可以接受并应用它和/或适应您的需求!