逐步学习AWS上的无服务器-DynamoDB流
#aws #javascript #教程 #serverless

tl; dr

在本系列中,我尝试解释AWS上无服务器的基础知识,以使您能够构建自己的无服务器应用程序。使用last article,我们发现了SNS主题以及如何使用它们发送通知。在本文中,我们将解决DynamoDB流,该流可以对DynamoDB表中的更改做出反应!

我们今天将做什么?

  • 我将向您展示如何对被插入或修改的项目中的项目做出反应。
  • 我们将创建一个由DynamoDB流提供支持的简单餐厅预订应用程序。
  • 一切都将是事件驱动的!

本文中有很多代码,因为我想提供一个接近现实世界应用程序的示例。如果您只想查看DynamoDB流零件,请检查Create a DynamoDB stream linked to a Lambda functionThird Lambda function: streamTarget零件。

â€i,如果您想要更多的â€

,请定期发布无服务器内容

Follow me on twitter 🚀

什么是DynamoDB流,为什么有用?

基本上,DynamoDB流允许目标在DynamoDB表中聆听进行的更改。无服务器时,此目标通常是一个lambda功能,它具有基于对表的更改的副作用。

此功能非常强大:它允许您构建事件驱动的应用程序,其中组件松散耦合:副作用是由数据库中的更改而不是通过直接调用函数触发的,允许您允许您构建非常可扩展的应用程序。

进一步,DynamoDB流是AWS上无服务器事件源模式的主要构件。它们允许您构建对失败非常有弹性的应用程序,并且可以轻松缩放。我不会在这里详细介绍,但是如果您想了解有关此主题的更多信息,请检查此article by Maxime Vivier

让我们构建餐厅预订应用程序!

在本文中,我们将构建一个简单的餐厅预订应用程序。预订可以待确认或确认。创建预订时,它正在待处理,并将电子邮件发送到餐厅,并带有链接以确认预订。当餐厅确认预订时,预订被标记为确认,客户收到了一封带有确认的电子邮件。

为了满足这些要求,我们将在DynamoDB表中存储预订,并对要插入的项目(创建预订)或修改后的项目(确认预订)以将电子邮件发送给合适的人。我们应用程序的架构看起来像这样:

Architecture

最重要的部分是在图的中间:DynamoDB流转发在表中变化为lambda函数。根据情况,Lambda功能将向正确的Lambda派遣事件,以将正确的电子邮件发送给合适的人。

我们还将在发送给餐厅的电子邮件中创建一个链接以确认预订,此链接将触发确认过程。

要开发此应用程序,我将使用AWS CDK。我已经在all the articles of this series中使用过。此外,每个概念都期望DynamoDB流以前已经解决了。因此,如果您需要刷新,请免费检查旧文章!

设置我们应用程序的资源

让我们首先创建一个新的CDK堆栈,其中包含我们需要的所有资源,除了lambda函数。

import { Construct } from 'constructs';
import * as cdk from 'aws-cdk-lib';
import path from 'path';

import { restaurantBookedTemplateHtml } from './onRestaurantBooked/template';
import { reservationConfirmedTemplateHtml } from './onReservationConfirmed/template';

export class ArticleDDBStream extends Construct {
  constructor(scope: Construct, id: string, api: cdk.aws_apigateway.RestApi, identity: cdk.aws_ses.EmailIdentity) {
    super(scope, id);

    // Api to create and confirm bookings
    const api = new cdk.aws_apigateway.RestApi(this, 'api', {});

    // Table to store reservations
    const table = new cdk.aws_dynamodb.Table(this, 'ReservationsTable', {
      partitionKey: { name: 'SK', type: cdk.aws_dynamodb.AttributeType.STRING },
      sortKey: { name: 'PK', type: cdk.aws_dynamodb.AttributeType.STRING },
      billingMode: cdk.aws_dynamodb.BillingMode.PAY_PER_REQUEST,
      // We need to enable streams on the table
      stream: cdk.aws_dynamodb.StreamViewType.NEW_IMAGE,
    });

    // Event bus to dispatch events
    const bus = new cdk.aws_events.EventBus(this, 'EventBus');

    // SES Identity to send emails
    // You can also use an email address identity
    // Check my SES article for more details
    const DOMAIN_NAME = '<YOUR_DOMAIN_NAME>';
    const hostedZone = cdk.aws_route53.HostedZone.fromLookup(this, 'hostedZone', {
      domainName: DOMAIN_NAME,
    });
    const identity = new cdk.aws_ses.EmailIdentity(this, 'sesIdentity', {
      identity: cdk.aws_ses.Identity.publicHostedZone(hostedZone),
    });

    // Email template when a restaurant is booked
    const restaurantBookedTemplate = new cdk.aws_ses.CfnTemplate(this, 'RestaurantBookedTemplate', {
      template: {
        templateName: 'restaurantBookedTemplate',
        subjectPart: 'Restaurant booked',
        htmlPart: restaurantBookedTemplateHtml,
      },
    });

    // Email template when a reservation is confirmed
    const reservationConfirmedTemplate = new cdk.aws_ses.CfnTemplate(this, 'ReservationConfirmedTemplate', {
      template: {
        templateName: 'reservationConfirmedTemplate',
        subjectPart: 'Reservation confirmed',
        htmlPart: reservationConfirmedTemplateHtml,
      },
    });
  }
}

在这里没有什么新鲜事,我们创建一个API,一个DynamoDB表,一个事件总线,一个SES身份和两个SES模板。唯一的新事物是表的stream属性。它允许我们创建链接到表的DynamoDB流。 NEW_IMAGE值意味着该流进行修改后将包含该项目的新版本。

要保持简短,我没有包含电子邮件模板的内容。您可以找到它们herehere。如果您需要对SES和模板的电子邮件进行复习,请检查this article

创建链接到lambda函数的DynamoDB流

现在我们已经设置了,让我们创建一个将由dynamodb流触发的lambda函数。

// ... previous code
const streamTarget = new cdk.aws_lambda_nodejs.NodejsFunction(this, 'StreamTarget', {
  entry: path.join(__dirname, 'streamTarget', 'handler.ts'),
  handler: 'handler',
  environment: {
    EVENT_BUS_NAME: bus.eventBusName,
  },
});

table.grantStreamRead(streamTarget);
streamTarget.addEventSourceMapping('StreamSource', {
  eventSourceArn: table.tableStreamArn,
  startingPosition: cdk.aws_lambda.StartingPosition.LATEST,
  batchSize: 1,
});
streamTarget.addToRolePolicy(
  new cdk.aws_iam.PolicyStatement({
    actions: ['events:PutEvents'],
    resources: [bus.eventBusArn],
  }),
);

将lambda函数链接到DynamoDB流包括创建事件源映射。每当将项目插入或修改在表格中时,此映射将触发lambda功能。

batchSize属性允许您一次指定将发送多少个项目。在这里,我们将其设置为1,因此将为每个项目触发Lambda功能。

不要忘记授予lambda函数读取流(输入)的权利,并添加将事件放在事件总线上的权利(输出)。

创建其余的lambda功能

最后,让我们创建其他4个lambda函数:

  • 2启用API调用以修改表
  • 2对流目标派遣的事件做出反应并发送电子邮件
const bookRestaurant = new cdk.aws_lambda_nodejs.NodejsFunction(this, 'BookRestaurant', {
  entry: path.join(__dirname, 'bookRestaurant', 'handler.ts'),
  handler: 'handler',
  environment: {
    TABLE_NAME: table.tableName,
  },
});

table.grantWriteData(bookRestaurant);
api.root.addResource('bookRestaurant').addMethod('POST', new cdk.aws_apigateway.LambdaIntegration(bookRestaurant));

const confirmReservation = new cdk.aws_lambda_nodejs.NodejsFunction(this, 'ConfirmReservation', {
  entry: path.join(__dirname, 'confirmReservation', 'handler.ts'),
  handler: 'handler',
  environment: {
    TABLE_NAME: table.tableName,
  },
});

table.grantWriteData(confirmReservation);
api.root
  .addResource('confirmReservation')
  .addResource('{reservationId}')
  .addMethod('GET', new cdk.aws_apigateway.LambdaIntegration(confirmReservation));

const onRestaurantBookedRule = new cdk.aws_events.Rule(this, 'OnRestaurantBookedRule', {
  eventBus: bus,
  eventPattern: {
    source: ['StreamTarget'],
    detailType: ['OnRestaurantBooked'],
  },
});

const onRestaurantBooked = new cdk.aws_lambda_nodejs.NodejsFunction(this, 'OnRestaurantBookedLambda', {
  entry: path.join(__dirname, 'onRestaurantBooked', 'handler.ts'),
  handler: 'handler',
  environment: {
    FROM_EMAIL_ADDRESS: `notifications@${identity.emailIdentityName}`,
    API_URL: api.url,
    TEMPLATE_NAME: restaurantBookedTemplate.ref,
  },
});

onRestaurantBookedRule.addTarget(new cdk.aws_events_targets.LambdaFunction(onRestaurantBooked));
onRestaurantBooked.addToRolePolicy(
  new cdk.aws_iam.PolicyStatement({
    actions: ['ses:SendTemplatedEmail'],
    resources: ['*'],
  }),
);

const onReservationConfirmedRule = new cdk.aws_events.Rule(this, 'OnReservationConfirmedRule', {
  eventBus: bus,
  eventPattern: {
    source: ['StreamTarget'],
    detailType: ['OnReservationConfirmed'],
  },
});

const onReservationConfirmed = new cdk.aws_lambda_nodejs.NodejsFunction(this, 'OnReservationConfirmedLambda', {
  entry: path.join(__dirname, 'onReservationConfirmed', 'handler.ts'),
  handler: 'handler',
  environment: {
    FROM_EMAIL_ADDRESS: `notifications@${identity.emailIdentityName}`,
    TEMPLATE_NAME: reservationConfirmedTemplate.ref,
  },
});

onReservationConfirmedRule.addTarget(new cdk.aws_events_targets.LambdaFunction(onReservationConfirmed));
onReservationConfirmed.addToRolePolicy(
  new cdk.aws_iam.PolicyStatement({
    actions: ['ses:SendTemplatedEmail'],
    resources: ['*'],
  }),
);

前两个功能非常简单:它们允许创建预订并确认它们。它们与API链接,因此任何人都可以打电话给它们。他们有权编辑DynamoDB表。我使用get端点确认预订,以便能够在发送给餐厅的电子邮件中使用链接。

最后两个功能链接到事件总线。它们将由流目标触发,并将向合适的人发送电子邮件。我使用SES模板的电子邮件发送电子邮件,因此我必须添加权利将模板的电子邮件发送到lambda函数。

实施lambda功能

现在我们拥有所需的所有资源,让我们实现lambda功能!首先,让我们解决将存储在表中的数据格式。它将简化代码。

// types.ts
export type Reservation = {
  // PK='RESERVATION'
  id: string; // SK
  firstName: string; // S
  lastName: string; // S
  email: string; // S
  dateTime: string; // S
  partySize: number; // N
  // status: S
};

第一个lambda功能:bookRestaurant

import { DynamoDBClient, PutItemCommand } from '@aws-sdk/client-dynamodb';

import { v4 as uuid } from 'uuid';
import { Reservation } from '../types';

const client = new DynamoDBClient({});

export const handler = async ({ body }: { body: string }): Promise<{ statusCode: number; body: string }> => {
  const tableName = process.env.TABLE_NAME;

  if (tableName === undefined) {
    throw new Error('TABLE_NAME environment variable must be defined');
  }

  const { firstName, lastName, email, dateTime, partySize } = JSON.parse(body) as Partial<Reservation>;

  if (
    firstName === undefined ||
    lastName === undefined ||
    email === undefined ||
    dateTime === undefined ||
    partySize === undefined
  ) {
    return {
      statusCode: 400,
      body: 'Bad request',
    };
  }

  const reservationId = uuid();

  await client.send(
    new PutItemCommand({
      TableName: tableName,
      Item: {
        PK: { S: `RESERVATION` },
        SK: { S: reservationId },
        firstName: { S: firstName },
        lastName: { S: lastName },
        email: { S: email },
        partySize: { N: partySize.toString() },
        dateTime: { S: dateTime },
        status: { S: 'PENDING' },
      },
    }),
  );

  return {
    statusCode: 200,
    body: JSON.stringify({
      reservationId,
    }),
  };
};

简单:我们解析请求的主体,然后将预订存储在桌子内。默认情况下,我们将状态设置为PENDING。检查我的series of articles,如果您需要有关如何与DynamoDB表和API网关进行交互的复习。

第二个lambda功能:confirmReservation7

import { DynamoDBClient, UpdateItemCommand } from '@aws-sdk/client-dynamodb';

const client = new DynamoDBClient({});

export const handler = async ({
  pathParameters,
}: {
  pathParameters: { reservationId: string };
}): Promise<{ statusCode: number; body: string; headers: unknown }> => {
  const tableName = process.env.TABLE_NAME;

  if (tableName === undefined) {
    throw new Error('TABLE_NAME environment variable must be defined');
  }

  await client.send(
    new UpdateItemCommand({
      TableName: tableName,
      Key: {
        PK: { S: `RESERVATION` },
        SK: { S: pathParameters.reservationId },
      },
      UpdateExpression: 'SET #status = :status',
      ExpressionAttributeNames: {
        '#status': 'status',
      },
      ExpressionAttributeValues: {
        ':status': { S: 'CONFIRMED' },
      },
    }),
  );

  return {
    statusCode: 200,
    body: JSON.stringify({
      reservationId: pathParameters.reservationId,
    }),
    headers: {
      'Content-Type': 'application/json',
      'Access-Control-Allow-Origin': '*',
    },
  };
};

此功能也非常简单:它将保留状态更新为CONFIRMED。这里有两个小技巧:

  • status属性是dynamodb中的保留关键字,因此我们需要使用表达式属性名称进行更新。
  • 我们需要将Access-Control-Allow-Origin标头添加到响应中,以允许浏览器调用API。 (我们希望电子邮件中的链接起作用)

第三lambda功能:streamTarget

import { EventBridgeClient, PutEventsCommand } from '@aws-sdk/client-eventbridge';
import { Reservation } from '../types';

type InputProps = {
  Records: {
    eventName: string;
    dynamodb: {
      NewImage: {
        PK: { S: string };
        SK: { S: string };
        email: { S: string };
        firstName: { S: string };
        lastName: { S: string };
        dateTime: { S: string };
        partySize: { N: string };
        status: { S: string };
      };
    };
  }[];
};

const client = new EventBridgeClient({});

export const handler = async ({ Records }: InputProps): Promise<void> => {
  const eventBusName = process.env.EVENT_BUS_NAME;

  if (eventBusName === undefined) {
    throw new Error('EVENT_BUS_NAME environment variable is not set');
  }

  await Promise.all(
    Records.map(async ({ dynamodb, eventName }) => {
      if (eventName !== 'INSERT' && eventName !== 'MODIFY') {
        return;
      }

      const { SK, email, firstName, lastName, dateTime, partySize } = dynamodb.NewImage;

      const eventDetail: Reservation = {
        id: SK.S,
        firstName: firstName.S,
        lastName: lastName.S,
        email: email.S,
        dateTime: dateTime.S,
        partySize: +partySize.N,
      };

      await client.send(
        new PutEventsCommand({
          Entries: [
            {
              EventBusName: eventBusName,
              Source: 'StreamTarget',
              DetailType: eventName === 'INSERT' ? 'OnRestaurantBooked' : 'OnReservationConfirmed',
              Detail: JSON.stringify(eventDetail),
            },
          ],
        }),
      );
    }),
  );
};

这是本文的重要部分:DynamoDB流将触发的lambda函数。该功能会收到记录列表,每个记录都包含表中的项目的新版本。我们过滤记录以仅保留INSERTMODIFY事件的记录,我们根据事件类型在事件总线上派遣事件。

我们还将dynamodb项目映射到Reservation对象,以简化其他lambda函数的代码。

第四个lambda功能:onRestaurantBooked

import { SESv2Client, SendEmailCommand } from '@aws-sdk/client-sesv2';
import { Reservation } from '../types';

const client = new SESv2Client({});
const RESTAURANT_OWNER_EMAIL_ADDRESS = 'pierrech@theodo.fr';

export const handler = async ({ detail }: { detail: Reservation }): Promise<void> => {
  const templateName = process.env.TEMPLATE_NAME;
  const apiURL = process.env.API_URL;
  const fromEmailAddress = process.env.FROM_EMAIL_ADDRESS;

  if (templateName === undefined || apiURL === undefined || fromEmailAddress === undefined) {
    throw new Error('TEMPLATE_NAME, API_URL and FROM_EMAIL_ADDRESS environment variables must be defined');
  }

  await client.send(
    new SendEmailCommand({
      FromEmailAddress: fromEmailAddress,
      Destination: {
        ToAddresses: [RESTAURANT_OWNER_EMAIL_ADDRESS],
      },
      Content: {
        Template: {
          TemplateName: templateName,
          TemplateData: JSON.stringify({
            firstName: detail.firstName,
            lastName: detail.lastName,
            dateTime: detail.dateTime,
            partySize: detail.partySize,
            apiURL,
            reservationId: detail.id,
          }),
        },
      },
    }),
  );
};

此Lambda对EventBridge事件做出反应。它可以访问事件的细节,即Reservation对象。它使用此数据来补充SES模板并将电子邮件发送给餐厅所有者。它还使用API​​_URL环境变量来创建链接以确认预订。 (链接的创建是在模板内完成的,检查here

请注意,电子邮件发送到固定的电子邮件地址。在真实的应用程序中,您可能希望将餐厅的电子邮件地址存储在其他地方,然后在此处检索。

第五lambda功能:onReservationConfirmed

import { SESv2Client, SendEmailCommand } from '@aws-sdk/client-sesv2';
import { Reservation } from '../types';

const client = new SESv2Client({});

export const handler = async ({ detail }: { detail: Reservation }): Promise<void> => {
  const templateName = process.env.TEMPLATE_NAME;
  const fromEmailAddress = process.env.FROM_EMAIL_ADDRESS;

  if (templateName === undefined || fromEmailAddress === undefined) {
    throw new Error('TEMPLATE_NAME and FROM_EMAIL_ADDRESS environment variables must be defined');
  }

  await client.send(
    new SendEmailCommand({
      FromEmailAddress: fromEmailAddress,
      Destination: {
        ToAddresses: [detail.email],
      },
      Content: {
        Template: {
          TemplateName: templateName,
          TemplateData: JSON.stringify({
            firstName: detail.firstName,
            lastName: detail.lastName,
            dateTime: detail.dateTime,
            partySize: detail.partySize,
          }),
        },
      },
    }),
  );
};

基本上是同一件事,除了电子邮件模板更简单(无链接)。我们将电子邮件发送给客户。

就是这样!

测试应用程序

要测试应用程序,我们只需要执行Postman的单个API调用即可。让我们创建一个预订:

Postman

几秒钟后,我收到了一封电子邮件(作为餐馆老板):

Booking email

当我单击链接时,预订会得到确认。我收到了确认电子邮件(作为客户):

Confirmation email

那样简单!该应用程序非常可扩展,因为它是事件驱动的:我们可以在确认预订时添加外部API调用,发送收据,流程付款等...而无需修改应用程序的基础体系结构。

结论

这是一篇很长的文章,但我希望您学到很多东西! DynamoDB流是AWS的功能非常强大的功能,下一步朝这个方向进行了使用,就是使用它们来创建事件传输应用程序。再一次,我建议您检查this article,如果您想了解更多有关此主题的信息。

我计划每两月继续这一系列文章。我已经涵盖了简单的lambda函数和REST API的创建,并与DynamoDB数据库和S3存储桶进行了交互。您可以在我的repository上遵循此进度!我将介绍新主题,例如前端部署,类型安全,更高级的模式等...如果您有任何建议,请随时与我联系!

,如果您可以与您的朋友和同事分享这篇文章,我真的很感激。这将有助于我发展听众。另外,不要忘记在下一篇文章发行时订阅要更新!

我想在这里保持联系是我的twitter account。我经常发布或重新发布有关AWS和无服务器的有趣内容,请随时关注我!

Follow me on twitter 🚀