Quick tip: Streaming data from MongoDB Atlas to SingleStore Kai using Kafka and CDC
#singlestoredb #mongodb #kafka #cdc

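Abstract

SingleStore provides a Change Data Capture (CDC) solution, currently in preview, to stream data from MongoDB to SingleStore Kai. In this article, we'll see how to connect an Apache Kafka broker to MongoDB Atlas and then stream the data from MongoDB Atlas to SingleStore Kai using the CDC solution. We'll also use Metabase to create a simple analytics dashboard for SingleStore Kai.

The notebook file used in this article is available on GitHub.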

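Introduction

CDC is a way to track changes that occur in a database or system. SingleStore now provides a CDC solution, currently in preview, that works with MongoDB.

To demonstrate the CDC solution, we'll use a Kafka broker to stream data to a MongoDB Atlas cluster and then use the CDC pipeline to propagate the data from MongoDB Atlas to SingleStore Kai. We'll also use Metabase to create a simple analytics dashboard.

Figure 1 shows the high-level architecture of our system.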

Figure 1. High-Level Architecture (Source: SingleStore).


We'll focus on other scenarios using the CDC solution in future articles.

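MongoDB Atlas

We'll use MongoDB Atlas in an M0 Sandbox. We'll configure an admin user with atlasAdmin privileges under Database Access. We'll temporarily allow access from anywhere (IP address 0.0.0.0/0) under Network Access. We'll note down the username, password and host.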

Apache Kafka

We'll configure a Kafka broker to stream data into MongoDB Atlas. We'll use a Jupyter notebook to achieve this.

First, we'll install some libraries:

!pip install pymongo kafka-python --quiet

Next, we'll connect to MongoDB Atlas and the Kafka broker:

from kafka import KafkaConsumer
from pymongo import MongoClient

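try:
    client = MongoClient("mongodb+srv://<username>:<password>@<host>/?retryWrites=true&w=majority")
    # MongoClient connects lazily, so ping the server to confirm the connection
    client.admin.command("ping")
    db = client.adtech
    print("Connected successfully")
except Exception as e:
    print(f"Could not connect: {e}")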

consumer = KafkaConsumer(
    "ad_events",
    bootstrap_servers = ["public-kafka.memcompute.com:9092"]
)

We'll replace <username>, <password> and <host> with the values that we saved earlier from MongoDB Atlas.
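
If the username or password contains characters such as @, : or /, they need to be percent-encoded before they go into the connection string. The following is a minimal sketch using only the standard library. The function build_atlas_uri is a hypothetical helper (it is not part of pymongo), and the credentials and host shown are made up.

```python
from urllib.parse import quote_plus


def build_atlas_uri(username: str, password: str, host: str) -> str:
    """Return a mongodb+srv URI with percent-encoded credentials.

    Hypothetical helper for illustration; the query options mirror
    the connection string used in this article.
    """
    user = quote_plus(username)
    pwd = quote_plus(password)
    return f"mongodb+srv://{user}:{pwd}@{host}/?retryWrites=true&w=majority"


# Made-up values; replace them with the ones saved from MongoDB Atlas.
uri = build_atlas_uri("admin", "p@ss:word/1", "cluster0.example.mongodb.net")
print(uri)
```

The resulting string can be passed to MongoClient in place of the hand-written URI.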

Initially, we'll load 100 records into MongoDB Atlas, as follows:

MAX_ITERATIONS = 100

for iteration, message in enumerate(consumer, start = 1):
    if iteration > MAX_ITERATIONS:
        break

    try:
        record = message.value.decode("utf-8")
        user_id, event_name, advertiser, campaign, gender, income, page_url, region, country = map(str.strip, record.split("\t"))

        events_record = {
            "user_id": int(user_id),
            "event_name": event_name,
            "advertiser": advertiser,
            "campaign": int(campaign.split()[0]),
            "gender": gender,
            "income": income,
            "page_url": page_url,
            "region": region,
            "country": country
        }

        db.events.insert_one(events_record)
    except Exception as e:
        print(f"Iteration {iteration}: Could not insert data - {str(e)}")
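
To make the parsing step easier to test without a Kafka connection, the field handling can be pulled into a small function. This is a sketch only: parse_ad_event is a hypothetical name, and the sample line is an assumption about the wire format, built from the values in the example document in this article.

```python
def parse_ad_event(record: str) -> dict:
    """Convert one tab-separated ad event line into a document."""
    fields = [field.strip() for field in record.split("\t")]
    if len(fields) != 9:
        raise ValueError(f"Expected 9 fields, got {len(fields)}")
    (user_id, event_name, advertiser, campaign, gender,
     income, page_url, region, country) = fields
    return {
        "user_id": int(user_id),
        "event_name": event_name,
        "advertiser": advertiser,
        # Keep only the leading integer of the campaign field.
        "campaign": int(campaign.split()[0]),
        "gender": gender,
        "income": income,
        "page_url": page_url,
        "region": region,
        "country": country,
    }


# Assumed sample line; real messages come from the ad_events topic.
sample = "\t".join([
    "3857963415", "Impression", "Sherwin-Williams", "13", "Female",
    "25k and below",
    "/2013/02/how-to-make-glitter-valentines-heart-boxes.html/",
    "Michigan", "US",
])
doc = parse_ad_event(sample)
print(doc)
```

A line with the wrong number of fields raises ValueError, which the loop's existing except clause would report.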

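The data should load successfully, and we should see a database called adtech containing a collection called events. Documents in the collection should be similar in structure to the following example: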

_id: ObjectId('64ec906d0e8c0f7bcf72a8ed')
user_id: 3857963415
event_name: "Impression"
advertiser: "Sherwin-Williams"
campaign: 13
gender: "Female"
income: "25k and below"
page_url: "/2013/02/how-to-make-glitter-valentines-heart-boxes.html/"
region: "Michigan"
country: "US"

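These documents represent Ad Campaign events. The events collection stores details of the advertiser and campaign, along with various demographic information about the user, such as gender and income.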

SingleStore Kai

A previous article showed the steps to create a free SingleStoreDB Cloud account. We'll use the following settings:

  • Workspace Group Name: CDC Demo Group
  • Cloud Provider: AWS
  • Region: US East 1 (N. Virginia)
  • Workspace Name: cdc-demo
  • Size: S-00
  • Settings:
    • SingleStore Kai selected

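Once the workspace is available, we'll make a note of our password and host. The host will be available from CDC Demo Group > Overview > Workspaces > cdc-demo > Connect > Connect Directly > SQL IDE > Host. We'll need this information later for Metabase. We'll also configure the firewall under CDC Demo Group > Firewall.

From the left navigation pane, we'll select Develop > SQL Editor to create the adtech database and link, as follows: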

CREATE DATABASE IF NOT EXISTS adtech;
USE adtech;

DROP LINK adtech.link;

CREATE LINK adtech.link AS MONGODB
CONFIG '{"mongodb.hosts": "<primary>:27017, <secondary>:27017, <secondary>:27017",
        "collection.include.list": "adtech.*",
        "mongodb.ssl.enabled": "true",
        "mongodb.authsource": "admin",
        "mongodb.members.auto.discover": "false"}'
CREDENTIALS '{"mongodb.user": "<username>",
            "mongodb.password": "<password>"}';

CREATE TABLES AS INFER PIPELINE AS LOAD DATA LINK adtech.link '*' FORMAT AVRO;

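We'll replace <username> and <password> with the values that we saved earlier from MongoDB Atlas. We'll also need to replace <primary>, <secondary> and <secondary> with the full address of each node from MongoDB Atlas.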
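
Hand-editing JSON inside a SQL string is error-prone, so one option is to generate the CONFIG value in the notebook and paste it into the statement. The sketch below assumes the same keys and values as the CREATE LINK statement in this article. The function build_link_config is a hypothetical helper, and the host names are made up.

```python
import json
from typing import List


def build_link_config(hosts: List[str], database: str = "adtech") -> str:
    """Return the JSON text for the CONFIG clause of CREATE LINK.

    Hypothetical helper; the keys and values mirror the statement
    shown in this article.
    """
    config = {
        "mongodb.hosts": ", ".join(f"{host}:27017" for host in hosts),
        "collection.include.list": f"{database}.*",
        "mongodb.ssl.enabled": "true",
        "mongodb.authsource": "admin",
        "mongodb.members.auto.discover": "false",
    }
    return json.dumps(config, indent=2)


# Made-up host names; use the primary and secondary addresses from Atlas.
hosts = [
    "shard-00-00.example.mongodb.net",
    "shard-00-01.example.mongodb.net",
    "shard-00-02.example.mongodb.net",
]
config_text = build_link_config(hosts)
print(config_text)
```

Because json.dumps produces the text, quoting and commas are always valid, which removes one common source of errors when the link is created.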

We'll now check for any tables, as follows:

SHOW TABLES;

This should show one table called events:

+------------------+
| Tables_in_adtech |
+------------------+
| events           |
+------------------+

We'll check the structure of the table:

DESCRIBE events;

The output should be as follows:

+-------+------+------+------+---------+-------+
| Field | Type | Null | Key  | Default | Extra |
+-------+------+------+------+---------+-------+
| _id   | text | NO   | UNI  | NULL    |       |
| _more | JSON | NO   |      | NULL    |       |
+-------+------+------+------+---------+-------+

Next, we'll check for any pipelines:

SHOW PIPELINES;

This will show one pipeline called events that is currently Stopped:

+---------------------+---------+-----------+
| Pipelines_in_adtech | State   | Scheduled |
+---------------------+---------+-----------+
| events              | Stopped | False     |
+---------------------+---------+-----------+

Now we'll start the events pipeline:

START ALL PIPELINES;

The state should then change to Running:

+---------------------+---------+-----------+
| Pipelines_in_adtech | State   | Scheduled |
+---------------------+---------+-----------+
| events              | Running | False     |
+---------------------+---------+-----------+

If we now run the following command:

SELECT COUNT(*) FROM events;

The result should be 100:

+----------+
| COUNT(*) |
+----------+
|      100 |
+----------+

We'll check one row in the events table, as follows:

SELECT * FROM events LIMIT 1;

The output should be similar to the following:

+--------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| _id                                  | _more                                                                                                                                                                                                                                                                   |
+--------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| {"$oid": "64ec906d0e8c0f7bcf72a8f7"} | {"_id":{"$oid":"64ec906d0e8c0f7bcf72a8f7"},"advertiser":"Wendys","campaign":13,"country":"US","event_name":"Click","gender":"Female","income":"75k - 99k","page_url":"/2014/05/flamingo-pop-bridal-shower-collab-with.html","region":"New Mexico","user_id":3857963416} |
+--------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

The CDC solution has successfully connected to MongoDB Atlas and replicated all 100 records to SingleStore Kai.
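
Each replicated row keeps the whole MongoDB document as a single JSON value in the _more column. To see its shape outside SQL, the value can be parsed in Python. The sketch below uses the _more text from the sample row above, and nothing in it depends on a database connection.

```python
import json

# The _more value from the sample row shown in this article.
more = (
    '{"_id":{"$oid":"64ec906d0e8c0f7bcf72a8f7"},"advertiser":"Wendys",'
    '"campaign":13,"country":"US","event_name":"Click","gender":"Female",'
    '"income":"75k - 99k",'
    '"page_url":"/2014/05/flamingo-pop-bridal-shower-collab-with.html",'
    '"region":"New Mexico","user_id":3857963416}'
)

doc = json.loads(more)

# The ObjectId is serialised as {"$oid": "..."}.
object_id = doc["_id"]["$oid"]

# Everything else is the original event payload.
fields = {key: value for key, value in doc.items() if key != "_id"}

print(object_id)
print(sorted(fields))
```

The nine payload keys match the fields written by the notebook, which is a quick sanity check that nothing was lost in replication.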

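Let's now create a dashboard using Metabase.

Metabase

A previous article described in detail how to install, configure and create a connection to Metabase. We'll create visualisations using slight variations of the queries used in the earlier article.

1. Total Number of Events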

SELECT COUNT(*) FROM events;

2. Events by Region

SELECT _more::country AS `events.country`, COUNT(_more::country) AS `events.countofevents`
FROM adtech.events AS events
GROUP BY 1;

3. Events by Top 5 Advertisers

SELECT _more::advertiser AS `events.advertiser`, COUNT(*) AS `events.count`
FROM adtech.events AS events
WHERE (_more::advertiser LIKE '%Subway%' OR _more::advertiser LIKE '%McDonals%' OR _more::advertiser LIKE '%Starbucks%' OR _more::advertiser LIKE '%Dollar General%' OR _more::advertiser LIKE '%YUM! Brands%' OR _more::advertiser LIKE '%Dunkin Brands Group%')
GROUP BY 1
ORDER BY `events.count` DESC;

4. Visitors by Gender and Income

SELECT *
FROM (SELECT *, DENSE_RANK() OVER (ORDER BY xx.z___min_rank) AS z___pivot_row_rank, RANK() OVER (PARTITION BY xx.z__pivot_col_rank ORDER BY xx.z___min_rank) AS z__pivot_col_ordering, CASE
        WHEN xx.z___min_rank = xx.z___rank THEN 1
        ELSE 0
      END AS z__is_highest_ranked_cell
    FROM (SELECT *, Min(aa.z___rank) OVER (PARTITION BY aa.`events.income`) AS z___min_rank
        FROM (SELECT *, RANK() OVER (ORDER BY CASE
                WHEN bb.z__pivot_col_rank = 1 THEN (CASE
                    WHEN bb.`events.count` IS NOT NULL THEN 0
                    ELSE 1
                  END)
                ELSE 2
              END, CASE
                WHEN bb.z__pivot_col_rank = 1 THEN bb.`events.count`
                ELSE NULL
              END DESC, bb.`events.count` DESC, bb.z__pivot_col_rank, bb.`events.income`) AS z___rank
            FROM (SELECT *, DENSE_RANK() OVER (ORDER BY CASE
                    WHEN ww.`events.gender` IS NULL THEN 1
                    ELSE 0
                  END, ww.`events.gender`) AS z__pivot_col_rank
                FROM (SELECT _more::gender AS `events.gender`, _more::income AS `events.income`, COUNT(*) AS `events.count`
                    FROM adtech.events AS events
                    WHERE (_more::income <> 'unknown' OR _more::income IS NULL)
                    GROUP BY 1, 2) ww) bb
            WHERE bb.z__pivot_col_rank <= 16384) aa) xx) zz
WHERE (zz.z__pivot_col_rank <= 50 OR zz.z__is_highest_ranked_cell = 1) AND (zz.z___pivot_row_rank <= 500 OR zz.z__pivot_col_ordering = 1)
ORDER BY zz.z___pivot_row_rank;
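
The nested query is long, but at its core the innermost SELECT counts events per gender and income band, and the outer layers add ranking columns that control pivot ordering. As a rough plain-Python illustration of that core aggregation only (it does not reproduce the z__ ranking columns or SingleStore's JSON comparison semantics), here is a sketch over made-up sample documents:

```python
from collections import Counter

# Made-up sample documents shaped like the _more column.
events = [
    {"gender": "Female", "income": "25k and below"},
    {"gender": "Female", "income": "75k - 99k"},
    {"gender": "Male", "income": "25k and below"},
    {"gender": "Male", "income": "unknown"},
    {"gender": "Female", "income": "25k and below"},
]

# Count events per (gender, income), skipping 'unknown' income.
counts = Counter(
    (e["gender"], e["income"]) for e in events if e["income"] != "unknown"
)

# Pivot: one row per income band, one column per gender.
genders = sorted({gender for gender, _ in counts})
pivot = {}
for (gender, income), n in counts.items():
    pivot.setdefault(income, {g: 0 for g in genders})[gender] = n

for income, row in sorted(pivot.items()):
    print(income, row)
```

This is the table shape that the Metabase pivot visualisation renders: income bands down the side and one count column per gender.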

Figure 2 shows an example of the charts sized and positioned on the AdTech dashboard. We'll set the auto-refresh option to 1 minute.

Figure 2. Final Dashboard.


If we load more data into MongoDB Atlas using the Jupyter notebook by changing MAX_ITERATIONS, we'll see the data propagated to SingleStore Kai and the new data reflected in the AdTech dashboard.
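
When experimenting with larger batches, one alternative to counting iterations by hand is itertools.islice, which stops after exactly the requested number of items and works with any iterable. The sketch below uses a stand-in generator instead of a real KafkaConsumer so that it runs anywhere. The name fake_consumer and the batch size of 250 are made up for illustration.

```python
from itertools import islice


def fake_consumer():
    """Stand-in for a KafkaConsumer: yields messages forever."""
    n = 0
    while True:
        n += 1
        yield f"message-{n}"


MAX_ITERATIONS = 250  # made-up larger batch size

# islice stops after MAX_ITERATIONS items without reading an extra one.
batch = list(islice(fake_consumer(), MAX_ITERATIONS))
print(len(batch), batch[0], batch[-1])
```

With the enumerate-and-break pattern, one extra message is fetched before the loop exits, whereas islice requests only what it needs.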

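Summary

In this article, we created a CDC pipeline to augment MongoDB Atlas with SingleStore Kai. As several benchmarks have highlighted, SingleStore Kai can be used for analytics thanks to its strong performance. We also used Metabase to create a quick visual dashboard to help us gain insights into our Ad Campaign.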