flink-doris-connector 1.4.0允许用户摄入整个数据库( mysql 或 oracle ),其中包含数千个表格中的Apache Doris,这是一个实时分析数据库,一个步骤。
使用内置的FLINK CDC,连接器可以直接将表格架和从上游源到Apache Doris同步,这意味着用户不再需要在Doris中编写DataStream程序或预先创建的映射表。
flink作业开始时,连接器会自动检查源数据库和Apache Doris之间的数据等价。如果数据源包含Doris中不存在的表,则连接器将在Doris中自动创建相同的表格,并利用Flink的侧面输出来促进一次摄入多个表。如果源有模式变化,它将自动获取DDL语句,并在多丽丝中进行相同的模式更改。
快速开始
对于mysql :
下载jar文件:https://github.com/apache/doris-flink-connector/releases/tag/1.4.0
maven:
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-1.15</artifactId>
<!--artifactId>flink-doris-connector-1.16</artifactId-->
<!--artifactId>flink-doris-connector-1.17</artifactId-->
<version>1.4.0</version>
</dependency>
oracle :
下载jar文件:Flink 1.15,Flink 1.16,Flink 1.17
如何使用它
例如,要将整个MySQL数据库mysql_db
摄入Doris(MySQL表名称以tbl
或test
开头),只需执行以下命令(无需提前在Doris中创建表):
<FLINK_HOME>/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-1.4.0.jar \
mysql-sync-database \
--database test_db \
--mysql-conf hostname=127.0.0.1 \
--mysql-conf username=root \
--mysql-conf password=123456 \
--mysql-conf database-name=mysql_db \
--including-tables "tbl|test.*" \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password=123456 \
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label1 \
--table-conf replication_num=1
摄入Oracle数据库:请参阅example code。
它的性能
当涉及整个数据库(包含数百甚至数千个表,活动性或无效)时,大多数用户希望在几秒钟内完成它。因此,我们测试了连接器,以查看是否出现了:
- 1000个MySQL表,每个表都有100个字段。所有表都均为活动(这意味着它们一直在不断更新,并且每个数据编写涉及一百行)
- Flink Job Checkpoint:10s
在压力测试下,该系统显示出较高的稳定性,密钥指标如下:
根据早期采用者的反馈,该连接器还在其生产环境中提供了10,000台台式数据库同步的高性能和系统稳定性。这证明了Apache Doris和Flink CDC的组合能够具有高效率和可靠性的大规模数据同步。
它如何使数据工程师受益
工程师不再需要担心桌子创建或表格架构维护,从而节省了乏味且容易出错的工作。以前在Flink CDC中,您需要为每个表创建一个flink作业,并在源端建立一个日志解析链接,但是现在随着全数据库摄入,源数据库中的重新消耗大大减少了。它也是增量更新和完整更新的统一解决方案。
其他特性
1。加入维度表和事实表
常见的做法是将尺寸表放入多丽丝(Doris),并通过Flink的实时流来运行加入查询。基于Async I/O of Flink,Flink-Doris-Connector 1.4.0实现异步查找连接,因此由于查询而不会阻止Flink实时流。此外,连接器允许您将Mulitple查询组合到一个大查询中,并立即将其发送到Doris进行处理。这提高了此类联接查询的效率和吞吐量。
2。节俭 SDK
我们将节日服务SDK引入了连接器中,因此用户不再需要使用旧货插件或配置编译中的旧环境。这使得汇编过程变得更加简单。
3。按需流载荷
在数据同步期间,当没有新的数据摄入时,将不会发出流载荷请求。这避免了不必要的集群资源消费。
4。投票的后端节点
对于数据摄入,多丽丝(Doris)调用一个前端节点以获取后端节点的列表,然后随机选择一个来启动摄入请求。该后端节点将是协调员。 Flink-doris连接器1.4.0允许用户启用投票机制,这是在每个FLINK检查点上具有不同的后端节点作为协调器,以避免长时间对单个后端节点上的压力太大。
5。支持更多数据类型
除了常见的数据类型外,Flink-Doris-Connector 1.4.0支持DERIS中的Decimalv3/datev2/dateTimev2/array/json。
示例用法
从Apache Doris阅读:
您可以通过DataStream或Flinksql(边界流)从Doris读取数据。支持谓词下降。
CREATE TABLE flink_doris_source (
name STRING,
age INT,
score DECIMAL(5,2)
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'database.table',
'username' = 'root',
'password' = 'password',
'doris.filter.query' = 'age=18'
);
SELECT * FROM flink_doris_source;
加入维度表和事实表:
CREATE TABLE fact_table (
`id` BIGINT,
`name` STRING,
`city` STRING,
`process_time` as proctime()
) WITH (
'connector' = 'kafka',
...
);
create table dim_city(
`city` STRING,
`level` INT ,
`province` STRING,
`country` STRING
) WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
'lookup.jdbc.async' = 'true',
'table.identifier' = 'dim.dim_city',
'username' = 'root',
'password' = ''
);
SELECT a.id, a.name, a.city, c.province, c.country,c.level
FROM fact_table a
LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.city = c.city
写给apache doris :
CREATE TABLE doris_sink (
name STRING,
age INT,
score DECIMAL(5,2)
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'database.table',
'username' = 'root',
'password' = '',
'sink.label-prefix' = 'doris_label',
//json write in
'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true'
);
如果您有任何疑问,请在Slack上找到Apache Doris开发人员。