tl; dr
在本系列中,我尝试解释AWS上无服务器的基础知识,以使您能够构建自己的无服务器应用程序。使用last article,我们发现了SNS主题以及如何使用它们发送通知。在本文中,我们将解决DynamoDB流,该流可以对DynamoDB表中的更改做出反应!
我们今天将做什么?
- 我将向您展示如何对被插入或修改的项目中的项目做出反应。
- 我们将创建一个由DynamoDB流提供支持的简单餐厅预订应用程序。
- 一切都将是事件驱动的!
本文中有很多代码,因为我想提供一个接近现实世界应用程序的示例。如果您只想查看DynamoDB流零件,请检查Create a DynamoDB stream linked to a Lambda function
和Third Lambda function: streamTarget
零件。
â€i,如果您想要更多的â€
,请定期发布无服务器内容什么是DynamoDB流,为什么有用?
基本上,DynamoDB流允许目标在DynamoDB表中聆听进行的更改。无服务器时,此目标通常是一个lambda功能,它具有基于对表的更改的副作用。
此功能非常强大:它允许您构建事件驱动的应用程序,其中组件松散耦合:副作用是由数据库中的更改而不是通过直接调用函数触发的,允许您允许您构建非常可扩展的应用程序。
进一步,DynamoDB流是AWS上无服务器事件源模式的主要构件。它们允许您构建对失败非常有弹性的应用程序,并且可以轻松缩放。我不会在这里详细介绍,但是如果您想了解有关此主题的更多信息,请检查此article by Maxime Vivier。
让我们构建餐厅预订应用程序!
在本文中,我们将构建一个简单的餐厅预订应用程序。预订可以待确认或确认。创建预订时,它正在待处理,并将电子邮件发送到餐厅,并带有链接以确认预订。当餐厅确认预订时,预订被标记为确认,客户收到了一封带有确认的电子邮件。
为了满足这些要求,我们将在DynamoDB表中存储预订,并对要插入的项目(创建预订)或修改后的项目(确认预订)以将电子邮件发送给合适的人。我们应用程序的架构看起来像这样:
最重要的部分是在图的中间:DynamoDB流转发在表中变化为lambda函数。根据情况,Lambda功能将向正确的Lambda派遣事件,以将正确的电子邮件发送给合适的人。
我们还将在发送给餐厅的电子邮件中创建一个链接以确认预订,此链接将触发确认过程。
要开发此应用程序,我将使用AWS CDK。我已经在all the articles of this series中使用过。此外,每个概念都期望DynamoDB流以前已经解决了。因此,如果您需要刷新,请免费检查旧文章!
设置我们应用程序的资源
让我们首先创建一个新的CDK堆栈,其中包含我们需要的所有资源,除了lambda函数。
import { Construct } from 'constructs';
import * as cdk from 'aws-cdk-lib';
import path from 'path';
import { restaurantBookedTemplateHtml } from './onRestaurantBooked/template';
import { reservationConfirmedTemplateHtml } from './onReservationConfirmed/template';
export class ArticleDDBStream extends Construct {
constructor(scope: Construct, id: string, api: cdk.aws_apigateway.RestApi, identity: cdk.aws_ses.EmailIdentity) {
super(scope, id);
// Api to create and confirm bookings
const api = new cdk.aws_apigateway.RestApi(this, 'api', {});
// Table to store reservations
const table = new cdk.aws_dynamodb.Table(this, 'ReservationsTable', {
partitionKey: { name: 'SK', type: cdk.aws_dynamodb.AttributeType.STRING },
sortKey: { name: 'PK', type: cdk.aws_dynamodb.AttributeType.STRING },
billingMode: cdk.aws_dynamodb.BillingMode.PAY_PER_REQUEST,
// We need to enable streams on the table
stream: cdk.aws_dynamodb.StreamViewType.NEW_IMAGE,
});
// Event bus to dispatch events
const bus = new cdk.aws_events.EventBus(this, 'EventBus');
// SES Identity to send emails
// You can also use an email address identity
// Check my SES article for more details
const DOMAIN_NAME = '<YOUR_DOMAIN_NAME>';
const hostedZone = cdk.aws_route53.HostedZone.fromLookup(this, 'hostedZone', {
domainName: DOMAIN_NAME,
});
const identity = new cdk.aws_ses.EmailIdentity(this, 'sesIdentity', {
identity: cdk.aws_ses.Identity.publicHostedZone(hostedZone),
});
// Email template when a restaurant is booked
const restaurantBookedTemplate = new cdk.aws_ses.CfnTemplate(this, 'RestaurantBookedTemplate', {
template: {
templateName: 'restaurantBookedTemplate',
subjectPart: 'Restaurant booked',
htmlPart: restaurantBookedTemplateHtml,
},
});
// Email template when a reservation is confirmed
const reservationConfirmedTemplate = new cdk.aws_ses.CfnTemplate(this, 'ReservationConfirmedTemplate', {
template: {
templateName: 'reservationConfirmedTemplate',
subjectPart: 'Reservation confirmed',
htmlPart: reservationConfirmedTemplateHtml,
},
});
}
}
在这里没有什么新鲜事,我们创建一个API,一个DynamoDB表,一个事件总线,一个SES身份和两个SES模板。唯一的新事物是表的stream
属性。它允许我们创建链接到表的DynamoDB流。 NEW_IMAGE
值意味着该流进行修改后将包含该项目的新版本。
要保持简短,我没有包含电子邮件模板的内容。您可以找到它们here和here。如果您需要对SES和模板的电子邮件进行复习,请检查this article。
创建链接到lambda函数的DynamoDB流
现在我们已经设置了,让我们创建一个将由dynamodb流触发的lambda函数。
// ... previous code
const streamTarget = new cdk.aws_lambda_nodejs.NodejsFunction(this, 'StreamTarget', {
entry: path.join(__dirname, 'streamTarget', 'handler.ts'),
handler: 'handler',
environment: {
EVENT_BUS_NAME: bus.eventBusName,
},
});
table.grantStreamRead(streamTarget);
streamTarget.addEventSourceMapping('StreamSource', {
eventSourceArn: table.tableStreamArn,
startingPosition: cdk.aws_lambda.StartingPosition.LATEST,
batchSize: 1,
});
streamTarget.addToRolePolicy(
new cdk.aws_iam.PolicyStatement({
actions: ['events:PutEvents'],
resources: [bus.eventBusArn],
}),
);
将lambda函数链接到DynamoDB流包括创建事件源映射。每当将项目插入或修改在表格中时,此映射将触发lambda功能。
batchSize
属性允许您一次指定将发送多少个项目。在这里,我们将其设置为1,因此将为每个项目触发Lambda功能。
不要忘记授予lambda函数读取流(输入)的权利,并添加将事件放在事件总线上的权利(输出)。
创建其余的lambda功能
最后,让我们创建其他4个lambda函数:
- 2启用API调用以修改表
- 2对流目标派遣的事件做出反应并发送电子邮件
const bookRestaurant = new cdk.aws_lambda_nodejs.NodejsFunction(this, 'BookRestaurant', {
entry: path.join(__dirname, 'bookRestaurant', 'handler.ts'),
handler: 'handler',
environment: {
TABLE_NAME: table.tableName,
},
});
table.grantWriteData(bookRestaurant);
api.root.addResource('bookRestaurant').addMethod('POST', new cdk.aws_apigateway.LambdaIntegration(bookRestaurant));
const confirmReservation = new cdk.aws_lambda_nodejs.NodejsFunction(this, 'ConfirmReservation', {
entry: path.join(__dirname, 'confirmReservation', 'handler.ts'),
handler: 'handler',
environment: {
TABLE_NAME: table.tableName,
},
});
table.grantWriteData(confirmReservation);
api.root
.addResource('confirmReservation')
.addResource('{reservationId}')
.addMethod('GET', new cdk.aws_apigateway.LambdaIntegration(confirmReservation));
const onRestaurantBookedRule = new cdk.aws_events.Rule(this, 'OnRestaurantBookedRule', {
eventBus: bus,
eventPattern: {
source: ['StreamTarget'],
detailType: ['OnRestaurantBooked'],
},
});
const onRestaurantBooked = new cdk.aws_lambda_nodejs.NodejsFunction(this, 'OnRestaurantBookedLambda', {
entry: path.join(__dirname, 'onRestaurantBooked', 'handler.ts'),
handler: 'handler',
environment: {
FROM_EMAIL_ADDRESS: `notifications@${identity.emailIdentityName}`,
API_URL: api.url,
TEMPLATE_NAME: restaurantBookedTemplate.ref,
},
});
onRestaurantBookedRule.addTarget(new cdk.aws_events_targets.LambdaFunction(onRestaurantBooked));
onRestaurantBooked.addToRolePolicy(
new cdk.aws_iam.PolicyStatement({
actions: ['ses:SendTemplatedEmail'],
resources: ['*'],
}),
);
const onReservationConfirmedRule = new cdk.aws_events.Rule(this, 'OnReservationConfirmedRule', {
eventBus: bus,
eventPattern: {
source: ['StreamTarget'],
detailType: ['OnReservationConfirmed'],
},
});
const onReservationConfirmed = new cdk.aws_lambda_nodejs.NodejsFunction(this, 'OnReservationConfirmedLambda', {
entry: path.join(__dirname, 'onReservationConfirmed', 'handler.ts'),
handler: 'handler',
environment: {
FROM_EMAIL_ADDRESS: `notifications@${identity.emailIdentityName}`,
TEMPLATE_NAME: reservationConfirmedTemplate.ref,
},
});
onReservationConfirmedRule.addTarget(new cdk.aws_events_targets.LambdaFunction(onReservationConfirmed));
onReservationConfirmed.addToRolePolicy(
new cdk.aws_iam.PolicyStatement({
actions: ['ses:SendTemplatedEmail'],
resources: ['*'],
}),
);
前两个功能非常简单:它们允许创建预订并确认它们。它们与API链接,因此任何人都可以打电话给它们。他们有权编辑DynamoDB表。我使用get端点确认预订,以便能够在发送给餐厅的电子邮件中使用链接。
最后两个功能链接到事件总线。它们将由流目标触发,并将向合适的人发送电子邮件。我使用SES模板的电子邮件发送电子邮件,因此我必须添加权利将模板的电子邮件发送到lambda函数。
实施lambda功能
现在我们拥有所需的所有资源,让我们实现lambda功能!首先,让我们解决将存储在表中的数据格式。它将简化代码。
// types.ts
export type Reservation = {
// PK='RESERVATION'
id: string; // SK
firstName: string; // S
lastName: string; // S
email: string; // S
dateTime: string; // S
partySize: number; // N
// status: S
};
第一个lambda功能:bookRestaurant
import { DynamoDBClient, PutItemCommand } from '@aws-sdk/client-dynamodb';
import { v4 as uuid } from 'uuid';
import { Reservation } from '../types';
const client = new DynamoDBClient({});
export const handler = async ({ body }: { body: string }): Promise<{ statusCode: number; body: string }> => {
const tableName = process.env.TABLE_NAME;
if (tableName === undefined) {
throw new Error('TABLE_NAME environment variable must be defined');
}
const { firstName, lastName, email, dateTime, partySize } = JSON.parse(body) as Partial<Reservation>;
if (
firstName === undefined ||
lastName === undefined ||
email === undefined ||
dateTime === undefined ||
partySize === undefined
) {
return {
statusCode: 400,
body: 'Bad request',
};
}
const reservationId = uuid();
await client.send(
new PutItemCommand({
TableName: tableName,
Item: {
PK: { S: `RESERVATION` },
SK: { S: reservationId },
firstName: { S: firstName },
lastName: { S: lastName },
email: { S: email },
partySize: { N: partySize.toString() },
dateTime: { S: dateTime },
status: { S: 'PENDING' },
},
}),
);
return {
statusCode: 200,
body: JSON.stringify({
reservationId,
}),
};
};
简单:我们解析请求的主体,然后将预订存储在桌子内。默认情况下,我们将状态设置为PENDING
。检查我的series of articles,如果您需要有关如何与DynamoDB表和API网关进行交互的复习。
第二个lambda功能:confirmReservation
7
import { DynamoDBClient, UpdateItemCommand } from '@aws-sdk/client-dynamodb';
const client = new DynamoDBClient({});
export const handler = async ({
pathParameters,
}: {
pathParameters: { reservationId: string };
}): Promise<{ statusCode: number; body: string; headers: unknown }> => {
const tableName = process.env.TABLE_NAME;
if (tableName === undefined) {
throw new Error('TABLE_NAME environment variable must be defined');
}
await client.send(
new UpdateItemCommand({
TableName: tableName,
Key: {
PK: { S: `RESERVATION` },
SK: { S: pathParameters.reservationId },
},
UpdateExpression: 'SET #status = :status',
ExpressionAttributeNames: {
'#status': 'status',
},
ExpressionAttributeValues: {
':status': { S: 'CONFIRMED' },
},
}),
);
return {
statusCode: 200,
body: JSON.stringify({
reservationId: pathParameters.reservationId,
}),
headers: {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*',
},
};
};
此功能也非常简单:它将保留状态更新为CONFIRMED
。这里有两个小技巧:
-
status
属性是dynamodb中的保留关键字,因此我们需要使用表达式属性名称进行更新。 - 我们需要将
Access-Control-Allow-Origin
标头添加到响应中,以允许浏览器调用API。 (我们希望电子邮件中的链接起作用)
第三lambda功能:streamTarget
import { EventBridgeClient, PutEventsCommand } from '@aws-sdk/client-eventbridge';
import { Reservation } from '../types';
type InputProps = {
Records: {
eventName: string;
dynamodb: {
NewImage: {
PK: { S: string };
SK: { S: string };
email: { S: string };
firstName: { S: string };
lastName: { S: string };
dateTime: { S: string };
partySize: { N: string };
status: { S: string };
};
};
}[];
};
const client = new EventBridgeClient({});
export const handler = async ({ Records }: InputProps): Promise<void> => {
const eventBusName = process.env.EVENT_BUS_NAME;
if (eventBusName === undefined) {
throw new Error('EVENT_BUS_NAME environment variable is not set');
}
await Promise.all(
Records.map(async ({ dynamodb, eventName }) => {
if (eventName !== 'INSERT' && eventName !== 'MODIFY') {
return;
}
const { SK, email, firstName, lastName, dateTime, partySize } = dynamodb.NewImage;
const eventDetail: Reservation = {
id: SK.S,
firstName: firstName.S,
lastName: lastName.S,
email: email.S,
dateTime: dateTime.S,
partySize: +partySize.N,
};
await client.send(
new PutEventsCommand({
Entries: [
{
EventBusName: eventBusName,
Source: 'StreamTarget',
DetailType: eventName === 'INSERT' ? 'OnRestaurantBooked' : 'OnReservationConfirmed',
Detail: JSON.stringify(eventDetail),
},
],
}),
);
}),
);
};
这是本文的重要部分:DynamoDB流将触发的lambda函数。该功能会收到记录列表,每个记录都包含表中的项目的新版本。我们过滤记录以仅保留INSERT
或MODIFY
事件的记录,我们根据事件类型在事件总线上派遣事件。
我们还将dynamodb项目映射到Reservation
对象,以简化其他lambda函数的代码。
第四个lambda功能:onRestaurantBooked
import { SESv2Client, SendEmailCommand } from '@aws-sdk/client-sesv2';
import { Reservation } from '../types';
const client = new SESv2Client({});
const RESTAURANT_OWNER_EMAIL_ADDRESS = 'pierrech@theodo.fr';
export const handler = async ({ detail }: { detail: Reservation }): Promise<void> => {
const templateName = process.env.TEMPLATE_NAME;
const apiURL = process.env.API_URL;
const fromEmailAddress = process.env.FROM_EMAIL_ADDRESS;
if (templateName === undefined || apiURL === undefined || fromEmailAddress === undefined) {
throw new Error('TEMPLATE_NAME, API_URL and FROM_EMAIL_ADDRESS environment variables must be defined');
}
await client.send(
new SendEmailCommand({
FromEmailAddress: fromEmailAddress,
Destination: {
ToAddresses: [RESTAURANT_OWNER_EMAIL_ADDRESS],
},
Content: {
Template: {
TemplateName: templateName,
TemplateData: JSON.stringify({
firstName: detail.firstName,
lastName: detail.lastName,
dateTime: detail.dateTime,
partySize: detail.partySize,
apiURL,
reservationId: detail.id,
}),
},
},
}),
);
};
此Lambda对EventBridge事件做出反应。它可以访问事件的细节,即Reservation
对象。它使用此数据来补充SES模板并将电子邮件发送给餐厅所有者。它还使用API_URL环境变量来创建链接以确认预订。 (链接的创建是在模板内完成的,检查here)
请注意,电子邮件发送到固定的电子邮件地址。在真实的应用程序中,您可能希望将餐厅的电子邮件地址存储在其他地方,然后在此处检索。
第五lambda功能:onReservationConfirmed
import { SESv2Client, SendEmailCommand } from '@aws-sdk/client-sesv2';
import { Reservation } from '../types';
const client = new SESv2Client({});
export const handler = async ({ detail }: { detail: Reservation }): Promise<void> => {
const templateName = process.env.TEMPLATE_NAME;
const fromEmailAddress = process.env.FROM_EMAIL_ADDRESS;
if (templateName === undefined || fromEmailAddress === undefined) {
throw new Error('TEMPLATE_NAME and FROM_EMAIL_ADDRESS environment variables must be defined');
}
await client.send(
new SendEmailCommand({
FromEmailAddress: fromEmailAddress,
Destination: {
ToAddresses: [detail.email],
},
Content: {
Template: {
TemplateName: templateName,
TemplateData: JSON.stringify({
firstName: detail.firstName,
lastName: detail.lastName,
dateTime: detail.dateTime,
partySize: detail.partySize,
}),
},
},
}),
);
};
基本上是同一件事,除了电子邮件模板更简单(无链接)。我们将电子邮件发送给客户。
就是这样!
测试应用程序
要测试应用程序,我们只需要执行Postman的单个API调用即可。让我们创建一个预订:
几秒钟后,我收到了一封电子邮件(作为餐馆老板):
当我单击链接时,预订会得到确认。我收到了确认电子邮件(作为客户):
那样简单!该应用程序非常可扩展,因为它是事件驱动的:我们可以在确认预订时添加外部API调用,发送收据,流程付款等...而无需修改应用程序的基础体系结构。
结论
这是一篇很长的文章,但我希望您学到很多东西! DynamoDB流是AWS的功能非常强大的功能,下一步朝这个方向进行了使用,就是使用它们来创建事件传输应用程序。再一次,我建议您检查this article,如果您想了解更多有关此主题的信息。
我计划每两月继续这一系列文章。我已经涵盖了简单的lambda函数和REST API的创建,并与DynamoDB数据库和S3存储桶进行了交互。您可以在我的repository上遵循此进度!我将介绍新主题,例如前端部署,类型安全,更高级的模式等...如果您有任何建议,请随时与我联系!
,如果您可以与您的朋友和同事分享这篇文章,我真的很感激。这将有助于我发展听众。另外,不要忘记在下一篇文章发行时订阅要更新!
我想在这里保持联系是我的twitter account。我经常发布或重新发布有关AWS和无服务器的有趣内容,请随时关注我!