使用Nebulaph进口商将数据导入Nebulagraph数据库
#开源 #sql #database #graphdatabase

Nebulagraph现在是具有许多生态系统工具的成熟产品。它在数据导入方面提供了广泛的选择。有大型且全面的星云交换,小而紧凑的星云进口商,以及用于火花和弗林克整合的星云火花连接器和星云连接器。

但是,众多导入方法中的哪个更方便?

这是我的看法:

星云交换

  • 如果您需要从kafka和pulsar导入流式数据库中的数据库
  • 如果您需要从关系数据库(例如MySQL)或分布式文件系统(例如HDFS)读取批处理数据
  • 如果您需要生成nebulagraph识别的SST文件,则大量数据

星云进口商

  • 星云进口商最适合将本地CSV文件导入Nebulagraph

星云火花连接器:

  • 在不同的Nebulagraph簇之间迁移数据
  • 在同一nebulagraph群集内的不同图形空间之间迁移数据
  • 在Nebulagraph和其他数据源之间迁移数据
  • 组合图形计算的星云算法

有关如何从SPACK导入数据的更多选项,请阅读:4 different ways to work with NebulaGraph in Apache Spark

Nebula Flink Connector

  • 在不同的Nebulagraph簇之间迁移数据
  • 在同一nebulagraph群集内的不同图形空间之间迁移数据
  • 在Nebulagraph和其他数据源之间迁移数据

总的来说,星云交易所大而全面,可以与大多数存储引擎相结合以进口星云,但需要部署火花环境。

星云进口商易于使用,需要更少的依赖项,但是您需要提前生成自己的数据文件并一劳永逸地配置模式,但它不支持断点传输,并且适用于中等数据量。 /p>

Spark / Flink Connector需要与流批次数据结合。< / p>

为不同方案选择不同的工具。对于新任星云的新移民,建议使用星云进口商,这是一种数据导入工具,因为它易于使用且快速启动。

使用星云进口商

当我们第一次遇到Nebulagraph时,因为生态学不是完美的,只有一些企业迁移到星云,我们曾经通过将Hive Tables推到Kafka并消耗Kafka来进口NebulaGraph data,无论是全部还是渐进, 。后来,随着越来越多的数据和企业改用Nebulagraph,导入数据效率的问题变得越来越严重。进口时间的增加使得仍然在最高营业时间在全卷中进口数据是不可接受的。

对于上述问题,在尝试了星云火花连接器和星云进口商之后,我们决定使用 hive table - csv nebula server-星云进口商为了轻松维护和迁移,数据以及所花费的总时间大大减少了。总体消耗大大减少。

配置星云进口商

系统环境

[root@nebula-server-prod-05 importer]# lscpu
Architecture:          x86_64
CPU op-mode(s):        32-bit, 64-bit
Byte Order:            Little Endian
CPU(s):                16
On-line CPU(s) list:   0-15
Thread(s) per core:    2
Core(s) per socket:    8
Socket(s):             1
NUMA node(s):          1
Vendor ID:             GenuineIntel
CPU family:            6
Model:                 85
Model name:            Intel(R) Xeon(R) Platinum 8269CY CPU @ 2.50GHz
Stepping:              7
CPU MHz:               2499.998
BogoMIPS:              4999.99
Hypervisor vendor:     KVM
Virtualization type:   full
L1d cache:             32K
L1i cache:             32K
L2 cache:              1024K
L3 cache:              36608K
NUMA node0 CPU(s):     0-15
Disk:SSD
Memory: 128G

集群环境

NebulaGraph Version: v2.6.1
Deployment Method: RPM
Cluster size: 3 replicas, 6 nodes

数据大小

+---------+--------------------------+-----------+
| "Space" | "vertices"               | 559191827 |
+---------+--------------------------+-----------+
| "Space" | "edges"                  | 722490436 |
+---------+--------------------------+-----------+

星云进口商配置

# Graph version, set to v2 when connecting 2.x.
version: v2
description: Relation Space import data
# Whether to remove temporarily generated log and error data files.
removeTempFiles: false
clientSettings:
  # The number of retries for failed nGQL statement execution.
  retry: 3
  # Number of concurrency for NebulaGraph clients.
  concurrency: 5
  # The size of the cache queue for each NebulaGraph client.
  channelBufferSize: 1024
  # The NebulaGraph graph space to import data into.
  space: Relation
  # Connection information.
  connection:
    user: root
    password: ******
    address: 10.0.XXX.XXX:9669,10.0.XXX.XXX:9669
  postStart:
    # Configure some actions to be performed before inserting data after connecting to the NebulaGraph server.
    commands: |
    # The interval between the execution of the above commands and the execution of the insert data command.
    afterPeriod: 1s
  preStop:
    # Configure some actions to be performed before disconnecting from the NebulaGraph server.
    commands: |
# The path to the file where log messages such as errors will be output.    
logPath: /mnt/csv_file/prod_relation/err/test.log
....

设置crontab,Hive生成表并将其传输到Nebulagraph服务器,在流量低时在晚上运行Nebula Importer任务:

50 03 15 * * /mnt/csv_file/importer/nebula-importer -config /mnt/csv_file/importer/rel.yaml >> /root/rel.log

总共需要2个小时才能在上午6点进口全部数据。

某些日志如下,进口速度最多保持在200000/s

2022/05/15 03:50:11 [INFO] statsmgr.go:62: Tick: Time(10.00s), Finished(1952500), Failed(0), Read Failed(0), Latency AVG(4232us), Batches Req AVG(4582us), Rows AVG(195248.59/s)
2022/05/15 03:50:16 [INFO] statsmgr.go:62: Tick: Time(15.00s), Finished(2925600), Failed(0), Read Failed(0), Latency AVG(4421us), Batches Req AVG(4761us), Rows AVG(195039.12/s)
2022/05/15 03:50:21 [INFO] statsmgr.go:62: Tick: Time(20.00s), Finished(3927400), Failed(0), Read Failed(0), Latency AVG(4486us), Batches Req AVG(4818us), Rows AVG(196367.10/s)
2022/05/15 03:50:26 [INFO] statsmgr.go:62: Tick: Time(25.00s), Finished(5140500), Failed(0), Read Failed(0), Latency AVG(4327us), Batches Req AVG(4653us), Rows AVG(205619.44/s)
2022/05/15 03:50:31 [INFO] statsmgr.go:62: Tick: Time(30.00s), Finished(6080800), Failed(0), Read Failed(0), Latency AVG(4431us), Batches Req AVG(4755us), Rows AVG(202693.39/s)
2022/05/15 03:50:36 [INFO] statsmgr.go:62: Tick: Time(35.00s), Finished(7087200), Failed(0), Read Failed(0), Latency AVG(4461us), Batches Req AVG(4784us), Rows AVG(202489.00/s)

然后,在7:00,Kafka被重新完成,以根据时间戳从一天清晨导入7:00的增量数据,从而阻止了T+1数据的全部t+1数据覆盖的增量数据一天。

增量消耗大约需要10-15分钟。

即时的

从MD5比较获得的增量数据被进口到KAFKA中,并实时消耗KAFKA数据,以确保数据延迟不超过1分钟。

此外,可能会有很长时间实时检测到的意外数据问题,因此每30天导入全部数据,这是上面描述的星云进口国。然后添加a_ ttl = 35天_到空间的点侧,以确保未在时间上更新的任何数据都会被过滤并随后回收。

关于作者

里德(Reid)是中国最大的公司信息平台Qichacha的工程师。