在本文中,我们将学习如何将RabbitMQ集成到Robyn服务器中。
本文将显示消耗兔子队列消息的服务器的代码。以及将消息发布给队列的服务器的代码。
兔子
RabbitMQ是一家消息经纪人 - 消息传递的中介。它为您的应用程序提供了发送和接收消息的通用平台,并且您的消息是一个安全的居住地,直到收到。
要求
-
python安装
-
PIP已安装
-
兔子安装(从here下载)
消费者
在本节中,我们将构建一台从队列中消费消息的服务器。
首先,我们必须构建一个将消息发送到队列的程序。因此,这就是sender.py
所做的。在app.py
文件中,我们编写消耗消息的服务器。
sender.py
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body="Hello World!")
print(" [x] Sent 'Hello'")
connection.close()
在sender.py
文件中,我们导入rabbitmq库,皮卡。我们创建了与Localhost运行的RabbitMQ服务器的连接。然后,我们打开一个频道。频道是消息路由和排队的地方。
我们声明了一个名为“ Hello”的队列,将在其中发布并从中发表消息。我们发布“ Hello World!”消息给“ Hello”队列。当程序发布消息时,它将在控制台中的消息“ [x]发送'hello'”。然后我们关闭与RabbitMQ服务器的连接。
我们运行此程序,我们应该在终端中查看以下消息。
app.py
from robyn import Robyn, status_codes
from robyn.robyn import Response
import asyncio
import aio_pika
app = Robyn( __file__ )
@app.get("/")
async def hello():
return Response(status_code=status_codes.HTTP_200_OK, headers={}, body="Hello, World!")
async def startup():
loop = asyncio.get_event_loop()
connection = await aio_pika.connect("amqp://guest:guest@localhost/", loop = loop)
channel = await connection.channel()
queue = await channel.declare_queue("hello")
def callback(message: aio_pika.IncomingMessage):
body = message.body.decode("utf-8")
print(body)
await queue.consume(callback)
app.startup_handler(startup)
app.start(port=8000, url="0.0.0.0")
在app.py
文件中,我们导入aio-pika
库,该库是RabbitMQ的异步库。然后,我们创建了一个Robyn服务器,其中一个端点将“ Hello World”作为响应发送。我们定义了从RabbitMQ服务器接收消息的startup()
函数。
在startup()
函数中,我们创建了与RabbitMQ服务器的连接,创建一个频道,并声明将消耗消息的队列。然后,我们定义了callback()
函数,它接收消息并打印。
我们使用startup_handler()
注册startup()
函数,这样,当API启动时,服务器将开始从Rabbitmq队列中消耗消息。
我们启动服务器,我们应该从终端中打印的队列接收消息。
出版商
在这里,我们将构建一台服务器,将消息发布给队列和一个消耗消息的程序。
receiver.py
文件是我们编写将从队列中食用消息的程序的地方。和app.py
文件是我们编写发布它们的服务器的地方。
receiver.py
import pika, sys, os
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(f" [x] Received {body}")
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == ' __main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)
在这里,我们创建了与本地运行的RabbitMQ服务器的连接,并订阅了“ Hello”队列。从该队列接收消息时,调用了打印消息主体的回调函数。
app.py
在这里,我们将创建服务器以将消息发布到“ Hello”队列。
from robyn import Robyn, status_codes
from robyn.robyn import Response
import asyncio
import pika
app = Robyn( __file__ )
@app.get("/")
async def hello():
body = "Hello World!"
await sender(body)
return Response(status_code=status_codes.HTTP_200_OK, headers={}, body="Hello, World!")
async def sender(body: str):
loop = asyncio.get_event_loop()
connection = await aio_pika.connect("amqp://guest:guest@localhost/", loop = loop)
channel = await connection.channel()
await channel.default_exchange.publish(aio_pika.Message(body=body.encode()), routing_key='hello')
print(" [x] Sent 'Hello'")
await connection.close()
app.start(port=8000, url="0.0.0.0")
我们创建了与RabbitMQ服务器建立连接的sender()
函数,然后我们创建一个频道并声明了我们要发布消息的队列。
然后,我们发送到sender()
功能接收的主体队列。
我们启动服务器并导航或向localhost:8000/
请求。我们应该在终端中看到“ [x]已发送'hello'”消息。
在我们运行程序的receiver.py
文件的终端中,我们应该看到以下消息:
结论
总而言之,使用RabbitMQ作为消息经纪人,您可以将不同的组件分解并独立扩展。这样,我们可以构建彼此通信的Robyn微服务。
如果您对其他软件包,架构,如何改善我的代码,英语或其他任何建议有任何建议;请发表评论或通过Twitter或LinkedIn与我联系。
源代码是here。