自动同步将整个MySQL数据库用于数据分析
#oracle #database #mysql #datascience

flink-doris-connector 1.4.0允许用户摄入整个数据库( mysql oracle ),其中包含数千个表格中的Apache Doris,这是一个实时分析数据库,一个步骤

使用内置的FLINK CDC,连接器可以直接将表格架和从上游源到Apache Doris同步,这意味着用户不再需要在Doris中编写DataStream程序或预先创建的映射表。

flink作业开始时,连接器会自动检查源数据库和Apache Doris之间的数据等价。如果数据源包含Doris中不存在的表,则连接器将在Doris中自动创建相同的表格,并利用Flink的侧面输出来促进一次摄入多个表。如果源有模式变化,它将自动获取DDL语句,并在多丽丝中进行相同的模式更改。

快速开始

对于mysql

下载jar文件:https://github.com/apache/doris-flink-connector/releases/tag/1.4.0

maven:

<dependency>
  <groupId>org.apache.doris</groupId>
  <artifactId>flink-doris-connector-1.15</artifactId>
  <!--artifactId>flink-doris-connector-1.16</artifactId-->
  <!--artifactId>flink-doris-connector-1.17</artifactId-->
  <version>1.4.0</version>
</dependency>

oracle

下载jar文件:Flink 1.15Flink 1.16Flink 1.17

如何使用它

例如,要将整个MySQL数据库mysql_db摄入Doris(MySQL表名称以tbltest开头),只需执行以下命令(无需提前在Doris中创建表):

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.4.0.jar \
    mysql-sync-database \
    --database test_db \
    --mysql-conf hostname=127.0.0.1 \
    --mysql-conf username=root \
    --mysql-conf password=123456 \
    --mysql-conf database-name=mysql_db \
    --including-tables "tbl|test.*" \
    --sink-conf fenodes=127.0.0.1:8030 \
    --sink-conf username=root \
    --sink-conf password=123456 \
    --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
    --sink-conf sink.label-prefix=label1 \
    --table-conf replication_num=1 

摄入Oracle数据库:请参阅example code

它的性能

当涉及整个数据库(包含数百甚至数千个表,活动性或无效)时,大多数用户希望在几秒钟内完成它。因此,我们测试了连接器,以查看是否出现了:

  • 1000个MySQL表,每个表都有100个字段。所有表都均为活动(这意味着它们一直在不断更新,并且每个数据编写涉及一百行)
  • Flink Job Checkpoint:10s

在压力测试下,该系统显示出较高的稳定性,密钥指标如下:

1

2

3

根据早期采用者的反馈,该连接器还在其生产环境中提供了10,000台台式数据库同步的高性能和系统稳定性。这证明了Apache Doris和Flink CDC的组合能够具有高效率和可靠性的大规模数据同步。

它如何使数据工程师受益

工程师不再需要担心桌子创建或表格架构维护,从而节省了乏味且容易出错的工作。以前在Flink CDC中,您需要为每个表创建一个flink作业,并在源端建立一个日志解析链接,但是现在随着全数据库摄入,源数据库中的重新消耗大大减少了。它也是增量更新和完整更新的统一解决方案。

其他特性

1。加入维度表和事实表

常见的做法是将尺寸表放入多丽丝(Doris),并通过Flink的实时流来运行加入查询。基于Async I/O of Flink,Flink-Doris-Connector 1.4.0实现异步查找连接,因此由于查询而不会阻止Flink实时流。此外,连接器允许您将Mulitple查询组合到一个大查询中,并立即将其发送到Doris进行处理。这提高了此类联接查询的效率和吞吐量。

2。节俭 SDK

我们将节日服务SDK引入了连接器中,因此用户不再需要使用旧货插件或配置编译中的旧环境。这使得汇编过程变得更加简单。

3。按需流载荷

在数据同步期间,当没有新的数据摄入时,将不会发出流载荷请求。这避免了不必要的集群资源消费。

4。投票的后端节点

对于数据摄入,多丽丝(Doris)调用一个前端节点以获取后端节点的列表,然后随机选择一个来启动摄入请求。该后端节点将是协调员。 Flink-doris连接器1.4.0允许用户启用投票机制,这是在每个FLINK检查点上具有不同的后端节点作为协调器,以避免长时间对单个后端节点上的压力太大。

5。支持更多数据类型

除了常见的数据类型外,Flink-Doris-Connector 1.4.0支持DERIS中的Decimalv3/datev2/dateTimev2/array/json。

示例用法

从Apache Doris阅读:

您可以通过DataStream或Flinksql(边界流)从Doris读取数据。支持谓词下降。

CREATE TABLE flink_doris_source (
    name STRING,
    age INT,
    score DECIMAL(5,2)
    ) 
    WITH (
      'connector' = 'doris',
      'fenodes' = '127.0.0.1:8030',
      'table.identifier' = 'database.table',
      'username' = 'root',
      'password' = 'password',
      'doris.filter.query' = 'age=18'
);

SELECT * FROM flink_doris_source;

加入维度表和事实表

CREATE TABLE fact_table (
  `id` BIGINT,
  `name` STRING,
  `city` STRING,
  `process_time` as proctime()
) WITH (
  'connector' = 'kafka',
  ...
);

create table dim_city(
  `city` STRING,
  `level` INT ,
  `province` STRING,
  `country` STRING
) WITH (
  'connector' = 'doris',
  'fenodes' = '127.0.0.1:8030',
  'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
  'lookup.jdbc.async' = 'true',
  'table.identifier' = 'dim.dim_city',
  'username' = 'root',
  'password' = ''
);

SELECT a.id, a.name, a.city, c.province, c.country,c.level 
FROM fact_table a
LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.city = c.city

写给apache doris

CREATE TABLE doris_sink (
    name STRING,
    age INT,
    score DECIMAL(5,2)
    ) 
    WITH (
      'connector' = 'doris',
      'fenodes' = '127.0.0.1:8030',
      'table.identifier' = 'database.table',
      'username' = 'root',
      'password' = '',
      'sink.label-prefix' = 'doris_label',
      //json write in
      'sink.properties.format' = 'json',
      'sink.properties.read_json_by_line' = 'true'
);

如果您有任何疑问,请在Slack上找到Apache Doris开发人员。