DynamoDB流eventbridge管道多个项目
#aws #serverless #go #体系结构

最近,我已经在Eventbridge Pipes上写了几篇文章,特别是在DynamoDB流中使用它们。我写了关于Enrichment的文章。而且我写了关于直截了当的Streaming的文章。我相信,使用EventBridge管道在无服务器,事件驱动的方法中起着不错的作用。因此,在本文中,我想探索一个表中有多个项目的eventbridge pipes的流式DynamoDB。

我收到的有关Streaming DynamoDB to EventBridge Pipes的几条评论在附近,“如果我在同一张桌子中有多个项目收集怎么办?”我打算在本文中展示一种处理该确切问题的模式。在底部,您会找到一个可以在顶部部署和构建的工作代码样本。我在生产中使用了此精确的设置,因此请放心,这是一个很好的基础。

建筑学

让我们从定义我要浏览的设置开始。

  • DynamoDB表,带有2个项目类型
    • 患者
    • 地址
  • DynamoDB流连接到Eventbridge管道
  • EB管道将
    • 过滤器
    • 丰富
    • 放入EventBridge默认总线
  • EB规则为
    • 患者的Lambda处理程序
    • 地址的Lambda处理程序

EventBridge Pipe Stream

逐步浏览代码

DynamoDB表

我正在使用的DynamoDB表将包含多种项目类型。这可以描述为单桌设计,多项目收集设计或您喜欢的任何东西。关键是,DynamoDB非常擅长存储在同一表中相关的事物。我将id字段定义为分区键,然后将sk作为范围键。

this._table = new Table(this, id, {
    billingMode: BillingMode.PAY_PER_REQUEST,
    removalPolicy: RemovalPolicy.DESTROY,
    partitionKey: { name: "id", type: AttributeType.STRING },
    sortKey: { name: "sk", type: AttributeType.STRING },
    tableName: `Patients`,
    encryption: TableEncryption.CUSTOMER_MANAGED,
    encryptionKey: props.key,
    stream: StreamViewType.NEW_AND_OLD_IMAGES,
});

也请注意,我正在定义一个stream,该stream将通过附加在变更记录上的新旧图像传播更改。当我进入管道定义时,这将很有用。

患者看起来像这样:

{
    "id": "PATIENT#1",
    "sk": "ADDRESS#1",
    "address": "123 Some City, Some State USAA",
    "addressId": "1",
    "itemType": "Address",
    "patientId": "1"
}

和这样的地址:

{
    "id": "PATIENT#1",
    "sk": "ADDRESS#1",
    "address": "123 Some City, Some State USAA",
    "addressId": "1",
    "itemType": "Address",
    "patientId": "1"
}

Table Records

Eventbridge管道

将DynamoDB流到EventBridge管道时,管道是设计中的核心播放器。在这种情况下,我忽略了删除,仅处理DynamoDB修改和插入更改类型。

来源

我想首先解决您的源组件需要附加适当的IAM权限以从流中读取并解密数据的事实。

new PolicyDocument({
    statements: [
        new PolicyStatement({
            actions: [
                "dynamodb:DescribeStream",
                "dynamodb:GetRecords",
                "dynamodb:GetShardIterator",
                "dynamodb:ListStreams",
            ],
            effect: Effect.ALLOW,
            resources: [table.tableStreamArn!],
        }),
        new PolicyStatement({
            actions: [
                "kms:Decrypt",
                "kms:DescribeKey",
                "kms:Encrypt",
                "kms:GenerateDataKey*",
                "kms:ReEncrypt*",
            ],
            resources: [key.keyArn],
            effect: Effect.ALLOW,
        }),
    ],
});

下一步是配置流读取器。除了仅在修改中进行过滤外,我还想一次处理1个记录。

return {
    dynamoDbStreamParameters: {
        startingPosition: "LATEST",
        batchSize: 1,
    },
    filterCriteria: {
        filters: [
            {
                pattern: ' { "eventName": [ "MODIFY", "INSERT" ] }',
            },
        ],
    },
};

丰富

在这种情况下,我想简单地将源事件的DynamoDB部分剥离,沿Golang的Raw struct下方,我可以将其传递到Eventbridge Bus。为此,我将使用lambda功能作为管道工作流程的一部分。

lambda将被触发作为请求/响应,使该工作流程中的同步。

return {
    lambdaParameters: {
        invocationType: "REQUEST_RESPONSE",
    },
    inputTemplate: ``,
};

lambda本身处理流记录的塑造。

func Convert(r *events.DynamoDBEventRecord) (*CustomEvent, error) {
    // the body of this function parses out the values
    // and returns shaped record
    if itemType == "Patient" {
        i := r.Change.NewImage["id"]
        n := r.Change.NewImage["name"]
        t := r.Change.NewImage["itemType"]
        s := r.Change.NewImage["sk"]
        pid := r.Change.NewImage["patientId"]

        change := fmt.Sprintf("Patient%s", strings.Title(strings.ToLower(r.EventName)))
        return &CustomEvent{
            EventType:     change,
            CorrelationId: r.EventID,
            Body: &ItemOne{
                Id:        i.String(),
                Name:      n.String(),
                ItemType:  t.String(),
                Sk:        s.String(),
                PatientId: pid.String(),
            }}, nil
    } else if itemType == "Address" {
        i := r.Change.NewImage["id"]
        n := r.Change.NewImage["address"]
        t := r.Change.NewImage["itemType"]
        s := r.Change.NewImage["sk"]
        pid := r.Change.NewImage["patientId"]
        aid := r.Change.NewImage["addressId"]
        change := fmt.Sprintf("Address%s", strings.Title(strings.ToLower(r.EventName)))
        return &CustomEvent{
            EventType:     change,
            CorrelationId: r.EventID,
            Body: &ItemTwo{
                Id:        i.String(),
                Address:   n.String(),
                ItemType:  t.String(),
                Sk:        s.String(),
                PatientId: pid.String(),
                AddressId: aid.String(),
            }}, nil
    }
}

目标

事件以我想要的格式塑造后,是时候将有效载荷发送到EventBridge Bus了。我将把输出塑造成我喜欢的结果。

return {
    eventBridgeEventBusParameters: {
        detailType: "PatientChange",
        source: "com.binaryheap.patient",
    },
    inputTemplate: `{
            "meta": {
                "correlationId": <$.eventId>,
                "changeType": <$.eventType>
            },
            "event": <$.body>
        }`,
};

就像源输入一样,我需要授予消费者将发布到EventBridge的能力。

return new PolicyDocument({
    statements: [
        new PolicyStatement({
            resources: [busArn],
            actions: ["events:PutEvents"],
            effect: Effect.ALLOW,
        }),
    ],
});

处理项目类型的规则

既然我已经发布了EventBridge的默认巴士的管道出版,我可以制定一些规则。在多播场景中将DynamoDB流到EventBridge管道时,我的特定规则将有助于目标我想处理项目更改的Lambda功能。这些也可能是队列或您喜欢的其他任何东西。这是一个表格中有多种项目类型的地方。您可以让服务消费者处理患者表中的所有更改,或者您可以让特定的消费者处理特定物品类型。我的示例显示了后者。我想特定要突出图案。

病人规则

与患者打交道时,我可能想解决有关该记录的特定内容。我首先需要构建一个用于处理总线消息和我想要的目标的事件桥规则。

this._handlerOne = new GoFunction(scope, "ItemOneHandlerFunction", {
    entry: "src/type-one-handler",
    functionName: `type-one-handler`,
    timeout: Duration.seconds(15),
    environment: {
        IS_LOCAL: "false",
        LOG_LEVEL: "DEBUG",
        VERSION: props.version,
    },
});

此代码将部署将成为我患者规则的目标。

const rule = new Rule(scope, "ItemOnHandlerRule", {
    eventPattern: {
        detailType: ["PatientChange"],
        detail: {
            meta: {
                changeType: ["PatientModify", "PatientInsert"],
            },
        },
    },
    eventBus: EventBus.fromEventBusArn(scope, "DefaultBusItemOne", busArn),
    ruleName: "item-one-rule",
});

const dlq = new Queue(this, "ItemOneHandler-DLQ");
rule.addTarget(
    new targets.LambdaFunction(handler, {
        deadLetterQueue: dlq,
    })
);

您会注意到,我正在寻找PatientChange的顶级细节类型。然后,我更深入地研究PatientInsertPatientModify更改类型的有效载荷。然后将其转发到我的项目一个lambda中。

地址规则

接下来,我构建了一个几乎相同的规则,但专门用于地址。

const rule = new Rule(scope, "ItemTwoHandlerRule", {
    eventPattern: {
        detailType: ["PatientChange"],
        detail: {
            meta: {
                changeType: ["AddressModify", "AddressInsert"],
            },
        },
    },
    eventBus: EventBus.fromEventBusArn(scope, "DefaultBusItemTwo", busArn),
    ruleName: "item-two-rule",
});

const dlq = new Queue(this, "ItemTwoHandler-DLQ");
rule.addTarget(
    new targets.LambdaFunction(handler, {
        deadLetterQueue: dlq,
    })
);

Address Rule

在目标的背面,我有两个单独的lambdas。对于此示例,它们相同,因为他们只是打印出有效载荷。

func handler(ctx context.Context, e interface{}) (interface{}, error) {
    log.WithFields(log.Fields{
        "body": e,
    }).Debug("Printing out the body")

    return e, nil
}

患者输出
Patient Output

地址输出
Address Output

关于模式的注释

将DynamoDB流到EventBridge管道时,您可以从过滤,丰富然后最终进行目标。由于发现它的性能非常有效,并且很容易设置和推理,因此我继续将管道放入工作负载中。我还发现,在这些类型的工作流程中使用步骤函数的地方,我现在默认为管道。

包起来

要将物品恢复在一起,流式DynamoDB的限制为2个消费者,您可以将其附加到流到流。当您在表中有1种记录时,该限制并不重要。您可以使用一个流处理插件和修改,然后另一个管道来处理删除。但是,当您有单台或多类型情况时,您需要更多的服务。

使用EventBridge的规则和目标恰恰是使这可能成为可能的服务和功能。我还发现,如果您也拥有具有许可边界的本地团队,则可以通过Event Bus Mesh进一步增强。

一如既往地,对于[完全运行的and working repository,您可以前往github并克隆它。

我希望在使用DynamoDB和EventBridge Pipes时,这会让您在工具箱中更多。

快乐建筑!