用GQLalchemy流式传输和触发支持
#python #流媒体 #kafka #memgraph

使用新技术堆栈从来都不是一件容易的事,尤其是当您是Python生态系统的一部分时。您习惯了许多图书馆,这些图书馆每天都会更轻松。这就是为什么我们决定在我们的对象图映射器(OGM) gqlalchemy 中添加一些缺失的功能。

从现在开始,Python开发人员将不得不创建和管理数据流数据库触发器直接使用Cypher查询语言,而是可以使用该语言gqlalchemy库以编程方式完成这些任务。

让我们看一下动作中的新功能!

先决条件

1。如果要启动虚拟kafka流并实际上与memgraph连接到它,然后克隆project data-streams并运行以下命令:

python start.py --platform kafka --dataset movielens

2。

docker-compose up memgraph-mage

3。

from gqlalchemy import Memgraph, Node, Field

memgraph = Memgraph()

class User(Node):
    name: str = Field(index=True, exists=True, unique=True, db=memgraph)

user = User(name='Ron Swanson').save(memgraph)
print(user)

发生了什么事?好吧,我们刚刚在数据库中使用标签User创建了一个节点,然后将其获取到我们的程序中。现在是时候潜入流和触发器了!

连接到GQLalchemy的数据流

流功能使Memgraph可以连接到KAFKA,PULSAR或REDPANDA群集,并在数据流上运行图分析。

1.在memgraph中创建流

此步骤非常容易。您只需要使用特定流的参数调用create_stream()方法:

from gqlalchemy import Memgraph, MemgraphKafkaStream, match

memgraph = Memgraph()

stream = MemgraphKafkaStream(
    name="ratings_stream",
    topics=["ratings"],
    transform="movielens.rating",
    bootstrap_servers="'kafka:9092'",
)
memgraph.create_stream(stream)

2.开始流

要启动流,只需调用start_stream()方法:

memgraph.start_stream(stream)

现在,让我们检查一下Memgraph摄入数据是否正在摄入:

movies = match().node(variable="m", labels="Movie").return_().limit(5).execute()
print(list(movies))

希望您只是打印出一堆电影唱片。如果没有,则意味着某处存在错误。您可以随时在我们的Discord Server上寻求帮助。

3.检查流的状态

要检查Memgraph中流的状态,只需运行以下命令:

streams = memgraph.get_streams()
print(streams)

4.删除流

您可以使用drop_stream()方法删除流:

memgraph.drop_stream(stream)

在GQLalchemy中创建数据库触发器

由于Memgraph支持CREATEUPDATEDELETE操作上的数据库触发器,因此GQLalchemy还实现了一个简单的接口,用于维护这些触发器。

为什么需要数据库触发器进行图形分析?
想象一个图表正在连续使用新数据进行更新。也许您需要告知每种更改的另一个服务或在特定更改生效后运行图形算法。触发器使创建自定义通知成为可能,如果您编写自己的query module,则可以在触发触发后执行所需的任何代码。您可以将数据发送到Kafka群集,调用远程API,将信息保存到另一个系统等。

1.创建触发器

要设置触发器,首先,使用所有必需的参数创建一个MemgraphTrigger对象:

  • name: strâ€触发的名称。
  • event_type: TriggerEventTypeâ€将触发执行的事件类型。选项是:TriggerEventType.CREATETriggerEventType.UPDATETriggerEventType.DELETE
  • event_object: TriggerEventObjectâ€event_type影响的对象。选项是:`TriggerEventObject.ALL,triggereventobject.nodeandtriggereventobject.relationship`.
  • execution_phase: TriggerExecutionPhase''应就交易提交执行触发器的阶段。选项为:BEFOREAFTER
  • statement: str`触发时应执行的密码查询。

现在,让我们在gqlalchemy中创建一个触发器:

`python
从GQLalchemy Import Memgraph,Memgraphtrigger
从gqlalchemy.models导入(
TriggereVentType,
triggereventObject,
triggerexecutionphase,

memgraph = memgraph()

trigger = memgraphtrigger(
name =“ ratings_trigger”,
event_type = triggereventtype.create,
event_object = triggereventobject.node,
execution_phase = triggerexecutionphase.per,
语句=“ undind createvertices as nat s set not node.created_at = localdatement()”,

memgraph.create_trigger(trigger)
`

每次在数据库中创建节点时,都会执行触发名称ratings_trigger。在创建有关节点完成的交易后,Cypher查询statement将执行,在这种情况下,它将将新创建的节点的属性created_at设置为当前日期和时间。

2.检查触发器的状态

您可以使用get_Triggers()方法从数据库中返回所有触发器:

`python
triggers = memgraph.get_triggers()
print(triggers)
`

3.删除触发器

您可以使用drop_trigger()方法删除触发器:

`python
memgraph.drop_trigger(trigger)
`

结论

您可以看到,与Memgraph中的流和触发器一起工作并不是特别困难,而GQLalchemy只会使工作更容易。尽管本文涵盖了一些选项,但还有更多可用的选择。您不仅可以连接到Pulsar和Redpanda流,而且通过使用查询模块,您还可以在Python中编写自定义过程,以进一步分析传入数据或通过结果传递。触发器也是如此,您不仅限于定义应执行的密码查询,而是编写仅受您对Python知识的限制的自定义过程。

如果您觉得这个简短的教程很有趣,请不要忘记在 GitHub 上查看GQLalchemy项目,并为我们抛出明星。

Read more about real time analytics on memgraph.com