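Abstract
SingleStore provides a Change Data Capture (CDC) solution, currently in preview, to stream data from MongoDB to SingleStore Kai. In this article, we'll see how to connect an Apache Kafka broker to MongoDB Atlas and then use the CDC solution to stream the data from MongoDB Atlas to SingleStore Kai. We'll also use Metabase to create a simple analytics dashboard for SingleStore Kai.
The notebook file used in this article is available on GitHub.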
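Introduction
CDC is a way to track changes that occur in a database or system. SingleStore now provides a CDC solution, currently in preview, that works with MongoDB.
To demonstrate the CDC solution, we'll use a Kafka broker to stream data into a MongoDB Atlas cluster, and then use a CDC pipeline to propagate the data from MongoDB Atlas to SingleStore Kai. We'll also create a simple analytics dashboard using Metabase.
Figure 1 shows the high-level architecture of our system.
We'll focus on other scenarios using the CDC solution in future articles.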
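MongoDB Atlas
We'll use MongoDB Atlas in an M0 Sandbox. We'll configure an admin user with atlasAdmin privileges under Database Access, and we'll temporarily allow access from anywhere (IP Address 0.0.0.0/0) under Network Access. We'll make a note of the username, password and host.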
Apache Kafka
We'll configure a Kafka broker to stream data into MongoDB Atlas, using a Jupyter notebook to do this.
First, we'll install some libraries:
!pip install pymongo kafka-python --quiet
Next, we'll connect to MongoDB Atlas and the Kafka broker:
from kafka import KafkaConsumer
from pymongo import MongoClient
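
try:
    client = MongoClient("mongodb+srv://<username>:<password>@<host>/?retryWrites=true&w=majority")
    # MongoClient connects lazily, so send a ping to confirm the cluster is reachable
    client.admin.command("ping")
    db = client.adtech
    print("Connected successfully")
except Exception as e:
    print(f"Could not connect: {e}")

# Consume ad events from the public Kafka topic
consumer = KafkaConsumer(
    "ad_events",
    bootstrap_servers = ["public-kafka.memcompute.com:9092"]
)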
We'll replace <username>, <password> and <host> with the values we saved earlier from MongoDB Atlas.
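Because the placeholders sit inside a URI, a password containing characters such as @, : or / would break the connection string. PyMongo's documentation recommends percent-escaping the username and password with urllib.parse.quote_plus. Here is a minimal sketch, assuming the same mongodb+srv URI format used in the notebook. The helper name build_atlas_uri and the example host are hypothetical.

```python
from urllib.parse import quote_plus


def build_atlas_uri(username: str, password: str, host: str) -> str:
    """Build a mongodb+srv URI in the notebook's format with escaped credentials.

    Hypothetical helper: `host` is the Atlas host noted earlier.
    """
    # Percent-escape reserved characters (e.g. '@', ':', '/') so they
    # cannot be mistaken for URI delimiters.
    user = quote_plus(username)
    pwd = quote_plus(password)
    return f"mongodb+srv://{user}:{pwd}@{host}/?retryWrites=true&w=majority"


if __name__ == "__main__":
    # Example values only; the host is made up.
    print(build_atlas_uri("admin", "p@ss:word/1", "cluster0.example.mongodb.net"))
```

The result can be passed to MongoClient(...) in place of the literal string.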
Initially, we'll load 100 records into MongoDB Atlas, as follows:
MAX_ITERATIONS = 100

for iteration, message in enumerate(consumer, start = 1):
    if iteration > MAX_ITERATIONS:
        break

    try:
        # Each Kafka message is a tab-separated record of nine fields
        record = message.value.decode("utf-8")
        user_id, event_name, advertiser, campaign, gender, income, page_url, region, country = map(str.strip, record.split("\t"))

        events_record = {
            "user_id": int(user_id),
            "event_name": event_name,
            "advertiser": advertiser,
            # Keep only the leading number of the campaign field
            "campaign": int(campaign.split()[0]),
            "gender": gender,
            "income": income,
            "page_url": page_url,
            "region": region,
            "country": country
        }

        db.events.insert_one(events_record)
    except Exception as e:
        print(f"Iteration {iteration}: Could not insert data - {str(e)}")
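The parsing inside the loop can be moved into a pure function, which makes it testable without a Kafka broker or an Atlas cluster. Here is a minimal sketch, assuming the nine tab-separated fields arrive in the order unpacked above. parse_ad_event and FIELDS are hypothetical names and are not part of the notebook.

```python
from typing import Any, Dict

# Hypothetical: field order assumed from the unpacking in the loading loop.
FIELDS = ("user_id", "event_name", "advertiser", "campaign", "gender",
          "income", "page_url", "region", "country")


def parse_ad_event(raw: bytes) -> Dict[str, Any]:
    """Convert one tab-separated Kafka message value into an events document."""
    values = [v.strip() for v in raw.decode("utf-8").split("\t")]
    if len(values) != len(FIELDS):
        raise ValueError(f"expected {len(FIELDS)} fields, got {len(values)}")
    doc: Dict[str, Any] = dict(zip(FIELDS, values))
    doc["user_id"] = int(doc["user_id"])
    # Mirror int(campaign.split()[0]): keep only the leading number.
    doc["campaign"] = int(doc["campaign"].split()[0])
    return doc


if __name__ == "__main__":
    # A record built by hand from the example document, not real Kafka output.
    sample = (b"3857963415\tImpression\tSherwin-Williams\t13\tFemale\t"
              b"25k and below\t/2013/02/x.html/\tMichigan\tUS")
    print(parse_ad_event(sample))
```

With this helper, the body of the try block would reduce to db.events.insert_one(parse_ad_event(message.value)).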
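The data should load successfully. We should then see a database called adtech containing a collection called events. Documents in the collection should have a structure similar to the following example: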
_id: ObjectId('64ec906d0e8c0f7bcf72a8ed')
user_id: 3857963415
event_name: "Impression"
advertiser: "Sherwin-Williams"
campaign: 13
gender: "Female"
income: "25k and below"
page_url: "/2013/02/how-to-make-glitter-valentines-heart-boxes.html/"
region: "Michigan"
country: "US"
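These documents represent ad campaign events. The events collection stores details of the advertiser and campaign, as well as various demographic information about the user, such as gender and income.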
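SingleStore Kai
A previous article showed the steps to create a free SingleStoreDB Cloud account. We'll use the following settings:
- Workspace Group Name: CDC Demo Group
- Cloud Provider: AWS
- Region: US East 1 (N. Virginia)
- Workspace Name: cdc-demo
- Size: S-00
- Settings:
  - SingleStore Kai selected
Once the workspace is available, we'll make a note of our password and host. The host is available from CDC Demo Group > Overview > Workspaces > cdc-demo > Connect > Connect Directly > SQL IDE > Host. We'll need this information later for Metabase. We'll also configure the firewall under CDC Demo Group > Firewall.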
From the left navigation pane, we'll select DEVELOP > SQL Editor to create an adtech database and link, as follows:
CREATE DATABASE IF NOT EXISTS adtech;
USE adtech;
DROP LINK adtech.link;
CREATE LINK adtech.link AS MONGODB
CONFIG '{"mongodb.hosts": "<primary>:27017, <secondary>:27017, <secondary>:27017",
"collection.include.list": "adtech.*",
"mongodb.ssl.enabled": "true",
"mongodb.authsource": "admin",
"mongodb.members.auto.discover": "false"}'
CREDENTIALS '{"mongodb.user": "<username>",
"mongodb.password": "<password>"}';
CREATE TABLES AS INFER PIPELINE AS LOAD DATA LINK adtech.link '*' FORMAT AVRO;
We'll replace <username> and <password> with the values we saved earlier from MongoDB Atlas. We'll also need to replace <primary>, <secondary> and <secondary> with the address of each corresponding replica set member from MongoDB Atlas.
We'll now check for any tables, as follows:
SHOW TABLES;
This should show one table called events:
+------------------+
| Tables_in_adtech |
+------------------+
| events |
+------------------+
We'll check the structure of the table:
DESCRIBE events;
The output should be as follows:
+-------+------+------+------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------+------+------+------+---------+-------+
| _id | text | NO | UNI | NULL | |
| _more | JSON | NO | | NULL | |
+-------+------+------+------+---------+-------+
Next, we'll check for any pipelines:
SHOW PIPELINES;
This will show a pipeline called events that is currently Stopped:
+---------------------+---------+-----------+
| Pipelines_in_adtech | State | Scheduled |
+---------------------+---------+-----------+
| events | Stopped | False |
+---------------------+---------+-----------+
Now we'll start the events pipeline, which is the only pipeline in the database:
START ALL PIPELINES;
The state should change to Running:
+---------------------+---------+-----------+
| Pipelines_in_adtech | State | Scheduled |
+---------------------+---------+-----------+
| events | Running | False |
+---------------------+---------+-----------+
If we now run the following command:
SELECT COUNT(*) FROM events;
the result should be 100:
+----------+
| COUNT(*) |
+----------+
| 100 |
+----------+
We'll check one row in the events table, as follows:
SELECT * FROM events LIMIT 1;
The output should be similar to the following:
+--------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| _id | _more |
+--------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| {"$oid": "64ec906d0e8c0f7bcf72a8f7"} | {"_id":{"$oid":"64ec906d0e8c0f7bcf72a8f7"},"advertiser":"Wendys","campaign":13,"country":"US","event_name":"Click","gender":"Female","income":"75k - 99k","page_url":"/2014/05/flamingo-pop-bridal-shower-collab-with.html","region":"New Mexico","user_id":3857963416} |
+--------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
The CDC solution has successfully connected to MongoDB Atlas and replicated all 100 records to SingleStore Kai.
Metabase
Let's now create a dashboard using Metabase. A previous article described how to install and configure Metabase and how to create a connection to it. We'll create visualizations using slight variations of the queries from that earlier article.
1. Total Number of Events
SELECT COUNT(*) FROM events;
2. Events by Region
SELECT _more::country AS `events.country`, COUNT(_more::country) AS 'events.countofevents'
FROM adtech.events AS events
GROUP BY 1;
3. Events by Top 5 Advertisers
SELECT _more::advertiser AS `events.advertiser`, COUNT(*) AS `events.count`
FROM adtech.events AS events
WHERE (_more::advertiser LIKE '%Subway%' OR _more::advertiser LIKE '%McDonals%' OR _more::advertiser LIKE '%Starbucks%' OR _more::advertiser LIKE '%Dollar General%' OR _more::advertiser LIKE '%YUM! Brands%' OR _more::advertiser LIKE '%Dunkin Brands Group%')
GROUP BY 1
ORDER BY `events.count` DESC;
4. Visitors by Gender and Income
SELECT *
FROM (SELECT *, DENSE_RANK() OVER (ORDER BY xx.z___min_rank) AS z___pivot_row_rank, RANK() OVER (PARTITION BY xx.z__pivot_col_rank ORDER BY xx.z___min_rank) AS z__pivot_col_ordering, CASE
WHEN xx.z___min_rank = xx.z___rank THEN 1
ELSE 0
END AS z__is_highest_ranked_cell
FROM (SELECT *, Min(aa.z___rank) OVER (PARTITION BY aa.`events.income`) AS z___min_rank
FROM (SELECT *, RANK() OVER (ORDER BY CASE
WHEN bb.z__pivot_col_rank = 1 THEN (CASE
WHEN bb.`events.count` IS NOT NULL THEN 0
ELSE 1
END)
ELSE 2
END, CASE
WHEN bb.z__pivot_col_rank = 1 THEN bb.`events.count`
ELSE NULL
END DESC, bb.`events.count` DESC, bb.z__pivot_col_rank, bb.`events.income`) AS z___rank
FROM (SELECT *, DENSE_RANK() OVER (ORDER BY CASE
WHEN ww.`events.gender` IS NULL THEN 1
ELSE 0
END, ww.`events.gender`) AS z__pivot_col_rank
FROM (SELECT _more::gender AS `events.gender`, _more::income AS `events.income`, COUNT(*) AS `events.count`
FROM adtech.events AS events
WHERE (_more::income <> 'unknown' OR _more::income IS NULL)
GROUP BY 1, 2) ww) bb
WHERE bb.z__pivot_col_rank <= 16384) aa) xx) zz
WHERE (zz.z__pivot_col_rank <= 50 OR zz.z__is_highest_ranked_cell = 1) AND (zz.z___pivot_row_rank <= 500 OR zz.z__pivot_col_ordering = 1)
ORDER BY zz.z___pivot_row_rank;
Figure 2 shows an example of the charts sized and positioned on the AdTech dashboard. We'll set the Auto-Refresh option to 1 minute.
If we load more data into MongoDB Atlas with the Jupyter notebook by increasing MAX_ITERATIONS, we'll see the new data propagate to SingleStore Kai and appear in the AdTech dashboard.
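The query for chart 4 is generated pivot SQL. Most of it ranks and limits the pivot's rows and columns, and its core is the innermost SELECT: a count grouped by gender and income that excludes 'unknown' incomes but keeps missing ones. Here is a minimal pure-Python sketch of that core aggregation, assuming documents shaped like the events records shown earlier. The function name and the sample data are hypothetical, and the sketch omits the ranking and limit logic.

```python
from collections import Counter
from typing import Any, Dict, Iterable, Mapping


def gender_income_pivot(events: Iterable[Mapping[str, Any]]) -> Dict[Any, Dict[Any, int]]:
    """Count events per (income, gender), skipping income == 'unknown'.

    A missing income (None) is kept, matching the SQL's
    `_more::income <> 'unknown' OR _more::income IS NULL`.
    Returns {income: {gender: count}}: one row per income band and
    one column per gender.
    """
    counts = Counter(
        (e.get("income"), e.get("gender"))
        for e in events
        if e.get("income") != "unknown"
    )
    pivot: Dict[Any, Dict[Any, int]] = {}
    for (income, gender), n in counts.items():
        pivot.setdefault(income, {})[gender] = n
    return pivot


if __name__ == "__main__":
    # Hypothetical sample documents, not real pipeline output.
    sample = [
        {"gender": "Female", "income": "25k and below"},
        {"gender": "Female", "income": "25k and below"},
        {"gender": "Male", "income": "75k - 99k"},
        {"gender": "Male", "income": "unknown"},
    ]
    print(gender_income_pivot(sample))
```

Metabase then renders this row/column structure as the pivoted chart.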
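Summary
In this article, we created a CDC pipeline to augment MongoDB Atlas with SingleStore Kai. As several benchmarks have highlighted, SingleStore Kai is well suited to analytics because of its strong performance. We also used Metabase to create a quick visual dashboard to help us gain insights into our ad campaign.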