我一直想花更多的时间谈论AWS HealthLake。更具体地说,快速医疗保健互操作资源(FHIR)是医疗保健信息系统中互操作性的基础。我非常坚信,无服务器不仅适用于客户端和用户驱动的工作流程。我对此进行了广泛的文章here,但我想深入研究构建数据流的流。我在生产中使用了相当长的时间,所以让我们看一下Eventbridge Pipes丰富DynamoDB流。
设置问题
我以前曾经关于eventbridge,pipes和dynamoDB,但我想更仔细地了解如何丰富这些流的变化。将FHIR用作背景似乎是一种完美的选择。
与FHIR合作时,每种资源都有某些需要记录的细节。有人可能会争辩,为什么不只是在FHIR中存储用于交易系统的数据模型?我以前尝试过。很粗糙。随着变化的发生,您可能会对构架的交易级调整,以破坏您的域界限。 FHIR是一项可扩展的规范,适合许多医疗保健方案。这是一个规格的大毯子
有了该决定的考虑,我需要能够将特定于域的数据模型转换为FHIR。有时,我可能需要进行轻巧的翻译,以从JSON或关系到FHIR。有时,我可能需要再次使用其他信息来丰富变化的记录,FHIR可能不会与我的模型排列1:1。
使用DynamoDB我最喜欢的事情之一是流。这是一个强大的概念,它使触发系统的变化如此可以实现。在此示例中的这种变化将是虚构的,但可能是真正的患者表。这些更改将从DynamoDB流中流,然后通过EventBridge的管道处理,然后过滤,丰富然后写入CloudWatch。它们可以写给Kinesis或Healthlake或API目的地,但我想关注富集和过滤。
工作流程
与我的大多数文章一样,完全工作的代码将位于底部。我计划在撰写本文中撒一些,但您可以期待CDK(Typescript)和Golang。
DynamoDB患者
对于患者表来说,我有一个简单的设置。
this._table = new Table(this, id, {
billingMode: BillingMode.PAY_PER_REQUEST,
removalPolicy: RemovalPolicy.DESTROY,
partitionKey: { name: "id", type: AttributeType.STRING },
tableName: `Patients`,
encryption: TableEncryption.CUSTOMER_MANAGED,
encryptionKey: props.key,
stream: StreamViewType.NEW_AND_OLD_IMAGES,
});
这是一些CDK代码,该代码构建了Patients
表。该表将通过自定义KMS键进行加密,并将通过流传播新的和旧的图像更改。当表通过CDK堆栈部署表时,您会看到带有流的新表“”。
我已经设置了基础知识,当我想用Eventbridge Pipes丰富此DynamoDB时,我就可以去了。
要创建一个FHIR患者资源,我需要能够将我的域患者转换为该FHIR资源。该初始患者与字符串混合物和定义地址字段的地图一起存储在DynamoDB中。
我将使用的基本患者记录来改变以下外观:
当该记录发生任何事情时,它将流出我要做的事情。
Eventbridge管道
用Eventbridge Pipes处理DyanModB流是无服务器工作流程的缩影。 DynamoDB是无服务器,管道是无服务器的,我的过滤,转换和丰富均以无服务器为单位。管道看起来像这样。
构建管道的CDK代码很有趣。 CDK网站上有一些不错的文档,但是我确实必须参考几个不同的位置才能结合决赛。
const pipe = new CfnPipe(scope, "Pipe", {
name: "Patient-StreamChange-Pipe",
roleArn: pipeRole.roleArn,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
source: props.table.tableStreamArn!,
enrichment: props.enrichmentFunction.functionArn,
target: logGroup.logGroupArn,
sourceParameters: this.sourceParameters(),
targetParameters: this.targetParameters(logGroup),
enrichmentParameters: this.enrichmentParameters(),
});
那是一般管道的构建。在构建流的每个部分时,正在发生很多事情。我不会将整个文件倒在这里,但是我发现有趣的是我会突出显示。
sourceParameters = () => {
return {
dynamoDbStreamParameters: {
startingPosition: "LATEST",
batchSize: 1,
},
filterCriteria: {
filters: [
{
pattern: ' { "eventName": [ "MODIFY", "INSERT" ] }',
},
],
},
};
};
对于源,我将通过流参数连接EventBridge Pipes到DynamoDB流。此外,我只在该管道中处理这些更改和插入的更改。添加过滤器是惊人的,在从流中读取时,它将在记录不匹配时向前推进迭代器。
需要重点的另一部分是丰富的步骤。对于我在此示例中的丰富性,我没有塑造有效载荷。这样做的原因是,与Golang合作时,有一个struct
匹配流记录的确切格式。但是,如果您还有其他一些记录,那么使用inputTemplate
进行了转换。
enrichmentParameters = () => {
return {
lambdaParameters: {
invocationType: "REQUEST_RESPONSE",
},
inputTemplate: ``,
};
};
丰富流
我喜欢EventBridge管道如何使我能够在此过程中丰富有效载荷。您在丰富阶段有选项,但是我在这里无服务器,所以lambda非常适合。
设置操作,我想将DynamoDB患者表记录中的数据转换为符合FHIR的东西。截至目前,我首选的Lambda部署正在使用Golang二进制的Amazon Linux V2运行时。
我的Golang处理程序非常简单,我将在下面显示它。但是,正如我上面提到的,具有DynamoDB流记录,很容易将其纳入DynamoDBEventRecord
结构。请注意,这是一个即将进入的数组。我期望我在CDK代码中的处理中设置的批次大小的记录。
func handler(ctx context.Context, e []events.DynamoDBEventRecord) (*PatientEvent, error) {
log.WithFields(log.Fields{
"body": e,
}).Debug("Printing out the body")
if len(e) != 1 {
return nil, fmt.Errorf("wrong number of entries supplied")
}
fhirPatient, err := buildPatientEvent(&e[0])
if err != nil {
log.WithFields(log.Fields{
"err": err,
}).Error("Something bad happened when building the Patient Payload")
return nil, err
}
log.WithFields(log.Fields{
"fhirPatient": fhirPatient,
}).Debug("Printing out the payload")
return fhirPatient, nil
}
我正在返回病人,以在流中进行进一步处理。那个病人的event看起来像是FHIR和一些元数据的组合。
type PatientEvent struct {
Version string `json:"version"`
Source string `json:"source"`
EventType string `json:"eventType"`
CorrelationId string `json:"correlationId"`
Details struct {
Command string `json:"command"`
Body fhir.Patient `json:"entity"`
} `json:"details"`
}
我喜欢在管道处理中包含更多信息,以便可以将这些额外的数据下线使用。主FHIR患者存储在Body
字段中。
i然后将对话委派给该Body
`元素在流在流的函数中。
go
json:“命令”
func buildPatientEvent(r *events.DynamoDBEventRecord) (*PatientEvent, error) {
pe := &PatientEvent{
Version: "1.0",
Source: "PatientTable",
EventType: "PatientChange",
CorrelationId: r.EventID,
Details: struct {
Command string
json:“ entity”`
Body fhir.Patient
} {
命令:“ put”,
},
}
firpatient:= reir.patient {}
// repos中的更多代码
return pe, nil
}
`
我喜欢这样做的事情是,每个资源的转换代码都隔离到可以进行转换的单个富集函数。如果FHIR资源由于某种原因改变其定义,我只需要在一个地方进行这些调整即可。而且,如果我对此代码进行了良好的单位测试和覆盖范围,那么更好。我发现单一的责任原则在这里发挥了很大的作用。
目标
用EventBridge Pipes丰富DyanModB流时,我有很多关于目标的选择。我可以将富集的有效载荷放在队列上,启动状态机或使用API目的地,但是在这种情况下,我只是将其倒入CloudWatch。
typescript
targetParameters = (logGroup: LogGroup) => {
return {
cloudWatchLogsParameters: {
logStreamName: logGroup.logGroupName,
},
};
};
CloudWatch的目标仅需要一个我要将输出发送给的日志组。
如果我查看lambda输出,则详细信息可在CloudWatch中找到。
但是,如果我查看管道中的富集步骤的输出,则输出只是我从lambda函数中返回的输出。还记得PatientEvent
吗?这就是将出现在CloudWatch流中的内容。
轻松的peasy。现已将特定领域的患者转换为FHIR患者资源,以用于HealthLake或任何其他处理。
包起来
HL7 FHIR规范是定义医疗资源的超级强大方法,因此它们可以在其他医疗保健系统中互操作。当您入门时,可以进行很多处理和消化,但是使用像Healthlake这样的标准的东西,您应该在旅途中有一个不错的开端。
我前面提到将有示例代码。您可以在以下repository中找到本文中有效的所有作用。关注README进行设置并准备好玩。
我的最后几篇文章围绕着Healthlake,FHIR和Healthcare,因为我强烈认为该领域中仍然有太多不言而喻的问题。诸如选择正确的语言,正确的技术或正确的标准之类的事情似乎还不够。我希望通过只分享有效的方法并在我的旅途中为我工作而改变。所以我希望这很有帮助!
和往常一样,建筑快乐!