为传统行业建造数据仓库:Hadoop是过度的
#教程 #database #分析 #storage

Midland Realty的数据仓库规划师和数据分析师Herman Seah

这是房地产巨头数字化转型的一部分。为了机密性,我不会透露任何业务数据,但是您会详细了解我们的数据仓库和我们的优化策略。

现在开始。

建筑学

从逻辑上讲,我们的数据架构可以分为四个部分。

1

  • 数据集成:这得到了Flink CDC,Datax和Apache Doris的多录制功能的支持。
  • 数据管理:我们使用Apache Dolphinscheduler进行脚本生命周期管理,多租户管理中的特权和数据质量监控。
  • 提醒:我们使用Grafana,Prometheus和Loki监视组件资源和日志。
  • 数据服务:这是BI工具介入用户交互的地方,例如数据查询和分析。

1.

我们创建了尺寸表和事实表,以企业中的每个操作实体(包括客户,房屋等)中心。如果有一系列涉及相同操作实体的活动,则应由一个字段记录。 (这是从我们以前的混乱数据管理系统中学到的一堂课。)

2.

我们的数据仓库分为五个概念层。我们使用Apache Doris和Apache Dolphinscheduler来安排这些层之间的DAG脚本。

2

每天,除了增加历史状态字段的变化或不完整的数据同步ODS表的情况下,这些图层除了增量更新外,都会经历整体更新。

3. 增量更新策略

(1)设置where >= "activity time -1 day or -1 hour"而不是where >= "activity time

这样做的原因是防止由调度脚本的时间差距引起的数据漂移。假设,将执行间隔设置为10分钟,假设脚本是在23:58:00执行的,并且如果我们设置了where >= "activity time,则在23:59:00到达了一块新数据会错过。

(2)在每个脚本执行之前获取表最大的主键的ID,将ID存储在辅助表中,然后设置where >= "ID in auxiliary table"

这是为了避免数据重复。如果您使用Apache Doris的唯一密钥模型并指定一组主要键,则可能会发生数据重复,因为如果源表中的主键中有任何更改,则将记录更改并加载相关数据。此方法可以解决此问题,但是仅在源表具有自动插入主键时才适用。

(3)分区表

对于基于时间的自动收入数据,例如日志表,历史数据和状态的变化可能会更少,但是数据量很大,因此在整体更新和快照创建方面可能会有巨大的计算压力。因此,最好分区此类表,因此,对于每个增量更新,我们只需要替换一个分区即可。 (您可能还需要注意数据漂移。)

4. 总体更新策略

(1)截短表

清除表,然后将所有数据从源表中摄取。这适用于小型桌子和场景,没有用户活动。

(2)ALTER TABLE tbl1 REPLACE WITH TABLE tbl2

这是一个原子操作,建议大型表。每次执行脚本之前,我们都会创建一个带有相同架构的临时表,将所有数据加载到其中,然后用它替换原始表。

应用

  • ETL Job :每分钟
  • 首次部署的配置:8个节点,2个前端,8个后端,混合部署
  • 节点配置:32C * 60GB * 2TB SSD

这是我们针对旧数据和增量数据的GB的TB的配置。您可以将其用作参考,并在此基础上扩展群集。 Apache Doris的部署很简单。您不需要其他组件。

1.为了整合离线数据和日志数据,我们使用数据纳克斯,该数据支持CSV格式和许多关系数据库的读者,Apache Doris提供了数据纳克斯 - doris-writer。

3

2.我们使用Flink CDC从源表中同步数据。然后,我们利用实现的视图或Apache Doris的聚合模型来汇总实时指标。由于我们只需要实时处理一部分指标,并且我们不想生成太多数据库连接,我们使用一个flink作业来维护多个CDC源表。这是由Dinky的多源合并和完整数据库同步功能实现的,或者您可以自己实现Flink DataStream多源合并任务。值得注意的是,Flink CDC和Apache Doris支持模式更改。

EXECUTE CDCSOURCE demo_doris WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '127.0.0.1',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'checkpoint' = '10000',
  'scan.startup.mode' = 'initial',
  'parallelism' = '1',
  'table-name' = 'ods.ods_*,ods.ods_*',
  'sink.connector' = 'doris',
  'sink.fenodes' = '127.0.0.1:8030',
  'sink.username' = 'root',
  'sink.password' = '123456',
  'sink.doris.batch.size' = '1000',
  'sink.sink.max-retries' = '1',
  'sink.sink.batch.interval' = '60000',
  'sink.sink.db' = 'test',
  'sink.sink.properties.format' ='json',
  'sink.sink.properties.read_json_by_line' ='true',
  'sink.table.identifier' = '${schemaName}.${tableName}',
  'sink.sink.label-prefix' = '${schemaName}_${tableName}_1'
);

3.我们使用SQL脚本或“ Shell + SQL”脚本,然后执行脚本生命周期管理。在ODS层,我们为每个源表摄取的一般数据作业文件编写一个常规数据,而不是为每个源表编写数据纳克斯作业。这样,我们使事情更容易维护。我们在Dolphinscheduler上管理Apache Doris的ETL脚本,在那里我们还进行了版本控制。如果生产环境中的任何错误,我们总是可以回滚。

4

4.用ETL脚本摄入数据后,我们在报告工具中创建一个页面。我们使用SQL将不同的特权分配给不同的帐户,包括修改行,字段和全球词典的特权。 Apache Doris支持对帐户的特权控制,该控制与MySQL相同。

5

我们还使用Apache Doris数据备份进行灾难恢复,Apache Doris审核日志监视SQL执行效率,Grafana+LOKI用于集群度量警报和主管来监视节点组件的守护程序过程。

>

优化

1.数据摄入

我们使用Datax将负载脱机数据流式传输。它使我们能够调整每批的大小。流负载方法同步返回结果,该结果满足我们体系结构的需求。如果我们使用DolphinsCheduler执行异步数据导入,则系统可能会假定脚本已执行,并且可能导致MESSUP。如果使用其他方法,我们建议您在Shell脚本中执行show load,并检查Regex过滤状态以查看摄入是否成功。

2.数据模型

我们在大多数表中采用了Apache Doris的独特关键模型。唯一的关键模型可确保数据脚本的势力,并有效避免上游数据重复。

3.读取外部数据

我们使用Apache Doris的多录制功能连接到外部数据源。它使我们能够在目录级别创建外部数据的映射。

4.查询优化

我们建议您将最常用的非字符类型的字段(例如INT和条款)放在前36个字节中,因此您可以在点查询中将这些字段过滤到毫秒内。

5.数据字典

对我们来说,创建一个数据字典很重要,因为它大大降低了人事的沟通成本,当您拥有大型团队时,这可能会令人头疼。我们使用Apache Doris中的information_schema来生成数据字典。有了它,我们可以迅速掌握表和田地的整体图片,从而提高了发展效率。

表现

离线数据摄入时间:在几分钟之内

查询延迟:对于包含超过1亿行的表,Apache Doris在一秒钟内响应了临时查询,并且在五秒钟内响应了复杂的查询。

资源消耗:它仅占用少量服务器来构建此数据仓库。 Apache Doris的70%压缩比为我们节省了很多存储资源。

经验和结论

实际上,在我们发展到当前的数据体系结构之前,我们尝试了Hive,Spark和Hadoop来构建离线数据仓库。事实证明,对于像我们这样的传统公司而言,Hadoop对我们没有太多数据进行处理,这是过分的。找到最适合您的组件很重要。

6

(我们的旧离线数据仓库)

另一方面,要使我们的大数据过渡平滑,我们需要在使用和维护方面使数据平台尽可能简单。这就是为什么我们降落在Apache Doris上的原因。它与MySQL协议兼容,并提供了丰富的功能集合,因此我们不必开发自己的UDF。另外,它仅由两种类型的过程组成:前端和后端,因此易于扩展和跟踪

找到Apache Doris community here