介绍
如果您想开始消息队列,实现事件驱动的体系结构或支持依靠RabbitMQ进行异步通信的服务,则此帖子将为您提供概述和示例代码,以帮助您遇到地面跑步。如果您已经熟悉RabbitMQ,但想更多地了解其负载平衡或交换的工作方式,则可以跳到底部附近的操场概述部分。
此项目的依赖项:
项目设置
我最大程度地减少了一个命令所需的设置,并且在拉兔图像时,环境应在几秒钟内开始。
git clone https://github.com/Npfries/rabbitmq-playground
make start
运行make start
将在内部使用docker compose up
(带有特定参数)来提出服务。
兔子
RabbitMQ是一种轻巧,灵活和开源消息代理,几乎不需要配置。将消息发布给RabbitMQ的应用程序发表的应用程序所主张的队列和交流。
使用RabbitMQ时,有几个组件很重要。
- 交流
- 队列
RabbitMQ交换是可配置的经纪人,可以接收传入消息,执行一些过滤和路由以及发布到队列。有几种类型的交流,包括直接,粉丝,主题和标题交换。
交换类型 | 描述 |
---|---|
直接交换 | 将消息推入单个队列。 (默认) |
fanout Exchange | 将消息推入多个队列。 |
主题交换 | 根据消息主题执行路由。 |
标题Exchange | 根据消息标头信息执行路由。 |
RabbitMQ队列是简单的消息队列,可以隐式或明确地绑定到RabbitMQ交换。当使用AMQPLIB通道方法sendToQueue()
时,在默认直接交换之间创建隐式绑定。使用通道方法bindQueue()
创建明确的绑定。
RabbitMQ可以支持同一队列的多个订户,并且请求将在订户之间保持平衡。如果您希望有多个服务对同一消息反应,则可以使用粉丝交换来发布到多个队列,这些服务可以单独订阅队列。
为了使用AMQPLIB NPM软件包连接到RabbitMQ实例,使用amqplib.connect()
函数。
const conn = await amqplib.connect(process.env.RMQ_HOST);
这将建立与RabbitMQ实例的持续连接。可以创建 channels ,这是我们不同队列和交换操作的容器。
const ch1 = await conn.createChannel();
队列和交易所在应用程序代码中定义,通过断言它们的存在。
await ch1.assertExchange('name_of_exchange', '', { ... });
await ch1.assertQueue('name_of_queue');
然后队列可以绑定到交换。
await ch1.bindQueue('name_of_queue', 'name_of_exchange');
另外,可以简单地使用队列和使用channel.sendToQueue()
方法来使用默认直接交换,而不是明确地断言交换。
await ch1.assertQueue('name_of_queue');
ch1.sendToQueue('name_of_queue', message);
这隐藏了交易所的实现,但是尽管如此,交易所(默认直接交换)还是内部用作中介。
明确主张交换时,应使用channel.publish()
方法。
await ch1.assertExchange('name_of_exchange', '', { ... });
await ch1.assertQueue('name_of_queue');
ch1.publish('name_of_exchange', '' message);
这是一个完整的实现,展示了粉丝交换,默认的直接交换,利用两个渠道,并向两个交易所发布一个简单的消息,总共三个队列(两个粉丝,一个用于直接)。消息每100毫秒发布一次。
无论哪种情况,message
的类型都应是缓冲区。这通常是通过使用Buffer.from(data)
制备的。
// ./apps/sender/src/index.js
import amqplib from "amqplib";
(async () => {
const exchange = "tasks_exchange";
const queue1 = "tasks1";
const queue2 = "tasks2";
const queue3 = "tasks3";
const conn = await amqplib.connect(
process.env.RABBIT_MQ_HOST ?? "localhost"
);
const ch1 = await conn.createChannel();
await ch1.assertExchange(exchange, "fanout", {});
await ch1.assertQueue(queue1);
await ch1.assertQueue(queue2);
await ch1.bindQueue(queue1, exchange, "");
await ch1.bindQueue(queue2, exchange, "");
const ch2 = await conn.createChannel();
ch2.assertQueue(queue3);
setInterval(() => {
const message = Buffer.from("something to do");
ch1.publish(exchange, "", message);
ch2.sendToQueue(queue3, message);
}, 100);
})();
由于订户总是从队列中消费,而不是交换,因此在实施中的代码更加一致。
// ./apps/receiver/src/index.js
import amqplib from "amqplib";
(async () => {
/** @type {string} */
// @ts-ignore
const queue = process.env.QUEUE_NAME;
const conn = await amqplib.connect(
process.env.RABBIT_MQ_HOST ?? "localhost"
);
const channel = await conn.createChannel();
await channel.assertQueue(queue);
channel.consume(queue, (msg) => {
if (msg !== null) {
console.log("Received:", msg.content.toString());
channel.ack(msg);
} else {
console.log("Consumer cancelled by server");
}
});
})();
操场概述
提供的Node.js服务配置为使用AMQP 0-9-1 protocol与RabbitMQ进行通信。有一个很棒的软件包,amqplib,我们将在Node.js Services中用作客户端。说到服务,这里是由docker-compose.yml文件定义的服务:
# ./docker-compose.yml
version: "3.9"
services:
sender:
build:
context: ./apps/sender/
environment:
- RABBIT_MQ_HOST=amqp://rabbitmq
depends_on:
rabbitmq:
condition: service_healthy
deploy:
replicas: 1
tasks1_receiver:
build:
context: ./apps/receiver/
environment:
- RABBIT_MQ_HOST=amqp://rabbitmq
- QUEUE_NAME=tasks1
depends_on:
rabbitmq:
condition: service_healthy
deploy:
replicas: 1
tasks2_receiver:
build:
context: ./apps/receiver/
environment:
- RABBIT_MQ_HOST=amqp://rabbitmq
- QUEUE_NAME=tasks2
depends_on:
rabbitmq:
condition: service_healthy
deploy:
replicas: 1
tasks3_receiver:
build:
context: ./apps/receiver/
environment:
- RABBIT_MQ_HOST=amqp://rabbitmq
- QUEUE_NAME=tasks3
depends_on:
rabbitmq:
condition: service_healthy
deploy:
replicas: 1
rabbitmq:
image: rabbitmq:management-alpine
container_name: rabbitmq
ports:
- 15672:15672
healthcheck:
test: rabbitmq-diagnostics check_port_connectivity
interval: 3s
timeout: 30s
retries: 3
包含两种类型的node.js服务:
- sender
- 接收者
发件人服务的源代码位于./apps/sender/
中,三个接收器服务的源代码共享,位于./apps/receiver/
中。默认情况下,发件人是一个集装箱,该容器生成两个交换的消息:
- tasks_exchange(fanout Exchange)
- 默认(直接交换)
tasks_exchange将消息推到两个队列:
- 任务1
- 任务2
docker-compose.yml中定义的服务为tasks1_receiver和tasks2_receiver分别订阅任务1和tasks2。
当发件人服务将消息发送到Tasks3 Queue时,使用默认直接交换。
启动项目会产生发件人的一个实例,以及每个接收器的一个实例。可以通过将docker-compose.yml文件中的replicas
从1增加到所需实例数来增加发件人或接收器的数量。增加任何接收器的副本数量对于观察兔队队列在有多个服务订阅相同队列的实例时执行的旋转载荷负载平衡很有用。
请注意,发送到tasks_exchange的消息将发送到task1和task2队列,task1_receiver和task2_receiver彼此之间的负载不平衡,因为交换是扇形类型,并且队列是不同的。 Task1或Task2队列都不知道另一个。
在实时观看兔子如何处理延迟确认消息的确认,如何加载余额以及如何在交换和队列之间传递消息,您可以调整副本的数量,修改源代码以发送更多消息或尝试使用更多消息不同类型的交流。可以通过在端口15672上打开管理UI(如果项目在本地运行)来实时观察到RabbitMQ的指标。
如果您更改docker-compose.yml文件,则需要运行
make start
或
make dev
我建议使用make dev
,因为它会为源代码创建音量安装量并具有文件观察器,因此进行更改时应立即更新容器。
如果您想了解有关我如何创建此Docker本地开发环境的更多信息,则可以read about it here.