使用Apache Kafka将数据摄入opensearch并进行
#教程 #database #go #分析

有时您可能需要编写自定义集成层以满足数据管道中的特定要求。使用go

了解如何使用kafka和opensearch做到这一点

可扩展数据摄入是大规模分布式搜索和分析引擎(如OpenSearch)的关键方面。构建实时数据摄入管道的方法之一是使用Apache Kafka。这是一个开源事件流平台,用于处理高数据量(和速度),并与包括关系和NOSQL数据库在内的各种来源集成。例如,规范用例之一是在异质系统(源组件)之间进行数据实时同步,以确保OpenSearch索引是新鲜的,可以用于分析或通过仪表板和可视化的下游应用程序。

>

此博客文章将介绍如何创建数据管道,其中写入Apache Kafka的数据被摄入OpenSearch。我们将使用Amazon OpenSearch ServerlessAmazon Managed Streaming for Apache Kafka (Amazon MSK) ServerlessKafka Connect非常适合此类要求。它为OpenSearch和Elasticsearch提供了接收器连接器(如果您选择使用Amazon OpenSearch的Elasticsearch OSS引擎,则可以使用。但是,有时候,有一些特定的要求或原因可能需要使用自定义解决方案。

例如,您可能正在使用一个数据源,而Kafka Connect不支持的数据源(罕见,但可能发生),并且不想从头开始写一个。或者,这可能是一次性集成,您想知道是否值得努力设置和配置Kafka Connect。也许还有其他问题,例如许可等。

值得庆幸的是,Kafka和OpenSearch为客户库提供了各种编程语言的客户库,这些语言使您可以编写自己的集成层。这正是此博客所涵盖的内容!我们将使用自定义的Go应用程序使用go KafkaOpenSearch摄入数据。

您将学习:

  • 概述如何设置所需的AWS服务 - OpenSearch serverless,MSK无服务器,AWS Cloud9以及IAM策略和安全配置。
  • 高级步行遍历应用程序。
  • 将数据摄入管道启动并运行。
  • 如何在OpenSearch中查询数据

在我们进入Nitty-Gritty之前,这是OpenSearch serverless和Amazon MSK无服务器的快速概述。

Amazon OpenSearch无服务器和Amazon MSK无服务器的简介

OpenSearch是一种开源搜索和分析引擎,用于日志分析,实时监视和ClickStream分析。 Amazon OpenSearch服务是一项托管服务,简化了AWS中OpenSearch簇的部署和缩放。

Amazon OpenSearch Service支持OpenSearch和Legacy Elasticsearch OSS(该软件的最终开源版本)。创建集群时,您可以选择使用哪种搜索引擎。

您可以create an OpenSearch Service domain(与OpenSearch cluster的同义词)代表集群,每个Amazon EC2实例充当 node 。但是,OpenSearch无服务器通过为OpenSearch服务提供无需服务器的配置来消除操作复杂性。它使用索引的集合来支持特定的工作负载,与传统簇不同,它将索引和搜索组件分开,而Amazon S3作为索引的主要存储空间。该体系结构实现了搜索和索引功能的独立缩放。

您可以参考Comparing OpenSearch Service and OpenSearch Serverless中的详细信息。

OpenSearch Serverless

Amazon MSK(Apache Kafka的托管流媒体)是一款用于使用Apache Kafka处理流数据的完全管理服务。它处理群集管理操作,例如创建,更新和删除。您可以使用标准的Apache Kafka数据操作来生产和消费数据,而无需修改您的应用程序。它支持开源Kafka版本,确保与现有工具,插件和应用程序的兼容性。

MSK Serverless architecture

MSK Serverless是Amazon MSK中的群集类型,它消除了对群集容量的手动管理和缩放的需求。它会根据需求自动规定和扩展资源,以照顾主题分区管理。使用付费的定价模型,您只需支付实际使用费。 MSK无服务器是需要灵活和自动缩放流能力的应用程序的理想选择。

让我们首先讨论高级应用程序体系结构,然后再进行体系结构注意事项。

应用概述和关键体系结构注意事项

这是应用程序体系结构的简化版本,概述了组件以及它们如何相互作用。

Image description

该应用程序由生产者和消费者组件组成,该应用程序已部署到EC2实例:

  • 顾名思义,生产者将数据发送到MSK无服务器集群。
  • 消费者应用程序从MSK无服务器主题接收数据(movie信息),并使用OpenSearch GO客户端在movies集合中索引数据。

专注于简单性

值得注意的是,博客文章已被优化,以简单性和易于理解,因此,该解决方案并未用于运行生产工作负载。以下是已经进行的一些简化:

  • 生产者和消费者应用程序在相同计算平台(EC2实例)上运行。
  • 有来自MSK主题的单一消费者应用程序实例处理数据。但是,您可以尝试运行消费者应用程序的多个实例,并查看如何在整个实例中分发数据。
  • 而不是使用Kafka CLI生成数据,而是编写了一个自定义生产者应用程序,并与REST端点一起发送数据。这说明了如何在GO和模仿Kafka Cli中编写Kafka生产者应用程序。
  • 所使用的数据量很小。
  • OpenSearch serverless Collection具有 public 访问类型。

对于生产工作量,您应该考虑以下一些事情:

  • 根据数据量和可伸缩性要求为您的消费者应用程序选择适当的计算平台 -
  • 选择 vpc 访问您的OpenSearch serverless Collection
  • 考虑使用Amazon OpenSearch Ingestion创建数据管道。

如果您仍然需要部署自定义应用程序来构建从MSK到OpenSearch的数据管道,则可以选择以下计算选项范围:

  • 容器 - 您可以将消费者应用程序包装为Docker容器(在GitHub存储库中可用)并将其部署到Amazon EKSAmazon ECS
  • 如果您将应用程序部署到Amazon EKS,也可以考虑使用MSK主题中的消息数来自动使用KEDA自动尺度。
  • 无服务器 - 也可以使用MSK as an event source for AWS Lambda functions。您可以将消费者应用程序编写为lambda函数,并将其配置为由MSK事件触发或在AWS Fargate上运行。
  • 由于生产者应用程序是REST API,因此您可以将其部署到AWS App Runner。
  • 最后,您可以利用Amazon EC2 Auto Scaling groups为您的消费者应用程序自动尺度EC2机队。

有足够的材料谈论如何使用基于Java的Kafka应用程序与MSK Serverless using IAM连接。
让我们简短地绕道而行,了解与GO一起工作的方式。

GO Client应用程序如何使用IAM使用MSK无服务器进行身份验证?

MSK无服务器需要IAM访问控制,以处理MSK群集的身份验证和授权。这意味着您的MSK客户端应用程序(在这种情况下为生产者和消费者)必须使用IAM对MSK进行身份验证,以此为基础,它们将被允许或拒绝特定的Apache Kafka操作。

好处是franz-go KAFKA客户端库支持IAM身份验证。以下是消费者应用程序中的片段,显示了它在实践中的工作方式:

func init() {
//......
    cfg, err = config.LoadDefaultConfig(context.Background(), config.WithRegion("us-east-1"), config.WithCredentialsProvider(ec2rolecreds.New()))

  creds, err = cfg.Credentials.Retrieve(context.Background())
//....

func initializeKafkaClient() {

    opts := []kgo.Opt{
        kgo.SeedBrokers(strings.Split(mskBroker, ",")...),
        kgo.SASL(sasl_aws.ManagedStreamingIAM(func(ctx context.Context) (sasl_aws.Auth, error) {

            return sasl_aws.Auth{
                AccessKey:    creds.AccessKeyID,
                SecretKey:    creds.SecretAccessKey,
                SessionToken: creds.SessionToken,
                UserAgent:    "msk-ec2-consumer-app",
            }, nil
        })),
//.....
  • 首先,该应用程序使用ec2rolecreds.New()凭据提供商从EC2实例元数据服务中检索临时IAM凭据。 EC2实例角色应具有适当的IAM角色(具有权限),以执行MSK群集组件所需的操作(在后续部分中有关此操作)。
  • 然后,这些凭据用于使用sasl_aws软件包中的AWS_MSK_IAM SASL身份验证实现来初始化KAFKA客户端。

注意:由于Kafka有多个GO客户端(包括Sarama),请确保咨询其客户文档以确认他们是否支持IAM身份验证。

好的,在这种背景下,让我们设置运行摄入管道所需的服务。

基础架构设置

本节将帮助您设置以下组件:

  • 必需的IAM角色
  • MSK无服务器集群
  • OpenSearch无服务器集合
  • AWS Cloud9 EC2环境运行您的应用程序

MSK无服务器集群

您可以关注this documentation使用AWS控制台设置MSK无服务器集群。执行此操作后,请注意以下群集信息 - VPC,子网,安全组( properties tab)和群集端点(单击查看客户端信息)。

Image description

应用程序角色

本教程需要不同的IAM角色。

首先创建一个IAM角色以执行后续步骤并使用Step 1: Configure permissions的权限(在Amazon OpenSearch文档中)使用OpenSearch无服务器。

为客户端应用程序创建另一个IAM角色,该应用程序将与MSK无服务器群集交互,并使用OpenSearch GO Client在OpenSearch无用的Collection中索引数据。在下面创建一个内联IAM策略 - 确保替换所需的值。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*"
            ],
            "Resource": [
                "<ARN of the MSK Serverless cluster>",
                "arn:aws:kafka:us-east-1:<AWS_ACCOUNT_ID>:topic/<MSK_CLUSTER_NAME>/*",
                "arn:aws:kafka:us-east-1:AWS_ACCOUNT_ID:group/<MSK_CLUSTER_NAME>/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "aoss:APIAccessAll"
            ],
            "Resource": "*"
        }
    ]
}

使用以下信任策略:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "ec2.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

最后,您将在下一步中附加opensearch Data access policies的另一个IAM角色。

OpenSearch无服务器集合

使用the documentation创建一个OpenSearch搜索服务器集合。在Step 2: Create a collection中遵循点8 时,请确保配置两个数据策略,即在上一节中的步骤2和第3步中创建的每个IAM角色。

请注意,出于本教程的目的,我们选择了 public 访问类型。建议选择 VPC 用于生产工作负载。

Image description

AWS Cloud9 EC2环境

使用this documentation创建AWS Cloud9 EC2开发环境 - 确保使用相同 VPC与MSK无服务器群集。

完成后,您需要执行以下操作:在 EC2实例下打开Cloud9环境,单击管理EC2实例。在EC2实例中,导航到安全性并记下所附的安全组。

Image description

打开与MSK无服务器群集关联的安全组,并添加一个入站规则以允许Cloud9 EC2实例连接到它。选择Cloud9 EC2实例的安全组作为源, 9098 作为端口和 TCP 协议。

Image description

您现在准备运行该应用程序了!

选择Cloud9环境,然后选择在Cloud9 中打开以启动IDE。打开终端窗口,克隆github存储库,然后将目录更改为文件夹。

git clone https://github.com/build-on-aws/opensearch-using-kafka-golang

cd opensearch-using-kafka-golang

启动生产者应用程序:

cd msk-producer

export MSK_BROKER=<enter MSK Serverless cluster endpoint>
export MSK_TOPIC=movies

go run main.go

您应该在终端中看到以下日志:

MSK_BROKER <MSK Serverless cluster endpoint>
MSK_TOPIC movies
starting producer app
http server ready

要将数据发送到MSK无服务器群集,请使用一个bash脚本,该脚本将调用您刚刚启动的应用程序所示的HTTP端点,并使用JSON使用curl
JSON的形式提交电影数据(从movies.txt file)提交。

./send-data.sh

在生产者应用程序终端日志中,您应该看到与以下相似的输出:

producing data to topic
payload {"directors": ["Joseph Gordon-Levitt"], "release_date": "2013-01-18T00:00:00Z", "rating": 7.4, "genres": ["Comedy", "Drama"], "image_url": "http://ia.media-imdb.com/images/M/MV5BMTQxNTc3NDM2MF5BMl5BanBnXkFtZTcwNzQ5NTQ3OQ@@._V1_SX400_.jpg", "plot": "A New Jersey guy dedicated to his family, friends, and church, develops unrealistic expectations from watching porn and works to find happiness and intimacy with his potential true love.", "title": "Don Jon", "rank": 1, "running_time_secs": 5400, "actors": ["Joseph Gordon-Levitt", "Scarlett Johansson", "Julianne Moore"], "year": 2013}
record produced successfully to offset 2 in partition 0 of topic movies

producing data to topic
payload {"directors": ["Ron Howard"], "release_date": "2013-09-02T00:00:00Z", "rating": 8.3, "genres": ["Action", "Biography", "Drama", "Sport"], "image_url": "http://ia.media-imdb.com/images/M/MV5BMTQyMDE0MTY0OV5BMl5BanBnXkFtZTcwMjI2OTI0OQ@@._V1_SX400_.jpg", "plot": "A re-creation of the merciless 1970s rivalry between Formula One rivals James Hunt and Niki Lauda.", "title": "Rush", "rank": 2, "running_time_secs": 7380, "actors": ["Daniel Br\u00c3\u00bchl", "Chris Hemsworth", "Olivia Wilde"], "year": 2013}
record produced successfully to offset 4 in partition 1 of topic movies

.....

为了本教程的目的,为了使其简单易于遵循,数据的数量已故意仅限于1500个记录,并且在将每个记录发送给生产者之后,脚本有意睡1秒钟。您应该能够舒适地跟随。

当生产者应用程序忙于将数据发送到movies主题时,您可以启动消费者应用程序从MSK无服务器群集开始处理数据,并将其索引到OpenSearch Serverless Collection中。

cd msk-consumer

export MSK_BROKER=<enter MSK Serverless cluster endpoint>
export MSK_TOPIC=movies
export OPENSEARCH_INDEX_NAME=movies-index
export OPENSEARCH_ENDPOINT_URL=<enter OpenSearch Serverless endpoint>

go run main.go

您应该在终端中看到以下输出,这表明它确实已经开始从MSK无服务器集群中接收数据并在OpenSearch搜索服务器集合中进行索引。

using default value for AWS_REGION - us-east-1
MSK_BROKER <MSK Serverless cluster endpoint>
MSK_TOPIC movies
OPENSEARCH_INDEX_NAME movies-index
OPENSEARCH_ENDPOINT_URL <OpenSearch Serverless endpoint>
using credentials from: EC2RoleProvider
kafka consumer goroutine started. waiting for records
paritions ASSIGNED for topic movies [0 1 2]

got record from partition 1 key= val={"directors": ["Joseph Gordon-Levitt"], "release_date": "2013-01-18T00:00:00Z", "rating": 7.4, "genres": ["Comedy", "Drama"], "image_url": "http://ia.media-imdb.com/images/M/MV5BMTQxNTc3NDM2MF5BMl5BanBnXkFtZTcwNzQ5NTQ3OQ@@._V1_SX400_.jpg", "plot": "A New Jersey guy dedicated to his family, friends, and church, develops unrealistic expectations from watching porn and works to find happiness and intimacy with his potential true love.", "title": "Don Jon", "rank": 1, "running_time_secs": 5400, "actors": ["Joseph Gordon-Levitt", "Scarlett Johansson", "Julianne Moore"], "year": 2013}
movie data indexed
committing offsets
got record from partition 2 key= val={"directors": ["Ron Howard"], "release_date": "2013-09-02T00:00:00Z", "rating": 8.3, "genres": ["Action", "Biography", "Drama", "Sport"], "image_url": "http://ia.media-imdb.com/images/M/MV5BMTQyMDE0MTY0OV5BMl5BanBnXkFtZTcwMjI2OTI0OQ@@._V1_SX400_.jpg", "plot": "A re-creation of the merciless 1970s rivalry between Formula One rivals James Hunt and Niki Lauda.", "title": "Rush", "rank": 2, "running_time_secs": 7380, "actors": ["Daniel Br\u00c3\u00bchl", "Chris Hemsworth", "Olivia Wilde"], "year": 2013}
movie data indexed
committing offsets

.....

过程完成后,您应该将1500电影索引在OpenSearch无服务器集合中。不过,您不必等待它完成。一旦有几百个记录,您就可以继续导航到Dev Tools in the OpenSearch dashboard执行以下查询。

查询电影数据在OpenSearch中

运行简单查询

让我们从列表 all 的简单查询开始(无需任何参数或过滤器)。

GET movies-index/_search

Image description

仅获取特定字段的数据

默认情况下,搜索请求会检索索引文档时提供的整个JSON对象。使用_source选项从选定字段中检索源。例如,仅检索titleplotgenres字段,请运行以下查询:

GET movies-index/_search
{
  "_source": {
  "includes": [
    "title",
    "plot",
    "genres"
    ]
  }
}

Image description

获取数据以匹配确切的搜索术语一个术语查询

您可以使用术语查询来实现此目的。例如,要在title字段中搜索具有christmas一词的电影,请运行以下查询:

GET movies-index/_search
{
  "query": {
    "term": { 
      "title": {
        "value": "christmas"
      }
    }
  }
}

Image description

**结合选择性字段选择与术语查询

您可以使用此查询仅检索某些字段,但对特定术语感兴趣:

GET movies-index/_search
{
  "_source": {
    "includes": [
      "title",
      "actors"
    ]
  },
  "query": {
    "query_string": {
      "default_field": "title",
      "query": "harry"
    }
  }
}

Image description

聚合

使用聚合根据特定字段中值的分组计算摘要值。例如,您可以总结ratingsgenreyear等字段,以根据这些字段的值搜索结果。通过汇总,我们可以回答诸如每种类型中有几部电影的问题?

GET movies-index/_search
{
  "size":0,
  "aggs": {
    "genres": {
      "terms":{"field": "genres.keyword"}
    }
  }
}

清理

完成演示后,请确保删除所有服务,以免产生任何其他费用。您可以按照各自的文档中的步骤删除服务。

结论

回顾一下,您将管道部署以使用KAFKA将数据摄入无服务器,然后以不同的方式进行查询。在此过程中,您还了解了架构的注意事项和计算选项,以牢记生产工作负载以及使用MSK IAM身份验证的基于GO的KAFKA应用程序。我还建议阅读文章Building a CRUD Application in Go for Amazon OpenSearch,特别是如果您正在寻找以Go SDK进行OpenSearch操作的教程。

这很漫长(我认为!)。谢谢您阅读直到最后!如果您喜欢本教程,找到任何问题或对我们有反馈,please send it our way!