如何处理Node.js的预定队列工作,并在Heroku上使用BullMq和Redis
#node #redis #queues #bullmq

背景工作处理是一种运行任务的技术,在与主应用程序服务器单独的过程中完成可能需要很长时间才能完成。这允许主要的应用程序服务器在运行后台工作时继续处理用户的请求。

您可能想在node.js服务器应用程序中使用后台作业处理的原因有很多。例如,您可以使用它来:

  • 处理大文件或图像
  • 发送电子邮件或SMS消息
  • 运行批处理作业
  • 执行复杂的计算
  • 安排任务在以后的时间

在Node.js中实现背景作业处理有多种不同的方法。一种流行的方法是使用消息队列。消息队列是第一个第一(FIFO)系统,它允许您在不同过程之间发送消息。当您要运行后台作业时,您可以在队列中添加消息。然后,该消息将通过一个单独的工作过程处理,该过程是专门用于运行背景作业的过程。

在本文中,我将分享如何将BullMQ NPM软件包与Redis Server一起使用NODE.JS中实现背景作业处理。我还将介绍如何将您的背景作业申请部署到Heroku。

本教程的要求

要开始,您将需要安装以下内容:

  • node.js
  • redis
  • 公牛和Redis库
  • Heroku Cli

安装了这些内容后,您可以按照本文中的说明来创建您的背景作业应用程序。

快速介绍Heroku

如果您已经知道或使用Heroku,欢迎您跳过。

Heroku是一个云平台,可轻松部署和扩展Web应用程序。它提供了许多功能,使其非常适合部署背景作业应用程序,包括:

  • 自动缩放:Heroku会根据需求自动缩放您的应用程序,因此您不必担心管理自己的基础架构。

  • 托管服务:Heroku提供了许多托管服务,例如Redis和Postgres,因此您不必担心自己提供和管理这些服务。

要开始使用Heroku,您将需要创建一个帐户并安装Heroku CLI。完成此操作后,您可以创建一个新的Heroku应用程序并将您的应用程序部署到它。

使用Heroku进行背景工作处理的好处是,它易于扩展,自然适合长期运行的过程,并且可以轻松地使用主应用程序服务器将您的后台工作应用程序化。

快速介绍BullMQ和Redis

BullMQ是node.js的消息队列库。它允许您在不同过程之间发送和接收消息。 REDIS是一个内存数据存储,BullMQ用作消息队列。

BullMQ和Redis共同努力,提供一种可靠且可扩展的方法来处理背景作业。 BullMQ处理消息的排队和路由,而Redis存储消息并提供了一种访问它们的高性能,一致的方法。

BULLMQ的一些关键功能包括:

  • 支持多种队列类型,包括FIFO,LIFO和优先队列。
  • 允许您安排工作以稍后运行。
  • 支持失败的工作重试。
  • 支持工作的利率限制。

用Redis设置Heroku申请

要使用Redis设置Heroku申请,您将需要:

  • 创建一个新的Heroku应用程序。
  • 提供重新的实例。
  • 设置新工人配置的procfile。

这是您可以使用的命令:

heroku create my-app
heroku addons:create heroku-redis:hobby-dev
echo "worker: node worker.js" >> Procfile

第一个命令创建了一个名为my-app的新的Heroku应用程序。第二个命令规定在业余爱好开发计划中重新定义了实例。第三个命令创建了Heroku应用程序基础结构文件Procfile,该文件告诉Heroku提供背景工作处理资源并为其运行node worker.js命令。

worker.js文件是您将代码处理以处理背景作业的地方。我们接下来要去!

BullMQ工人

工人是将从队列中消费工作并运行后台作业的过程。在Heroku中,一般而言,这是一个单独的node.js运行时过程,因此,它完全无关。如果您有一个。

通常也被认为是一个长期运行的过程,这意味着它将无限期地运行直到停止。如果没有工作,那就简单地闲置。它将连接到REDIS实例,并收听将作业添加到队列中。当将作业添加到队列中时,工人将处理作业,然后将其标记为完整。

设立BullMQ工人

设置BullMq工作者与创建新文件worker.js和添加以下代码一样简单:

import { Worker } from "bullmq";

// Redis connection details
 const workerConnectionOptions = {
    host: import.meta.env.hostname,
    port: import.meta.env.port,
    password: import.meta.env.password,
};

// The following sets up the worker:
// 1. Connects to a queue named `uploaded_files_queue`
// 2. Runs the `workerJobHandler` function when a job is added to the queue
// 3. Creates a new worker instance that allows hooking to events in the worker lifecycle
const workerInstance = new Worker('uploaded_files_queue', workerJobHandler, {
    connection: workerConnectionOptions,
});

// The `workerJobHandler` function is the function that will be called
// when a job is added to the queue. It will receive the job as an argument.
// The job will contain the data that was added to the queue when the job
// was created.
async function workerJobHandler(job) {
    console.log(`handling job: [${job.id}]`);
    console.log({ jobName: job.name, jobId: job.id, data: job.data });

    // for example:
    // await processUploadedFile(job.data.fileId)
    return;
}

对工人生命周期中的事件作用

我们可以将某些事件发生时挂接到工作人员生命周期中的事件以执行动作。例如,我们可以挂接在completed事件中以完成作业完成后执行操作。当作业失败时,我们还可以连接到failed事件以执行动作。

将以下内容添加到worker.js文件:

workerInstance.on("completed", async (job) => {
    console.log(`[${job.id}] entering job completion stage!`);
    console.log(`[${job.id}] has completed!`);
});

workerInstance.on("failed", (job, err) => {
    console.error(`[${job.id}] has failed with ${err.message}`);
    console.error(err);
});

workerInstance.on("error", (err) => {
    console.error(`WorkerInstance has errored with ${err.message}`);
});

提示:避免尝试/捕捉错误处理。您无需自定义Worker处理程序功能中的错误处理。 BullMQ自动从功能中捕获错误。接下来,它会恢复失败的作业,并在经过一定数量的重试后将其移至死字队列。

利用CPU核心用BullMQ

node.js,是单线程运行时,只能使用单个CPU核心。这意味着,如果您将分配给工作流程的多核CPU主机,那么您将不断限制硬件资源。相反,您可以通过运行工人的多个实例来利用所有CPU内核。这通常称为群集,它是node.js应用程序中的常见模式。

我们可以使用throng npm package的Node.js内置聚类功能来使用小型包装器,以启动每个CPU的单独node.js运行时过程。

我们将更改worker.js文件以伴随throng

import throng from "throng";

throng({ worker });

// wrap the worker code in a function called `worker` which will then be called by `throng`
// function worker() {
// ... the worker code from previous section
// }

依赖注入的更好的BullMQ工人

您的背景工作人员虽然与主要的Web API分开部署,但仍需要访问您的应用程序的服务和依赖项。例如,您可能需要访问您的数据库或文件存储服务,您需要传递必要的配置,凭据和可能的其他域逻辑。

为了做到这一点,我们可以使用依赖注入容器将依赖性注入工人。我们可以使用Awilix npm package做到这一点。

我建议的模式如下:

  • 该工人与node.js api服务/微服务源代码合作。这样,它可以通过高级抽象API轻松访问域逻辑( services ,即:FileStorageServiceDatabaseService等)。

  • 工人通过依赖项注入层实例化,该层允许工人请求访问任何必要的依赖项,就像是主要服务中的另一个HTTP API路由处理程序一样。

    <

    < /li>

这最终将看起来像这样:

import throng from "throng";
import { Config } from "./services/core/Config.js";
import { DatabaseManager } from "./services/core/db.js";
import { initDI } from "./infra/di.js";
import { WorkerFactory } from "./workers/fileUploadWorker.js";

async function initDatabase(config) {
  // Initialize the database
  const database = new DatabaseManager(config);

  // Ensure database connection is ready
  await database.ping();

  // Return the database instance
  return database;
}

// Load configuration
const configManager = new Config();
const config = await configManager.load();

// Initialize the database
const database = await initDatabase(config);

// Initialize DI
const diContainer = await initDI({ config, database });
const Logger = diContainer.resolve("Logger");
Logger.info(`Worker initialized`);

const worker = WorkerFactory({ container: diContainer });

throng({ worker });

在上面的代码示例中,我们初始化配置和数据库连接详细信息,然后将它们传递给依赖项注入层,以使其可用于DI消费者。当您注意到时,我们将工作代码包装在A 工厂功能中,以使依赖项注入容器可用于工人。

BullMQ生产者客户

为了使本指南完成,我们还将在不久的将来查看该指南的客户,该客户端负责将工作添加到队列中以供工人食用。

实际上很简单:

import { Queue } from "bullmq";

// Create a new queue instance
const queue = new Queue('uploaded_files_queue', {
    connection: workerConnectionOptions,
});

// Schedule a new job on the queue with:
// 1. a name that is associated with this job
// 2. any metadata this job should include (a JSON object)
queue.add(jobName, jobData);

提示:BullMQ将默认设置为0的Retry attempts值安排作业,这意味着它不会重试失败的作业。您可以通过将attempts值设置为数字大于1的数字来覆盖此问题,除非指定延迟,否则要立即重新进行工作。您可以通过将backoff值设置为具有exponential类型的对象和以毫秒为单位指定的延迟来覆盖此目标。

为了适应工作恢复和其他队列房屋保存配置,我们可以按照我们安排工作时进一步自定义工作队列配置:


// This configuration can be provided at either the
// queue level or the job level. In this example
// it is set at the job level.
const jobQueueConfig = {
    attempts: 2,
    backoff: {
    type: "exponential",
    delay: 30000,
    },
    removeOnComplete: {
    age: 24 * 3600,
    },
    removeOnFail: {
    age: 24 * 3600,
    },
};

queue.add(jobName, jobData, jobQueueConfig);

下一步

不要忘记您必须将工人实际上部署到Heroku。您可以通过运行以下命令来执行此操作:

git push heroku main

如果您有兴趣了解有关BullMQ和Redis的更多信息,我建议您查看以下资源:

您还想研究使用RabbitMQ和amqplib NPM软件包的队列的替代实现。