使用SQS中的Lambda中的背景过程
#go #lamda #sqs

AWS Lambda

AWS Lambda是一款无服务器,事件驱动的计算服务,可让您在不配置或管理服务器的情况下为任何类型的应用程序或后端服务运行代码。您可以从200多个AWS服务和软件作为服务(SaaS)应用程序中触发Lambda,而仅支付您使用的费用。

问题

,即使Lambda函数返回,我们也希望在Lambda中(异步)在lambda中进行一些工作。但是,当lambda函数返回时,所有背景服务都会自动停止。

解决方案

因此,在这里我们将使用Amazon SQS进行此目的。我们将向队列发送消息,它将触发另一个将在后台运行的lambda函数。

Image description

什么是亚马逊SQS?

Amazon Simple Queue Service (SQS)是一项托管消息排队服务,使您能够解除和缩放微服务,分布式系统和无服务器应用程序。使用SQS,您可以在任何卷中在软件组件之间异步发送,存储和接收消息,而无需丢失消息或需要其他服务可用。

SQS提供两种类型的消息队列。 Standard queues提供最大的吞吐量,最佳订购和至上的once交付。 SQS FIFO queues旨在确保按确切的顺序进行确切处理一次消息。

Golang示例使用SQS在lambda中运行背景过程

先决条件

  • 用于构建和部署您的功能,您将使用Serverless Framework,这是最广泛使用的工具。假设您已安装了最近的Node.js版本,则可以使用以下NPM命令安装Serverless CLI
  $ npm install -g serverless

安装了无服务器CLI后,必须将其配置为使用帐户的AWS访问键

  $ serverless config credentials --provider aws --key <access key ID> --secret <secret access key>

您可以从My Security Credentials选项中获取访问密钥和秘密键。如果您还没有

,请使用Create New Access Key

Image description

  • 如果您还没有安装,则可以从官方网站上进行download an installer或使用您喜欢的软件包管理器安装

创建消息队列

让我们创建一个队列。转到AWS帐户中的简单队列服务(SQS)。您将在那里看到以下接口

Image description

从这里创建队列,然后选择“ type FIFO”,让我们将其命名为test.fifo,因为我们要按照它们发送的确切顺序来精确处理一次消息。您也可以根据需要使用standard queue。我们将暂时保留所有其他设置。

Image description

Image description

现在让我们代码。

在SQS队列上发送消息

现在您拥有所需的一切,让我们安装AWS SDK for Go库。

$ go get github.com/aws/aws-sdk-go

之后,我们可以继续编写代码以在我们的SQS队列上发送消息,以触发将执行背景任务的lambda函数。

package handlers

import (
  "github.com/aws/aws-sdk-go/aws"
  "github.com/aws/aws-sdk-go/aws/credentials"
  "github.com/aws/aws-sdk-go/aws/session"
  "github.com/aws/aws-sdk-go/service/sqs"
  "os"

  "encoding/json"
  "log"
  "net/http"
)


func yourHttpHandlerFunction(w http.ResponseWrite, r *http.Request) {
  // tasks you need to perform before the background task
  ...

  // sending details to other lambda function to perform the background task
  message := SqsTriggerMessage{
    Message: "You can add different fields here according to your data requirements in the background task.",
  }

  err := sendMessage(message)
  if err != nil {
    log.Println(err)
    w.WriteHeader(http.StatusInternalServerError)
    return
  }

  // other tasks you need to perform
  ...

  // sending user an immediate response
  w.WriteHeader(http.StatusOK)
}

type SqsTriggerMessage struct {
  Message string
}

func sendMessage(data interface{}) error {
  b, err := json.Marshal(data)
  if err != nil {
    return err
  }

  svc := sqs.New(
    session.Must(
      session.NewSession(
        &aws.Config{
          Credentials: credentials.NewStaticCredentials(os.Getenv("AWS_ACCESS_KEY_ID"), os.Getenv("AWS_SECRET_ACCESS_KEY"), ""),
          Region:      aws.String(os.Getenv("AWS_REGION")),
        },
      ),
    ),
  )

  result, err := svc.GetQueueUrl(&sqs.GetQueueUrlInput{
    QueueName: aws.String("queue-name"),
  })
  if err != nil {
    return err
  }

  _, err = svc.SendMessage(&sqs.SendMessageInput{
    MessageBody:            aws.String(string(b)),
    QueueUrl:               result.QueueUrl,
    MessageGroupId:         aws.String("group-id"),
    MessageDeduplicationId: aws.String("deduplication-id"),
  })

  return err
}

以前的代码可以分为几个简单的步骤:

  • 定义包含您在后台任务中需要的数据的消息结构
  • 您的结构将您的结构纳入JSON,并将该JSON作为消息发送到SQS队列 我们还可以通过消息属性发送此方法,但是我发现此方法更方便地使用
  • 给用户一个200响应并返回。在此之后,您的lambda功能将完成,但是您在SQS队列上发送的消息将触发另一个Lambda功能以执行背景任务

lambda功能处理SQS事件

让我们创建另一个程序来处理SQS事件。

package main

import (
  "github.com/aws/aws-lambda-go/events"
  "github.com/aws/aws-lambda-go/lambda"

  "encoding/json"
  "log"
)

func main() {
  lambda.Start(handleSqsRequest)
}

func handleSqsRequest(sqsEvent events.SQSEvent) error {
  for _, message := range sqsEvent.Records {
    var request SqsTriggerMessage
    err := json.Unmarshal([]byte(message.Body),&request)
    if err!=nil {
      log.Println(err)
      continue
    }
    // task you need to perform based on the data in the message
    log.Println(request.Message)
    ...
  }

  return nil
}

type SqsTriggerMessage struct {
  Message string
}

以前的代码可以分为几个简单的步骤:

  • 我们编写了一个handleSqsRequest功能,该功能将接收队列中的消息并使用AWS Lambda for Go library在主函数中注册
  • 在事件处理程序函数中,我们将消息主体分解为我们的消息结构,并在控制台上打印消息。您可以发送执行和使用这些任务的任务详细信息来调用适当的功能来完成该任务

现在,我们有2个lambda函数第一个函数将消息发送到SQS队列,需要触发我们的第二个lambda函数以执行背景任务

部署

我们的lambda功能现在已经准备就绪,我们可以通过使用无服务器框架部署它来进行。我们的应用程序由基于serverless.yml配置文件的无服务器框架部署。

如果您不熟悉.yml语法,则可以阅读此serverless.yml guide

我们必须首先创建一个定义我们部署的serverless.yml文件。

service: your-service-name
provider:
  name: aws
  runtime: go1.x
package:
  exclude:
    - ./**
  include:
    - ./bin/**
functions:
  main-program:
    handler: bin/main-program
    events:
      - http:
          path: /
          method: get
      ...
  sqs-handler:
    handler: bin/sqs-handler
    events:
      - sqs: <replace_this_with_your_sqs_queue_arn>

在上面的文件中,我们定义了两个lambda函数。

  • 第一个通过Amazon API Gateway在HTTP请求上运行主要程序。在此程序中,我们需要执行背景任务,因此我们将在SQS队列上发送一条消息
  • 第二个运行我们的SQS处理程序,在我们在这里提供的arn的队列中的SQS事件。您可以从您创建的AWS帐户中获取arnAmazon SQS > Queues > test.fifo

Image description

接下来,我们将构建代码,并使用serverless deploy命令进行部署。

GOOS=linux GOARCH=amd64 go build -o bin/main-program .
GOOS=linux GOARCH=amd64 go build -o bin/sqs-handler ./sqs

serverless deploy

您可以通过CloudWatch看到所有lambda函数的日志。要访问日志,请转到Amazon帐户中的CloudWatch > Log groups

Image description

您可以从这里看到日志。让我们检查sqs-handler日志。它一定已经打印了收到的消息。

Image description

参考

this github repository获取完整的源代码。