几天前,我在监视HTTP请求的进度方面遇到了一个问题。根据旧开发人员的传统,首先,我向Google询问了它。我预计会得到一堆不同的答案并选择合适的答案。这次我的直觉使我失望了。即使我得到了许多类似的解决方案,但我也没有找到适当的示例。值得澄清的是,我正在从事一个基于Nestjs的项目。让我解释一下为什么我决定从头开始创建我的解决方案,以及为什么在我的情况下需要修改有关该主题的大多数解决方案。
首先,我想尽可能分享article that describes a bunch of the results above。让我对文章提供基本想法。
- 文章描述了提供内容下载的请求。在这种情况下,我们正在谈论宝贵的内容大小。
-
Content-Length
HTTP标头对于正确的HTTP响应至关重要。 - 在服务器应用程序设置
Content-Length,
块数据编写过程之后,应运行。 - 首先,客户端应用程序获取
Content-Length.
- 之后,它获取每个数据块,并将进度计算为以下内容。
progress = 100 * (chunkSize / contentLength)
如果我们谈论内容下载,则上述方法是有益的。尽管由于以下原因而对我不起作用。
- 我的任务是关于下载内容以外的其他内容。此外,我们需要具有一个允许我们根据计算计算进度的功能,不仅是根据数据传输。 。
- 尽管应用程序不知道内容大小,但它具有迭代的总数。
- 在这种情况下,基于块的方法不起作用。最终结果准备工作将需要很长时间,并且应同时写入响应数据。这就是为什么我们需要在发送答复之前通知客户的原因。
换句话说,新方法的要求是以下。
- 响应写作同时进行而没有任何数据块。
- 应该在此之前提供进度。
我不想浪费您的时间,并给出有关计算上述要求的方法的几个概念点。
- 由于持续的连接和高性能。 ,通过Websocket提供进度。
- 将Websocket与当前会话连接到HTTP请求处理过程中的所有必需数据。
下面的所有思想和代码都将与这些要点有很强的联系。但是以前,让我分享最终解决方案:https://github.com/buchslava/nest-request-progress
数据提供示例
我提供了数据处理的简化版本,因为我想关注此任务。在下面的示例中,我们有150次迭代。结果是150个随机数的数组,每个数字以100-1000毫秒计算。我发现这个示例是客观过程的最小可行模型。
import { Injectable } from '@nestjs/common';
const getRandomArbitrary = (min: number, max: number): number =>
Math.random() * (max - min) + min;
const delay = (time: number) =>
new Promise((resolve) => setTimeout(resolve, time));
@Injectable()
export class AppService {
getIterationCount(): number {
return 150;
}
async getData(token: string): Promise<string[]> {
return new Promise(async (resolve, reject) => {
try {
const result = [];
for (let i = 0; i < this.getIterationCount(); i++) {
result.push(getRandomArbitrary(1, 9999));
await delay(getRandomArbitrary(100, 1000));
}
resolve(result);
} catch (e) {
reject(e);
}
});
}
}
进度经理
未来的步骤是关于ProgressManager
实施的。
ProgressManager
应该是能够执行以下操作的单独的Nestjs服务。
- 启动“进度”会话(不是HTTP会话),并从客户端应用程序中获取唯一的令牌。
- 停止“进度”会话
- 增加进度的价值。
请查看以下评论代码。
import { Injectable } from '@nestjs/common';
import { Server } from 'socket.io';
export interface ProgressSession {
token: string;
total: number;
counter: number;
timerId: any;
}
@Injectable()
export class ProgressManager {
// The Socket Server injection will be described later
public server: Server;
// This map contains all Progress session data
private storage: Map<string, ProgressSession> = new Map();
// Start the session with the token and the total number of iterations
startSession(token: string, total: number, delay = 2000) {
// Get current session from the storage
const currentSession = this.storage.get(token);
// Do nothing if it's already exist
if (currentSession) {
return;
}
// Send the progress every "delay" milliseconds
const timerId = setInterval(async () => {
const currentSession: ProgressSession = this.storage.get(token);
// Protect the functionality: if the current session is missing then do nothing
if (!currentSession) {
return;
}
// Calculate the progress
let progress = Math.ceil(
(currentSession.counter / currentSession.total) * 100
);
// Protect the progress value, it should be less or equal 100
if (progress > 100) {
progress = 100;
}
// Send the progress. Pay attention that the event name should contain the "token"
// Client will use this token also
this.server.emit(`progress-${token}`, progress);
}, delay);
// Initial Progress Session settings. Token is a key.
this.storage.set(token, {
token,
total,
counter: 0,
timerId,
});
}
// This method increases the progress
step(token: string, value = 1) {
// Get the current session
const currentSession: ProgressSession = this.storage.get(token);
// Do nothing if it doesn't exist
if (!currentSession) {
return;
}
// Increase the counter
const counter = currentSession.counter + value;
// Update the storage
this.storage.set(token, {
...currentSession,
counter,
});
}
// Stop the session by the token
stopSession(token: string) {
// Get the current session
const currentSession: ProgressSession = this.storage.get(token);
// Do nothing if it doesn't exist
if (currentSession) {
// Stop the current timer
clearInterval(currentSession.timerId);
// Remove information regarding the current session from the storage
this.storage.delete(token);
}
}
}
您可以在here中找到代码。
WebSockets服务器
另一个重要的是将Nestjs与WebSocket集成并将其连接到Progress Manager。以下代码为此负责。
import {
WebSocketGateway,
WebSocketServer,
OnGatewayInit,
} from '@nestjs/websockets';
import { Server } from 'socket.io';
import { ProgressManager } from './progress-manager';
@WebSocketGateway({ cors: true })
export class AppGateway implements OnGatewayInit {
constructor(private progressManager: ProgressManager) {}
@WebSocketServer() server: Server;
afterInit() {
// After the WebSockets Gateway has to init, then pass it to the ProgressManager
this.progressManager.server = this.server;
}
}
,当然,根据Nestjs的要求,我们需要告诉相关模块。
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { AppGateway } from './app.gateway';
import { ProgressManager } from './progress-manager';
@Module({
imports: [],
controllers: [AppController],
providers: [AppService, AppGateway, ProgressManager],
})
export class AppModule {}
数据处理
是时候专注于端点的控制器了。看起来很简单。
import { Controller, Get, Query } from '@nestjs/common';
import { AppService } from './app.service';
@Controller()
export class AppController {
constructor(private readonly appService: AppService) {}
@Get()
getData(@Query() query: { token: string }) {
return this.appService.getData(query.token);
}
}
和有关服务器的最后一件事是关于提供示例修改的数据。以下示例接近本文中的第一个示例。主要目的是在此处添加“进度功能”。请阅读代码中的评论。这很重要。
import { Injectable } from '@nestjs/common';
import { ProgressManager } from './progress-manager';
const getRandomArbitrary = (min: number, max: number): number =>
Math.random() * (max - min) + min;
const delay = (time: number) =>
new Promise((resolve) => setTimeout(resolve, time));
@Injectable()
export class AppService {
// Use progressManager
constructor(private readonly progressManager: ProgressManager) {}
// 150 iterations should be processed
getIterationCount(): number {
return 150;
}
async getData(token: string): Promise<string[]> {
return new Promise(async (resolve, reject) => {
// We need to start the Progress Session before data preparation
this.progressManager.startSession(token, this.getIterationCount());
try {
// Initialize the array of results
const result = [];
for (let i = 0; i < this.getIterationCount(); i++) {
// Calculate the result
result.push(getRandomArbitrary(1, 9999));
// Increase the Progress counter
this.progressManager.step(token);
// Random delay
await delay(getRandomArbitrary(100, 1000));
}
// Return the result
resolve(result);
} catch (e) {
reject(e);
} finally {
// We need to stop the ProgressManager in any case.
// Otherwise, we have a redundant timeout.
this.progressManager.stopSession(token);
}
});
}
}
我示例的后端部分已经准备好了。您可以找到完整的后端解决方案here。
客户端
我的示例的客户部分放置在here。两个部分都放在一个monorepo中。感谢Nx。让我们看一下。请阅读以下代码中的评论。
import * as io from 'socket.io-client';
import { v4 } from 'uuid';
import axios from 'axios';
// Generate a unique ID (token)
const token = v4();
console.info(new Date().toISOString(), `start the request`);
// Call the endpoint described above
axios
.get(`http://localhost:3333/api?token=${token}`)
.then((resp) => {
// Print the total length of requested data (an array of random numbers)
console.info(new Date().toISOString(), `got ${resp.data.length} records`);
process.exit(0);
})
.catch((e) => {
console.info(e);
process.exit(0);
});
// We need to connect to the related Socket Server
const ioClient = io.connect('ws://localhost:3333');
// And wait for `progress-${token}` event
ioClient.on(`progress-${token}`, (progress) =>
console.info(new Date().toISOString(), `processed ${progress}%`)
);
最后一步
是时候尝试解决方案了。
git clone git@github.com:buchslava/nest-request-progress.git
cd nest-request-progress
npm i
npx nx run server:serve
打开另一个终端并运行:
npx nx run client:serve