在Memgraph Graph数据库和Elasticsearch之间同步数据
#database #elasticsearch #memgraph #logstash

今天,几乎每个公司都有某种数据库作为真理的单一来源。事实证明,图数据库几乎在every industry you know about中很有用。它们在建模过程中表现出色,因为图模型可以自然绘制涉及互连数据的业务场景。 Elasticsearch是一种灵活的文本处理工具,主要用于全文搜索和索引。

尽管可以使用Memgraph(例如Memgraph)构建一些幼稚的文本搜索解决方案,但Elasticsearch擅长提供特定于文本处理的细粒度搜索功能。许多人决定使用这两个系统,但问题是如何使它们保持同步。在法师1.6中,我们引入了一个模块,该模块使开发人员可以使用基本身份验证将MEMGRAPH序列化为Elasticsearch实例。该博客将向您展示如何构建模型以及如何使用它。

同步的方法

使用ElasticsearchMemgraph作为两个完全分开的实体,并以不相关的方式存储相同的数据非常复杂,因为它复制了同步所需的所有过程和操作。这也非常麻烦,因为如果某些更新操作在一个平台上成功通过,但另一个平台失败,则系统中的状态不一致。

要考虑Memgraph和Elasticsearch同步,需要满足一些要求。数据库中的所有现有数据都需要进行索引,并且在将其插入数据库的那一刻时,应逐步索引新数据。

可以使用许多选项来满足这些要求:

  1. Logstash是一个服务器端数据处理系统,允许从200多个来源解析数据并将其发送到Elasticsearch。它还带有一个API,您可以通过该API开发一个插件,用于解析您可以想到的任何自定义应用程序的数据。
  2. 尽管尚未在memgraph中支持,但更改数据捕获(CDC)将使捕获交易中图上的更改并将其发送到ES索引。

但是,当同步备忘录和弹性搜索时,我们还应该真正考虑一下,这是可扩展的。挑战是建立一个解决方案,该解决方案需要新方法时需要最小的更改。这就是为什么我们决定使用Memgraph的Pythonic功能,并创建一个新的查询模块,该模块在Memgraph的图库MAGE中使用Elasticsearch’s API。

如果需要一个新方法的时间,则可以在python中使用自定义处理逻辑来轻松添加几行,而无需启动任何过程。由于使用网络上的通信所带来的成本,可能会注意到微妙的性能降解,但是Elasticsearch提供了许多参数,这些参数可以在索引新文档时进行调整,例如max_chunk_bytes,chchunk_size,chchunk_size,and chunk_size和在文档中并行索引的数量。

连接到Elasticsearch并索引数据库

要测试同步,我们是installed ElasticsearchMemgraph Platform。当首次启动Elasticsearch时,如果启用了身份验证,则将打印出用户名,密码,通往证书文件的路径和实例URL。由于需要连接到Elasticsearch实例,因此需要安全存储此信息。

在Memgraph实验室的数据集部分中,我们上传了空手道数据集,并将34个节点和78个边缘插入数据库中。由于数据集很小,因此测试将非常快。

以下查询将memgraph与本地或生产的弹性搜索实例相关联:

CALL elastic_search_serialization.connect("https://localhost:9200", "~/elasticsearch-8.4.3/config/certs/http_ca.crt", <ELASTIC_USER>, <ELASTIC_PASSWORD>) YIELD *;

一旦成功连接,memgraph将提供类似于下面的输出:

image alt

现在,我们可以创建一个索引,以将节点或关系序列化为文档。似乎最好的选择是创建两个索引,一个用于节点,另一个用于关系。

在创建实际的Elasticsearch索引之前,我们还需要to create a schema才能将节点和关系属性序列化为文档。我们使用了以下noderelationship schema

以下两个查询将在Elasticsearch中创建节点和关系索引:

CALL elastic_search_serialization.create_index("mem_nodes_blog", 
"/home/andi/Memgraph/code/nuix/node_index_path_schema.json", 
  {analyzer: "mem_analyzer", index_type: "vertex"}) YIELD *;
CALL elastic_search_serialization.create_index("mem_edges_blog", 
"/home/andi/Memgraph/code/nuix/edge_index_path_schema.json", 
  {analyzer: "mem_analyzer", index_type: "edge"}) YIELD *;

第一个参数是创建索引的名称,而第二个则指定了架构的路径。使用schema_parameters,您可以指定要使用的分析仪,无论该索引旨在用于节点或关系以及副本和碎片的数量。查看docs以获取更多信息。

memgraph如果成功创建了索引,则返回以下结果:

image alt

现在,现在是时候开始使用以下查询来索引实体:

CALL elastic_search_serialization.index_db("mem_nodes", "mem_edges", 4) YIELD * RETURN *; 

编号4指定用于同时索引提供的文档的Elasticsearch实例上的线程数。

完成索引后,输出是索引节点和关系的数量。

唯一要做的就是找到有关它们的节点和感兴趣的关系和查询备忘录的关系。为了使我们不复杂,让我们以前索引的所有节点。

CALL elastic_search_serialization.scan(mem_nodes_blog, "{\"query\": {\"match_all\": {}}} ") YIELD *;

上面的查询返回34个结果的列表,该列表中指定的属性:

中指定

image alt

与触发器同步

现在,我们确定实体已在elasticsearch中进行索引,我们可以考虑将数据索引到memgraph„ to to to to intexemant to memgraph````````````'''¼的索引。我们需要triggers,因为它们使我们能够在某些事件时执行特定的程序。在我们的情况下,事件是插入新节点和关系。

让我们通过创建触发器来扩展示例

CREATE TRIGGER elastic_search_create
ON CREATE AFTER COMMIT EXECUTE
CALL elastic_search_serialization.index(createdObjects, mem_nodes_blog, mem_edges_blog, 4) YIELD *;

我们可以通过创建新节点来测试触发器,并将其连接到现有用户。

CREATE (:__mg_vertex__:`User` {__mg_id__: 111111, `name`: "memgraph", `id`: "blog"});
MATCH (u), (v) WHERE u.name = 12 AND v.name = memgraph CREATE (u)-[:`FRIENDS_WITH` {test: memgraph}]->(v);

两个创建的实体将在Elasticsearch内部自动索引:

CALL elastic_search_serialization.scan(mem_nodes_blog, "{\"query\": {\"match_all\": {}}} ") YIELD *;

image alt

CALL elastic_search_serialization.scan(mem_edges_blog, "{\"query\": {\"match_all\": {}}} ") YIELD *;

image alt

memgraph甚至可以处理具有多个Elasticsearch实例的多个触发器(本质上是多对多连接)。

结论

在这篇博客文章中,我们解释了如何使用查询模块和触发器将Memgraph与Elasticsearch同步,以及拥有这种系统的好处。

您可以在Memgraph的GitHub上找到实现代码,随时检查并发表评论,我们将很乐意回应它们。出于任何疑问,我们也可以在Discord上使用。如果您感兴趣,Mage 1.6其他新颖性还会确保检查MAGE changelog

Read more about Memgraph MAGE on memgraph.com