将Robyn与RabbitMQ整合在一起| Python
#python #rabbitmq #robyn

在本文中,我们将学习如何将RabbitMQ集成到Robyn服务器中。

本文将显示消耗兔子队列消息的服务器的代码。以及将消息发布给队列的服务器的代码。

兔子

RabbitMQ是一家消息经纪人 - 消息传递的中介。它为您的应用程序提供了发送和接收消息的通用平台,并且您的消息是一个安全的居住地,直到收到。

要求

  • python安装

  • PIP已安装

  • 兔子安装(从here下载)

消费者

在本节中,我们将构建一台从队列中消费消息的服务器。

首先,我们必须构建一个将消息发送到队列的程序。因此,这就是sender.py所做的。在app.py文件中,我们编写消耗消息的服务器。

sender.py

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')

channel.basic_publish(exchange='', routing_key='hello', body="Hello World!")
print(" [x] Sent 'Hello'")
connection.close()

sender.py文件中,我们导入rabbitmq库,皮卡。我们创建了与Localhost运行的RabbitMQ服务器的连接。然后,我们打开一个频道。频道是消息路由和排队的地方。

我们声明了一个名为“ Hello”的队列,将在其中发布并从中发表消息。我们发布“ Hello World!”消息给“ Hello”队列。当程序发布消息时,它将在控制台中的消息“ [x]发送'hello'”。然后我们关闭与RabbitMQ服务器的连接。

我们运行此程序,我们应该在终端中查看以下消息。

app.py

from robyn import Robyn, status_codes

from robyn.robyn import Response
import asyncio
import aio_pika

app = Robyn( __file__ )

@app.get("/")
async def hello():
    return Response(status_code=status_codes.HTTP_200_OK, headers={}, body="Hello, World!")

async def startup():
    loop = asyncio.get_event_loop()
    connection = await aio_pika.connect("amqp://guest:guest@localhost/", loop = loop)
    channel = await connection.channel()
    queue = await channel.declare_queue("hello")

    def callback(message: aio_pika.IncomingMessage):
        body = message.body.decode("utf-8")

        print(body)
    await queue.consume(callback)    

app.startup_handler(startup)

app.start(port=8000, url="0.0.0.0")

app.py文件中,我们导入aio-pika库,该库是RabbitMQ的异步库。然后,我们创建了一个Robyn服务器,其中一个端点将“ Hello World”作为响应发送。我们定义了从RabbitMQ服务器接收消息的startup()函数。

startup()函数中,我们创建了与RabbitMQ服务器的连接,创建一个频道,并声明将消耗消息的队列。然后,我们定义了callback()函数,它接收消息并打印。

我们使用startup_handler()注册startup()函数,这样,当API启动时,服务器将开始从Rabbitmq队列中消耗消息。

我们启动服务器,我们应该从终端中打印的队列接收消息。

出版商

在这里,我们将构建一台服务器,将消息发布给队列和一个消耗消息的程序。

receiver.py文件是我们编写将从队列中食用消息的程序的地方。和app.py文件是我们编写发布它们的服务器的地方。

receiver.py

import pika, sys, os

def main():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    channel.queue_declare(queue='hello')

    def callback(ch, method, properties, body):
        print(f" [x] Received {body}")

    channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

if __name__ == ' __main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

在这里,我们创建了与本地运行的RabbitMQ服务器的连接,并订阅了“ Hello”队列。从该队列接收消息时,调用了打印消息主体的回调函数。

app.py

在这里,我们将创建服务器以将消息发布到“ Hello”队列。

from robyn import Robyn, status_codes

from robyn.robyn import Response
import asyncio
import pika

app = Robyn( __file__ )

@app.get("/")
async def hello():
    body = "Hello World!"
    await sender(body)
    return Response(status_code=status_codes.HTTP_200_OK, headers={}, body="Hello, World!")

async def sender(body: str):
    loop = asyncio.get_event_loop()
    connection = await aio_pika.connect("amqp://guest:guest@localhost/", loop = loop)
    channel = await connection.channel()

    await channel.default_exchange.publish(aio_pika.Message(body=body.encode()), routing_key='hello')
    print(" [x] Sent 'Hello'")
    await connection.close()

app.start(port=8000, url="0.0.0.0")

我们创建了与RabbitMQ服务器建立连接的sender()函数,然后我们创建一个频道并声明了我们要发布消息的队列。

然后,我们发送到sender()功能接收的主体队列。

我们启动服务器并导航或向localhost:8000/请求。我们应该在终端中看到“ [x]已发送'hello'”消息。

在我们运行程序的receiver.py文件的终端中,我们应该看到以下消息:

结论

总而言之,使用RabbitMQ作为消息经纪人,您可以将不同的组件分解并独立扩展。这样,我们可以构建彼此通信的Robyn微服务。

如果您对其他软件包,架构,如何改善我的代码,英语或其他任何建议有任何建议;请发表评论或通过TwitterLinkedIn与我联系。

源代码是here

资源

RabbitMQ Tutorial One

aio-pika library documentation

Robyn documentation