通过WebSocket监视Nestjs中HTTP请求的进度。
#javascript #编程 #nx #nestjs

几天前,我在监视HTTP请求的进度方面遇到了一个问题。根据旧开发人员的传统,首先,我向Google询问了它。我预计会得到一堆不同的答案并选择合适的答案。这次我的直觉使我失望了。即使我得到了许多类似的解决方案,但我也没有找到适当的示例。值得澄清的是,我正在从事一个基于Nestjs的项目。让我解释一下为什么我决定从头开始创建我的解决方案,以及为什么在我的情况下需要修改有关该主题的大多数解决方案。

首先,我想尽可能分享article that describes a bunch of the results above。让我对文章提供基本想法。

  1. 文章描述了提供内容下载的请求。在这种情况下,我们正在谈论宝贵的内容大小。
  2. Content-Length HTTP标头对于正确的HTTP响应至关重要。
  3. 在服务器应用程序设置Content-Length,块数据编写过程之后,应运行。
  4. 首先,客户端应用程序获取Content-Length.
  5. 之后,它获取每个数据块,并将进度计算为以下内容。
progress = 100 * (chunkSize / contentLength)

如果我们谈论内容下载,则上述方法是有益的。尽管由于以下原因而对我不起作用。

  1. 我的任务是关于下载内容以外的其他内容。此外,我们需要具有一个允许我们根据计算计算进度的功能,不仅是根据数据传输。
  2. 尽管应用程序不知道内容大小,但它具有迭代的总数。
  3. 在这种情况下,基于块的方法不起作用。最终结果准备工作将需要很长时间,并且应同时写入响应数据。这就是为什么我们需要在发送答复之前通知客户的原因。

换句话说,新方法的要求是以下。

  1. 响应写作同时进行而没有任何数据块。
  2. 应该在此之前提供进度。

我不想浪费您的时间,并给出有关计算上述要求的方法的几个概念点。

  • 由于持续的连接和高性能。
  • ,通过Websocket提供进度。
  • 将Websocket与当前会话连接到HTTP请求处理过程中的所有必需数据。

下面的所有思想和代码都将与这些要点有很强的联系。但是以前,让我分享最终解决方案:https://github.com/buchslava/nest-request-progress

数据提供示例

我提供了数据处理的简化版本,因为我想关注此任务。在下面的示例中,我们有150次迭代。结果是150个随机数的数组,每个数字以100-1000毫秒计算。我发现这个示例是客观过程的最小可行模型。

import { Injectable } from '@nestjs/common';

const getRandomArbitrary = (min: number, max: number): number =>
  Math.random() * (max - min) + min;
const delay = (time: number) =>
  new Promise((resolve) => setTimeout(resolve, time));

@Injectable()
export class AppService {

  getIterationCount(): number {
    return 150;
  }

  async getData(token: string): Promise<string[]> {
    return new Promise(async (resolve, reject) => {
      try {
        const result = [];

        for (let i = 0; i < this.getIterationCount(); i++) {
          result.push(getRandomArbitrary(1, 9999));
          await delay(getRandomArbitrary(100, 1000));
        }

        resolve(result);
      } catch (e) {
        reject(e);
      }
    });
  }
}

进度经理

未来的步骤是关于ProgressManager实施的。

ProgressManager应该是能够执行以下操作的单独的Nestjs服务。

  1. 启动“进度”会话(不是HTTP会话),并从客户端应用程序中获取唯一的令牌。
  2. 停止“进度”会话
  3. 增加进度的价值。

请查看以下评论​​代码。

import { Injectable } from '@nestjs/common';
import { Server } from 'socket.io';

export interface ProgressSession {
  token: string;
  total: number;
  counter: number;
  timerId: any;
}

@Injectable()
export class ProgressManager {
  // The Socket Server injection will be described later
  public server: Server;
  // This map contains all Progress session data
  private storage: Map<string, ProgressSession> = new Map();

  // Start the session with the token and the total number of iterations
  startSession(token: string, total: number, delay = 2000) {
    // Get current session from the storage
    const currentSession = this.storage.get(token);
    // Do nothing if it's already exist
    if (currentSession) {
      return;
    }
    // Send the progress every "delay" milliseconds
    const timerId = setInterval(async () => {
      const currentSession: ProgressSession = this.storage.get(token);
      // Protect the functionality: if the current session is missing then do nothing
      if (!currentSession) {
        return;
      }
      // Calculate the progress
      let progress = Math.ceil(
        (currentSession.counter / currentSession.total) * 100
      );
      // Protect the progress value, it should be less or equal 100
      if (progress > 100) {
        progress = 100;
      }
      // Send the progress. Pay attention that the event name should contain the "token"
      // Client will use this token also
      this.server.emit(`progress-${token}`, progress);
    }, delay);
    // Initial Progress Session settings. Token is a key.
    this.storage.set(token, {
      token,
      total,
      counter: 0,
      timerId,
    });
  }

  // This method increases the progress
  step(token: string, value = 1) {
    // Get the current session
    const currentSession: ProgressSession = this.storage.get(token);
    // Do nothing if it doesn't exist
    if (!currentSession) {
      return;
    }
    // Increase the counter
    const counter = currentSession.counter + value;
    // Update the storage
    this.storage.set(token, {
      ...currentSession,
      counter,
    });
  }

  // Stop the session by the token
  stopSession(token: string) {
    // Get the current session
    const currentSession: ProgressSession = this.storage.get(token);
    // Do nothing if it doesn't exist
    if (currentSession) {
      // Stop the current timer
      clearInterval(currentSession.timerId);
      // Remove information regarding the current session from the storage
      this.storage.delete(token);
    }
  }
}

您可以在here中找到代码。

WebSockets服务器

另一个重要的是将Nestjs与WebSocket集成并将其连接到Progress Manager。以下代码为此负责。

import {
  WebSocketGateway,
  WebSocketServer,
  OnGatewayInit,
} from '@nestjs/websockets';
import { Server } from 'socket.io';
import { ProgressManager } from './progress-manager';

@WebSocketGateway({ cors: true })
export class AppGateway implements OnGatewayInit {
  constructor(private progressManager: ProgressManager) {}

  @WebSocketServer() server: Server;

  afterInit() {
    // After the WebSockets Gateway has to init, then pass it to the ProgressManager
    this.progressManager.server = this.server;
  }
}

The source >>

,当然,根据Nestjs的要求,我们需要告诉相关模块。

import { Module } from '@nestjs/common';

import { AppController } from './app.controller';
import { AppService } from './app.service';
import { AppGateway } from './app.gateway';
import { ProgressManager } from './progress-manager';

@Module({
  imports: [],
  controllers: [AppController],
  providers: [AppService, AppGateway, ProgressManager],
})
export class AppModule {}

The source>>

数据处理

是时候专注于端点的控制器了。看起来很简单。

import { Controller, Get, Query } from '@nestjs/common';
import { AppService } from './app.service';

@Controller()
export class AppController {
  constructor(private readonly appService: AppService) {}

  @Get()
  getData(@Query() query: { token: string }) {
    return this.appService.getData(query.token);
  }
}

The source >>

和有关服务器的最后一件事是关于提供示例修改的数据。以下示例接近本文中的第一个示例。主要目的是在此处添加“进度功能”。请阅读代码中的评论。这很重要。

import { Injectable } from '@nestjs/common';
import { ProgressManager } from './progress-manager';

const getRandomArbitrary = (min: number, max: number): number =>
  Math.random() * (max - min) + min;
const delay = (time: number) =>
  new Promise((resolve) => setTimeout(resolve, time));

@Injectable()
export class AppService {
  // Use progressManager
  constructor(private readonly progressManager: ProgressManager) {}

  // 150 iterations should be processed
  getIterationCount(): number {
    return 150;
  }

  async getData(token: string): Promise<string[]> {
    return new Promise(async (resolve, reject) => {
      // We need to start the Progress Session before data preparation
      this.progressManager.startSession(token, this.getIterationCount());
      try {
        // Initialize the array of results
        const result = [];

        for (let i = 0; i < this.getIterationCount(); i++) {
          // Calculate the result        
          result.push(getRandomArbitrary(1, 9999));
          // Increase the Progress counter
          this.progressManager.step(token);
          // Random delay
          await delay(getRandomArbitrary(100, 1000));
        }

        // Return the result
        resolve(result);
      } catch (e) {
        reject(e);
      } finally {
        // We need to stop the ProgressManager in any case.
        // Otherwise, we have a redundant timeout.
        this.progressManager.stopSession(token);
      }
    });
  }
}

The source >>

我示例的后端部分已经准备好了。您可以找到完整的后端解决方案here

客户端

我的示例的客户部分放置在here。两个部分都放在一个monorepo中。感谢Nx。让我们看一下。请阅读以下代码中的评论。

import * as io from 'socket.io-client';
import { v4 } from 'uuid';
import axios from 'axios';

// Generate a unique ID (token)
const token = v4();

console.info(new Date().toISOString(), `start the request`);

// Call the endpoint described above
axios
  .get(`http://localhost:3333/api?token=${token}`)
  .then((resp) => {
    // Print the total length of requested data (an array of random numbers)
    console.info(new Date().toISOString(), `got ${resp.data.length} records`);
    process.exit(0);
  })
  .catch((e) => {
    console.info(e);
    process.exit(0);
  });
// We need to connect to the related Socket Server
const ioClient = io.connect('ws://localhost:3333');
// And wait for `progress-${token}` event
ioClient.on(`progress-${token}`, (progress) =>
  console.info(new Date().toISOString(), `processed ${progress}%`)
);

最后一步

是时候尝试解决方案了。

git clone git@github.com:buchslava/nest-request-progress.git
cd nest-request-progress
npm i
npx nx run server:serve

打开另一个终端并运行:

npx nx run client:serve

Image description