使用SQS将油门写入DynamoDB
#aws #云 #database #dynamodb

我们正在运行一个电子商务平台,人们在该平台上发布产品和其他人购买这些产品。我们的后端在well-designed Lambdas上运行了一些高度可扩展的microservices,并且涉及很多缓存。我们的订单处理微服务写入了我们在How to Design a DynamoDB Database之后设置的DynamoDB表。我们正在使用DynamoDB provisioned capacity mode with auto scaling。我们做得很好,一切顺利。

突然,某人的产品传播开来,很多人同时急于购买它。我们的缓存和CDN甚至都不会眨眼到流量,我们精心设计的Lambdas比例非常快,但是我们的DynamoDB桌子突然被写入轰炸,并且自动缩放无法跟上。我们的订单处理lambda会收到ProvisionEdthroughPuteXceededException,当它重试时,一切都会使一切变得更糟。事情崩溃了。销售损失。我们最终康复,但这些客户消失了。我们如何确保它不会再次发生?

选项1是将DynamoDB表更改为按需,在缩放时可以跟上Lambda,但昂贵的5倍以上。选项2是要确保表的写入能力不超过。让我们探索选项2。

AWS服务涉及:

  • dynamodb:我们的数据库。这篇文章中您需要知道的只是how DynamoDB scales

  • sqs:一个完全托管的消息排队服务,使您能够将组件分解。制作人喜欢我们的订单处理微服务帖子到队列中,队列将这些消息存储在阅读之前,消费者在自己的时间内从队列中阅读。

  • ses:一个电子邮件平台,与MailChimp这样的服务更相似,而不是在AWS服务上。如果您已经在AWS上,并且只需要以编程方式发送电子邮件,那么设置就很容易。如果您不在AWS上,需要更多的控制权,或者需要发送如此多的电子邮件以至于价格是一个因素,则需要进行一些研究。对于这篇文章,SES足够好。

什么是亚马逊SQS

sqs是一个完全管理的消息排队服务。消息队列是一个数据结构,可以在其中以与书写相同的顺序读取项目:首先,第一(FIFO)。

队列使我们能够通过使消费者(这是从队列中读取的组件)不知道谁写的项目(作者称为生产者)来解除组件。此外,在软件体系结构中,我们通常专注于排队的另一个特征:撰写后的某个时间可能会发生。这使我们能够及时地将生产者和消费者解散:当生产商写信给队列时,消费者不需要可用。队列将消息存储一定时间,当消费者准备就绪时,他们 poll seagess的队列,并接收最古老的消息。

为了解决我们的解决方案,我们将使用队列,以便我们的订单处理微服务可以通过订单发送消息,队列存储消息,消费者可以自己的节奏阅读它(即在我们的dynamodb处桌子的节奏)。

SQS队列的类型

SQS中有两种类型的队列:

  • 标准队列是队列的默认类型。它们比FIFO队列便宜,并且几乎可以扩展。权衡是,他们只保证至上界的交付(这意味着您可能会得到重复),并且消息的顺序大多受到尊重但不能保证。

  • fifo 队列比标准队列更昂贵,而且它们不会无限缩放,但它们保证有序,恰好是一开始交付。您需要在消息中设置MessageGroupId属性,因为FIFO队列在成功处理上一个消息后仅在MessageGroup中传递下一个消息。例如,如果您将MessageGroupId的值设置为客户ID,并且客户同时下达两个订单,则第二次订单直到第一个处理完成后才处理。设置MessageDeduplicationId也很重要,以确保该消息在上游重复,将在队列中重复重复。 FIFO队列只能保留一个MessedUplicationID唯一值的消息。

大多数想到队列的人都在想保证FIFO订单和确切的交付。实际获得这些保证的唯一方法是使用FIFO队列。

如何为DynamoDB实现SQS队列

Diagram of a request before and after implementing the solution

按照以下步骤指令执行SQS队列以将其写入DynamoDB表。用适合您的帐户和区域的适当值替换YOUR_ACCOUNT_IDYOUR_REGION

创建订单队列

  1. 转到SQS控制台。

  2. 单击“创建队列”

  3. 选择“ FIFO”队列类型(不是默认标准)

  4. 在“队列名称”字段中输入“ ordersqueue”

  5. 将其余的作为默认值

  6. 单击“创建队列”

更新订单服务以写入SQS队列

我们需要更新订单服务的代码,以便将新订单发送到订单队列,而不是写入订单表。这就是代码的样子:

const AWS = require('aws-sdk');
const sqs = new AWS.SQS();
const queueUrl = 'https://sqs.YOUR_REGION.amazonaws.com/YOUR_ACCOUNT_ID/OrdersQueue';

async function processOrder(order) {
  const params = {
    MessageBody: JSON.stringify(order),
    QueueUrl: queueUrl,
    MessageGroupId: order.customerId,
    MessageDeduplicationId: order.id
  };

  try {
    const result = await sqs.sendMessage(params).promise();
    console.log('Order sent to SQS:', result.MessageId);
  } catch (error) {
    console.error('Error sending order to SQS:', error);
  }
}

另外,将此策略添加到功能的IAM角色中,以便它可以访问SQ。不要忘记删除访问dynamoDB的权限!

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "sqs:SendMessage",
      "Resource": "arn:aws:sqs:YOUR_REGION:YOUR_ACCOUNT_ID:OrdersQueue"
    }
  ]
}

设置SES通过电子邮件通知客户

  1. 打开SES Console

  2. 单击左导航窗格中的“域”

  3. 单击“验证新域”

  4. 按照屏幕上的说明添加所需的DNS记录。

  5. 另外,单击“电子邮件地址”,然后单击“验证新的电子邮件地址”按钮。输入您要验证的电子邮件地址,然后单击“验证此电子邮件地址”。检查您的收件箱并单击链接。

设置订单处理服务

转到Lambda控制台并创建一个新的Lambda功能。添加以下代码:

const AWS = require('aws-sdk');
const dynamoDB = new AWS.DynamoDB.DocumentClient();
const ses = new AWS.SES();

exports.handler = async (event) => {
    for (const record of event.Records) {
        const order = JSON.parse(record.body);
        await saveOrderToDynamoDB(order);
        await sendEmailNotification(order);
    }
};

async function saveOrderToDynamoDB(order) {
    const params = {
        TableName: 'orders',
        Item: order
    };

    try {
        await dynamoDB.put(params).promise();
        console.log(`Order saved: ${order.orderId}`);
    } catch (error) {
        console.error(`Error saving order: ${order.orderId}`, error);
    }
}

async function sendEmailNotification(order) {
    const emailParams = {
        Source: 'you@simpleaws.dev',
        Destination: {
            ToAddresses: [order.customerEmail]
        },
        Message: {
            Subject: {
                Data: 'Your order is ready'
            },
            Body: {
                Text: {
                    Data: `Thank you for your order, ${order.customerName}! Your order #${order.orderId} is now ready.`
                }
            }
        }
    };

    try {
        await ses.sendEmail(emailParams).promise();
        console.log(`Email sent: ${order.orderId}`);
    } catch (error) {
        console.error(`Error sending email for order: ${order.orderId}`, error);
    }
}

另外,将以下IAM策略添加到函数的IAM角色中,因此可以由SQS和Access DynamoDB和SES触发:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage",
        "sqs:GetQueueAttributes"
      ],
      "Resource": "arn:aws:sqs:YOUR_REGION:YOUR_ACCOUNT_ID:OrdersQueue"
    },
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:PutItem",
        "dynamodb:UpdateItem",
        "dynamodb:DeleteItem"
      ],
      "Resource": "arn:aws:dynamodb:YOUR_REGION:YOUR_ACCOUNT_ID:table/Orders"
    },
    {
      "Effect": "Allow",
      "Action": "ses:SendEmail",
      "Resource": "*"
    }
  ]
}

使订单队列触发订单处理服务

  1. 在Lambda控制台中,转到订单处理lambda

  2. 在“函数概述”部分中,单击“添加触发器”

  3. 单击“选择触发器”,然后选择“ SQS”

  4. 选择订单队列

  5. 将批处理大小设置为1

  6. 确保检查“启用触发器”复选框

  7. 单击“添加”

限制订单处理lambda的同时执行

  1. 在Lambda控制台中,转到订单处理lambda

  2. 向下滚动到“并发”部分

  3. 单击“编辑”

  4. 在“配置并发”部分中,将“保留并发”设置为10

  5. 单击“保存”

与SQS同步和异步工作流程

架构上,解决方案有一个很大的变化:我们使我们的工作流程 async !让我把图带到这里。

Diagram of a request before and after implementing the solution

之前,我们的订单服务将返回订单的结果。从用户的角度来看,他们等到处理订单,他们在网站上看到了结果。从系统的角度来看,我们被禁止在API网关(29秒)的超时限制中成功或失败处理订单。用更实用的话来说,我们受用户期望的限制:我们不能仅显示29秒的“加载”图标!

更改后,网站只是显示了“我们正在处理您的订单,我们在准备就绪时给您发送电子邮件”之类的内容。这为用户设定了不同的期望。这对系统很重要,因为现在我们实际上可以使用Lambda功能需要15分钟,而不会达到API网关的29秒限制,也不会生气。不过,不仅是,如果订单处理lambda崩溃中期的崩溃,SQS队列将使订单在可见性超时到期后再次作为消息可用,而Lambda服务将通过相同的订单再次调用我们的功能。当达到MaxReceiveCount限制时,可以将订单发送到另一个称为“死信”队列(DLQ)的队列,我们​​可以在其中存储失败的订单以供将来参考。我们在这里没有设置DLQ,但是很容易,对于中小型系统,您可以轻松设置SNS来向您发送电子邮件并手动解决问题,因为该卷不应该特别大。

一旦订单通过了所有步骤,失败了一些,重述,成功等,我们就会通知用户他们的订单已“准备就绪”。对于不同的系统来说,这看起来可能不同,有些只是“我们得到的钱”,有些船体物理产品,有些是在用户上登上复杂的SaaS。对于此解决方案,我选择通过电子邮件执行此操作,因为它很容易且常见,但是您可以使用webhook并仍然保持异步。

SQS和DynamoDB的最佳实践

卓越运营

  • 监视并设置警报:您知道如何监视lambdas。您也可以监视SQS队列!在此处设置的一个有趣的警报将是队列中的订单数量,因此我们的客户不要等待太长时间才能处理他们的订单。

  • 处理错误并重试:准备好任何事情都能失败,并相应地建筑师。设置DLQ,设置通知(对您和用户),以便当事情失败时,最重要的是不会丢失/损坏数据。

  • 设置跟踪:我们会使事情变得复杂(希望有充分的理由)。我们可以通过setting up X-Ray获得更好的复杂性。

安全

  • 检查“启用服务器端加密”:这就是要在静止时加密的SQS队列所需要做的就是检查该框,然后选择KMS键。 SQS通过HTTPS进行通信,因此您已经在运输中进行了加密。

  • 加强权限:本期的IAM政策非常限制。但是总有一个螺母要拧紧,所以请睁开眼睛。

可靠性

  • 设置MaxReceiveCount和DQL:使用FIFO队列,下一个消息将在成功处理或删除之前,无法进行处理(如果是DLQ(如果是DLQ)您设置一个)MaxReceIveCount尝试。如果您不设置这些,则一个损坏的订单将阻止您的整个系统。

  • 设置可见度超时:这是SQ在不收到“成功”响应的情况下等待的时间,然后假设消息未能成功处理并再次为下一个消费者提供。设置一个合理的值,并为消费者设置与超时的值相同(在这种情况下订购lambda)。

性能效率

  • 优化lambda功能内存:更多内存意味着更多的钱。但这也意味着更快的处理。从30秒到25秒,对于成功处理的订单而言并不重要,但是如果订单被重述5次,现在我们要获得的25秒而不是5秒,这是值得的,这取决于您的客户期望。<<<<<<<<< /p>

  • 使用批处理处理:如前所述,您应该考虑在批处理中处理消息。

  • 记住 the 20 advanced tips for Lambda

成本优化

  • dynamodb的配置与按需:请记住,可以通过按需模式使用我们的dynamodb表来修复这一点。不过,它贵5倍。关系数据库也是如此(如果我们使用Aurora,则Aurora serverless是一个选项)。

  • 考虑Lambda以外的其他内容:在这种情况下,我们正在尝试使所有订单处理相对较快。如果处理可以等待更多,则根据SQS队列中的消息数量进行缩放的自动缩放组可以以更少的钱来构成奇迹。


停止复制云解决方案,开始理解它们。
加入3000多个开发人员,技术线索和专家,学习如何与Simple AWS newsletter一起构建云解决方案,而不是通过考试。

  • 真实场景和解决方案

  • 为什么在解决方案背后

  • 最佳实践改进它们

Subscribe for free

如果您想进一步了解我,可以找到我on LinkedInwww.guilleojeda.com