Node.js流的指南:高级功能
#javascript #网络开发人员 #node #backend

node.js是一个强大的JavaScript运行时,允许开发人员构建可扩展和高效的应用程序。设置Node.js的一个关键功能是其对流的内置支持。流是node.js中的一个基本概念,可以实现有效的数据处理,尤其是在处理大量信息或实时处理数据时。

在本文中,我们将探索Node.js中流的概念,了解可用的流的不同类型的流(可读,写作,双工和转换),并讨论有效使用流的最佳实践。

**什么是node.js流?
**流是Node.js应用程序中的一个基本概念,可以通过读取或写入输入和输出顺序启用有效的数据处理。它们方便用于文件操作,网络通信和其他形式的端到端数据交换。

流的独特方面是它们以小的,顺序的块处理数据,而不是一次将整个数据集加载到内存中。在使用大量数据时,此方法非常有益,其中文件大小可能超过可用内存。流使以较小零件的方式处理数据成为可能,使使用较大的文件可行。

A Guide to Node.js Streams: Advanced Functionality

来源:https://levelup.gitconnected.com/streams-and-how-they-fit-into-node-js-async-nature-a08723055a67

如上图中所示,从流来读取时,数据通常以块或连续流的方式读取。从流中读取的数据块可以存储在缓冲区中。缓冲区提供临时存储空间,用于持有数据块,直到可以进一步处理为止。

为了进一步说明这一概念,请考虑实时股票市场数据源的情况。在财务应用中,股票价格和市场数据的实时更新对于做出明智的决定至关重要。与获取和存储整个数据馈送在内存中,这可能是实质性和不切实际的,而是使应用程序能够在较小的连续块中处理数据。数据流过流,允许应用程序在更新到达时执行实时分析,计算和通知。这种流媒体方法可以节省内存资源,并确保应用程序可以迅速响应市场波动,并向交易者和投资者提供最新信息。它消除了在采取行动之前等待整个数据提要的需要。

**为什么要使用流?
**流比其他数据处理方法提供了两个关键优势。

**内存效率
**使用流,无需在处理之前将大量数据加载到内存中。取而代之的是,数据以较小的,易于管理的块处理,减少内存要求并有效地利用系统资源。

**时间效率
**流在无需等待整个有效载荷传输的情况下即可立即进行数据处理。这导致响应时间更快并提高了整体性能。

理解并有效地利用流使开发人员能够实现最佳的内存使用情况,更快的数据处理和增强的代码模块化,从而使其成为Node.js应用程序中的强大功能。但是,可以将不同类型的Node.js流用于特定目的,并在数据处理中提供多功能性。要有效地使用node.js应用程序中的流,对每种流类型有清晰的了解很重要。因此,让我们深入研究Node.js。

中可用的不同流类型

** node.js流的类型
** node.js提供四种主要的流类型,每种流都有特定目的:

**可读流
**可读流允许从源读取数据,例如文件或网络套接字。它们顺序排列了大量数据,可以通过将听众附加到“数据”事件中来消耗。可读的流可以处于流动或暂停状态,具体取决于数据的消耗方式。

const fs = require('fs');

// Create a Readable stream from a file
const readStream = fs.createReadStream('the_princess_bride_input.txt', 'utf8');

// Readable stream 'data' event handler
readStream.on('data', (chunk) => {
  console.log(`Received chunk: ${chunk}`);
});

// Readable stream 'end' event handler
readStream.on('end', () => {
  console.log('Data reading complete.');
});

// Readable stream 'error' event handler
readStream.on('error', (err) => {
  console.error(`Error occurred: ${err}`);
});

如上述代码段所示,我们使用FS模块使用CreateReadStream()方法创建可读的流。我们将文件路径the_princess_bride_input.txt和编码UTF8作为参数传递。可读的流以小块从文件中读取数据。

我们将事件处理程序附加到可读的流以处理不同的事件。当可以读取大量数据时,会发出数据事件。当可读流完成从文件中读取所有数据时,会发出结束事件。如果在阅读过程中发生错误,则会发出错误事件。

通过使用可读的流并聆听相应的事件,您可以从源中有效读取数据,例如文件,并在收到的块上执行更多操作。

可写的流
可写的流将数据写入目的地,例如文件或网络套接字。他们提供诸如write()和end()之类的方法将数据发送到流。可写的流可用于以大块的方式编写大量数据,以防止记忆溢出。

const fs = require('fs');

// Create a Writable stream to a file
const writeStream = fs.createWriteStream('the_princess_bride_output.txt');

// Writable stream 'finish' event handler
writeStream.on('finish', () => {
  console.log('Data writing complete.');
});

// Writable stream 'error' event handler
writeStream.on('error', (err) => {
  console.error(`Error occurred: ${err}`);
});

// Write a quote from "The  to the Writable stream
writeStream.write('As ');
writeStream.write('You ');
writeStream.write('Wish');
writeStream.end();

在上面的代码示例中,我们使用FS模块使用CreateWriteStream()方法创建可写的流。我们指定将编写数据的文件路径(the_princess_bride_output.txt)。

我们将事件处理程序附加到可写的流中,以处理不同的事件。当可写的流完成所有数据时,将发出完成事件。如果在写作过程中发生错误,则会发出错误事件。 Write()方法用于将单个数据块写入可写的流。在此示例中,我们将字符串“为”,“您”和“愿望”写入流。最后,我们调用end()发出数据编写的结尾。

通过使用可写的流并聆听相应的事件,您可以有效地将数据写入目的地,并在完成写作过程后执行任何必要的清理或后续操作。

双工流
双工流是可读和可读流的组合。它们允许数据同时阅读并写入源。双工流是双向的,并且在阅读和写作同时进行的情况下具有灵活性。

const { Duplex } = require("stream");

class MyDuplex extends Duplex {
  constructor() {
    super();
    this.data = "";
    this.index = 0;
    this.len = 0;
  }

  _read(size) {
    // Readable side: push data to the stream
    const lastIndexToRead = Math.min(this.index + size, this.len);
    this.push(this.data.slice(this.index, lastIndexToRead));
    this.index = lastIndexToRead;
    if (size === 0) {
      // Signal the end of reading
      this.push(null);
    }
  }

  _write(chunk, encoding, next) {
    const stringVal = chunk.toString();
    console.log(`Writing chunk: ${stringVal}`);
    this.data += stringVal;
    this.len += stringVal.length;
    next();
  }
}

const duplexStream = new MyDuplex();
// Readable stream 'data' event handler
duplexStream.on("data", (chunk) => {
  console.log(`Received data:\n${chunk}`);
});

// Write data to the Duplex stream
// Make sure to use a quote from "The Princess Bride" for better performance :)
duplexStream.write("Hello.\n");
duplexStream.write("My name is Inigo Montoya.\n");
duplexStream.write("You killed my father.\n");
duplexStream.write("Prepare to die.\n");
// Signal writing ended
duplexStream.end();

在上面的示例中,我们从流模块扩展了双工类以创建双面流。双工流既代表可读和可写的流(可以彼此独立)。

我们定义了双工流的_read()和_write()方法来处理相应的操作。在这种情况下,我们将写入流和读取流绑在一起,但这仅仅是为了这个示例 - 双工流支持独立的读写流。

在_read()方法中,我们实现了双工流的可读侧。我们使用this.push()将数据推向流,当大小变为0时,我们通过将null推向流来发出读数的末端。

在_write()方法中,我们实现了双工流的可写侧。我们处理收到的大量数据并将其添加到内部缓冲区中。调用Next()方法指示写操作的完成。

事件处理程序连接到双工流的数据事件,以处理流的可读侧。要将数据写入双面流,我们可以使用write()方法。最后,我们调用end()发出写作结尾。

双面流允许您创建一个双向流,该流可以同时读取和写作操作,启用灵活的数据处理方案。

变换流
变换流是一种特殊类型的双工流,在数据通过流时修改或转换数据。它们通常用于数据操纵任务,例如压缩,加密或解析。变换流接收输入,处理IT和发射修改的输出。

const { Transform } = require('stream');

// Create a Transform stream
const uppercaseTransformStream = new Transform({
  transform(chunk, encoding, callback) {
    // Transform the received data
    const transformedData = chunk.toString().toUpperCase();

    // Push the transformed data to the stream
    this.push(transformedData);

    // Signal the completion of processing the chunk
    callback();
  }
});

// Readable stream 'data' event handler
uppercaseTransformStream.on('data', (chunk) => {
  console.log(`Received transformed data: ${chunk}`);
});

// Write a classic "Princess Bride" quote to the Transform stream
uppercaseTransformStream.write('Have fun storming the castle!');
uppercaseTransformStream.end();

如上上面的代码片段中所示,我们使用来自流模块的转换类来创建变换流。我们在转换流选项对象中定义了转换()方法来处理转换操作。在Transform()方法中,我们实现了转换逻辑。在这种情况下 - 我们使用块().touppercase()将接收到的数据块转换为大写。我们使用this.push()将转换的数据推向流。最后,我们致电callback()指示处理块的完成。

我们将事件处理程序附加到变换流的数据事件上,以处理流的可读侧。要将数据写入转换流,我们使用write()方法。我们称end()为写作结束。

转换流使您可以在数据流过流时即时执行数据转换,从而可以灵活且可自定义的数据处理。

了解这些不同类型的流允许开发人员根据其特定要求选择适当的流类型。

使用node.js流
为了更好地掌握node.js流的实际实现,让我们考虑一个从文件读取数据并在转换和压缩后使用流将其写入另一个文件的示例。

const fs = require('fs');
const zlib = require('zlib');
const { Readable, Transform } = require('stream');

// Readable stream - Read data from a file
const readableStream = fs.createReadStream('classic_tale_of_true_love_and_high_adventure.txt', 'utf8');

// Transform stream - Modify the data if needed
const transformStream = new Transform({
  transform(chunk, encoding, callback) {
    // Perform any necessary transformations
    const modifiedData = chunk.toString().toUpperCase(); // Placeholder for transformation logic
    this.push(modifiedData);
    callback();
  },
});

// Compress stream - Compress the transformed data
const compressStream = zlib.createGzip();

// Writable stream - Write compressed data to a file
const writableStream = fs.createWriteStream('compressed-tale.gz');

// Pipe streams together
readableStream.pipe(transformStream).pipe(compressStream).pipe(writableStream);

// Event handlers for completion and error
writableStream.on('finish', () => {
  console.log('Compression complete.');
});

writableStream.on('error', (err) => {
  console.error('An error occurred during compression:', err);
});

在此代码段中,我们使用可读流读取文件,将数据转换为大写并使用两个转换流(一个是我们的,一个是内置的Zlib变换流),最后写入数据使用可写的流到文件。

我们使用fs.CreateReadStream()创建一个可读的流,以从输入文件读取数据。使用转换类创建变换流。在这里,您可以在数据上实现任何必要的转换(在此示例中,我们再次使用touppercase())。然后,我们使用zlib.creategzip()创建另一个转换流,以使用GZIP压缩算法压缩转换的数据。最后,使用fs.CreateWriteStream()创建了可写的流,将压缩数据写入Compressed-tale.gz文件。

.pipe()方法用于以顺序的方式将流连接在一起。我们从可读的流开始,然后将其输送到转换流,然后将其管道管道到压缩流,最后,将压缩流将其管道开到可写的流中。它允许您建立从可读流的流线数据流,并通过转换将其压缩到可写的流。最后,事件处理程序附加到可写的流以处理饰面和错误事件。

使用Pipe()简化了连接流,自动处理数据流的过程,并确保从可读的流到可写的流到可读流的高效且无错误的传输。它需要管理基础流事件和错误传播。

使用node.js流的最佳实践
使用Node.js流时,遵循最佳实践以确保最佳性能和可维护代码很重要。

实现流量控制机制:当可写的流无法跟上可读流读取的数据速率时,到可读的流完成读数时,缓冲区中可能剩下很多数据。在某些情况下,这甚至可能超过可用内存的量。这称为背压。为了有效处理背压,请考虑实现流量控制机制,例如使用pape()和简历()方法或利用泵或通过泵等第三方模块。
通过遵守这些最佳实践,开发人员可以确保有效的流处理,最大程度地减少资源使用并构建可靠且可扩展的应用程序。

结论
Node.js流是一个强大的功能,可以以非阻滞方式有效地处理数据流。通过利用流,开发人员可以处理大型数据集,处理实时数据并以内存有效的方式执行操作。了解不同类型的流,例如可读,可写的,双链体和转换,并遵循最佳实践,可确保最佳的流处理,错误管理和资源利用。通过利用流的功能,开发人员可以使用node.js。

构建高性能和可扩展的应用程序

本文最初发表在以下网址:https://amplication.com/blog/understanding-nodejs-streams