
介绍
Node.js Streams具有很强大的优势:您可以使用异步的方式处理输入和输出,您可以独立的步骤转换数据。在本教程中,我将引导您完成理论,并教您如何使用对象流变换,就像Gulp一样。
什么是流?
流是数据集合,就像数组或字符串一样。 不同之处在于流可能并非全部可用,并且它们不必适应内存。 这使得当处理大量数据时,流非常强大,或者一次来自外部源的数据。
然而,流不仅仅是关于使用大数据。 他们也给我们在我们的代码中组合的力量。 就像我们可以通过管道其他较小的Linux命令组成强大的linux命令一样,我们可以在Node中使用流完全一样。

|
|
Node中的许多内置模块实现流式接口:
上面的列表有一些本机Node.js对象的示例,这些对象也是可读写的流。 这些对象中的一些是可读写的流,如TCP套接字,zlib和crypto流。
注意对象也是密切相关的。 虽然HTTP响应是客户端上的可读流,但它是服务器上的可写入流。 这是因为在HTTP情况下,我们基本上从一个对象(http.IncomingMessage)读取并写入另一个对象(http.ServerResponse)。
还要注意,当涉及到子进程时,stdio stream(stdin,stdout,stderr)如何具有逆流类型。 这允许一种非常简单的方式来管理来自主进程stdio流的这些流。
您的第一流应用程序
我们来仔细看看流。 为此,我们将构建一个简单的文件上传应用程序。 首先,我们需要构建一个使用可读流读取文件并将数据管理到特定目标的客户端。 在管道的另一端,我们将实现一个使用可写入流保存上传数据的服务器。
我们从客户端开始吧。 我们从导入HTTP和文件系统模块开始。
客户端
|
|
然后,我们定义我们的HTTP请求。
|
|
现在我们有了我们的请求,我们创建一个读取文件并将内容管理到请求对象的可读流。
|
|
一旦流完成读取所有数据,我们关闭与服务器的连接,调用我们的请求的end()方法。
|
|
服务端
就像我们为客户端所做的那样,我们从导入Node.js模块开始。然后,我们创建一个新的可写流,将数据保存到文本文件中。
|
|
为了让我们的客户端应用上传文件,我们必须创建一个新的Web服务器对象。当数据来自请求对象时,服务器调用我们的流并将缓冲区刷新到输出文件。
|
|
请注意,createServer()返回的req和res对象分别是可读流和可写入流。我们可以监听数据事件,并且一旦处理结束,就将结果回送给客户端。
Node.js中的Stream
Node.js中有四种基本流类型:Readable(可读),Writable(可写),Duplex(双工)和Transform(转换)。
Readable可读流是可以从其中消耗数据的源的抽象。一个例子是fs.createReadStream方法。Writable可写流是可以写入数据的目的地的抽象。一个例子是fs.createWriteStream方法。Duplex双工流既可读又可写。一个例子是TCP套接字。Transform变换流基本上是一个双工流,可用于在写入和读取数据时修改或转换数据。一个例子是使用gzip压缩数据的zlib.createGzip流。您可以将变换流视为函数,其中输入是可写入流部分,输出是可读流部分。您也可能会听到称为“通过流”的转换流。
所有流都是EventEmitter的实例。它们发出可用于读取和写入数据的事件。但是,我们可以使用管道方法以更简单的方式使用流数据。
pipe(管道) 方法
这是您需要记住的魔术线:
|
|
在这个简单的一行中,我们将可读流的输出(数据源)作为可写流的输入 - 目标管道。源必须是可读流,目的地必须是可写的。当然,它们也可以是双工/转换流。实际上,如果我们正在进行双工流,我们可以像在Linux中一样链接管道呼叫:
|
|
pipe方法返回目标流,这使我们能够在上面进行链接。对于流(可读),b和c(双工)和d(可写),我们可以:
|
|
pipe方法是消耗流的最简单方法。通常建议使用管道方法或使用事件消耗流,但不要混合这两个。通常当您使用管道方法时,您不需要使用事件,但是如果您需要以更自定义的方式使用流,那么事件就是要走的路。
流事件(Stream events)
除了从可读流源读取和写入可写目的地之外,管道方法还会自动管理一些事情。例如,它处理错误,文件结尾以及一个流比另一个流更慢或更快的情况。
但是,流也可以直接与事件一起使用。以下是管道方法主要用于读取和写入数据的简化事件等效代码:
|
|
以下是可读写可用流的重要事件和功能的列表:

事件和功能在某种程度上是相关的,因为它们通常被一起使用。
Readable可读流最重要的事件是:
data事件,当流将一大块数据传递给消费者时,该事件被发出。end事件,当没有更多数据要从流中消耗时发出。
Writable可写的流中最重要的事件是:
drain事件是可写入流可以接收更多数据的信号。finish事件,当所有数据已刷新到底层系统时都会发出。
事件和功能可以组合起来,以便定制和优化流的使用。要使用可读流,我们可以使用pipe/unpipe方法,或者read/unshift/resume方法。要使用可写入流,我们可以使用pipe/unpipe方法,或者只是使用write方法写入它,并在完成后调用end方法。
可读流的暂停和流动模式
可读流具有两种主要模式,影响我们消费的方式:
- 它们可以处于暂停模式
- 或者在流动模式
这些模式有时被称为拉和推模式。
默认情况下,所有可读流可在暂停模式下启动,但在需要时可以轻松切换到流动状态并返回到暂停状态。有时会自动切换。
当可读流处于暂停模式时,我们可以使用read方法从流中读取流,但是,对于流模式中的可读流,数据持续流动,我们必须收听事件消耗它
在流动模式下,如果没有消费者可以处理它,数据实际上可能会丢失。这就是为什么当我们在流模式下有可读流时,我们需要一个数据事件处理程序。事实上,只需添加一个数据事件处理程序即可将暂停的流转换为流模式,并删除数据事件处理程序将流切换回暂停模式。其中一些是为了与旧的Node Stream接口的向后兼容而完成的。
要手动切换这两种流模式,可以使用resume和pause方法。

当使用管道方法消耗可读流时,我们不必担心管道自动管理这些模式。
流实现
当谈到Node.js中的流时,主要有两个不同的任务:
- 实现流的任务。
- 消费它们的任务。
到目前为止,我们一直在谈论只消耗流。让我们实现一些!
实现可写流(Writable)
要实现可写流,我们需要使用Stream模块中的Writable构造函数。
|
|
我们可以在许多方面实现一个可写的流。例如,我们可以扩展Writable构造函数
|
|
但是,我更喜欢更简单的构造方法。我们只是从Writable构造函数创建一个对象,并传递一些选项。唯一需要的选项是write函数,它暴露要写入的数据块。
|
|
这个写入方法有三个参数。
- 除非我们以不同的方式配置流,否则
chunk(块)通常是缓冲区。 - 在这种情况下需要
encoding(编码参数),但通常我们可以忽略它。 callback回调函数是处理数据块后我们需要调用的函数。这是写信是否成功的信号。要发出失败信号,请使用错误对象调用回调。
在outStream中,我们只是将该块作为一个字符串进行console.log,然后在没有错误的情况下调用回调来表示成功。这是一个非常简单的,可能不是很有用的回声流。它将回传任何收到的东西。
要消耗这个流,我们可以简单地使用process.stdin,这是一个可读的流,所以我们可以将process.stdin管道传入我们的outstream
当我们运行上面的代码时,我们输入到process.stdin的任何东西都将使用outStream console.log行回显。
这不是一个非常有用的实现流,因为它实际上已经被实现和内置。这非常等同于process.stdout。我们可以将stdin管道输入stdout,我们可以用这条单行得到完全相同的回声功能:
|
|
实现可读流(Readable)
要实现可读流,我们需要可读接口并从中构造一个对象:
|
|
有一种实现可读流的简单方法。我们可以直接push(推送)我们希望消费者消费的数据。
|
|
当我们push一个空对象时,这意味着我们想要表示流没有任何更多的数据。
为了消耗这个简单的可读流,我们可以简单地将它导入到可写入流process.stdout中。
当我们运行上面的代码时,我们将从inStream读取所有数据,并将其回显到标准。很简单,也不是很有效率。
我们基本上推送流中的所有数据,然后将其管理到process.stdout。更好的方法是在消费者要求时按需推送数据。我们可以通过在可读流配置中实现read方法来实现:
|
|
当读取方法在可读流上被调用时,该实现可以将部分数据推送到队列。例如,我们可以一次推一个字母,从字符代码65(表示A)开始,并在每次推送时递增:
|
|
当消费者读取可读流时,读取方法将继续触发,我们会推送更多的信件。我们需要在某个地方停止这个循环,这就是为什么当currentCharCode大于90(表示Z)时,if语句推空。
这个代码相当于我们开始使用的更简单的代码,但是现在我们正在按消费者要求的数据推送数据。你应该永远这样做.
实现双工(Duplex)/转换流(Transfer)
使用双工流,我们可以使用相同的对象来实现可读和可写的流。就好像我们从两个接口继承。
这是一个实例双工流,它结合了上面实现的两个可写和可读的例子:
|
|
通过组合这些方法,我们可以使用这个双工流来读取从A到Z的字母,我们也可以用它的回波特征。我们将可读的stdin流管道到这个双工流中以使用回声功能,我们将双工流本身管道写入可写入stdout流,以查看字母A到Z。
重要的是要了解双工流的可读写双方完全独立运行。这只是将两个特征组合成一个对象。
转换流是更有趣的双工流,因为它的输出是从其输入计算的。
对于一个转换流,我们不需要实现读取或者写入方法,我们只需要实现一个转换方法,它将两者结合起来。它具有写入方式的签名,我们也可以使用它来推送数据。
这是一个简单的转换流,它将您将其转换为大写格式后,再次输入任何内容:
|
|
在这个转换流中,我们正在消费与之前的双工流示例一样,我们只实现了一个transform方法。在该方法中,我们将块转换为大写版本,然后将该版本推送为可读部分。
流对象模式(Stream Object Mode)
默认情况下,流期望缓冲区(Buffer)/字符串(String)值。有一个objectMode标志,我们可以设置为让流接受任何JavaScript对象。
这是一个简单的例子来证明这一点。以下变换流的组合使得将逗号分隔值的字符串映射为JavaScript对象的功能。所以“a,b,c,d”成为{a:b,c:d}。
|
|
我们通过commasplitter传递输入字符串(例如“a,b,c,d”),该数组将数组作为其可读取数据([“a”,“b”,“c”,“d”]))。在该流上添加可读ObjectMode标志是必要的,因为我们正在将对象推送到其上,而不是字符串。
然后我们将数组并且将其导入到arrayToObject流中。我们需要一个writableObjectMode标志来使该流接受一个对象。它还将推送一个对象(输入数组映射到一个对象),这就是为什么我们也需要可读ObjectMode标志的原因。最后一个objectToString流接受一个对象,但是推出一个字符串,这就是为什么我们只需要一个writableObjectMode标志的原因。可读部分是普通字符串(字符串对象)。

Node.js内置的转换流
Node有一些非常有用的内置变换流。即,zlib和crypto stream。
这是一个使用zlib.createGzip()流结合fs readable/writable 流创建文件压缩脚本的示例:
|
|
您可以使用此脚本将您传递的任何文件gzip作为参数。我们将该文件的可读流管道转换为zlib内置的转换流,然后转换为新的gzip-压缩文件的可写流。简单。
使用管道的很酷的事情是,如果我们需要,我们可以将它们与事件结合起来。例如,我希望用户在脚本运行时看到一个进度指示符,当脚本完成时,我想要一个“完成”消息。由于管道方法返回目标流,因此我们可以链接注册事件处理程序:
|
|
所以使用管道方法,我们可以轻松地消耗流,但是我们还可以使用需要的事件进一步定制与这些流的交互。
管道方法有什么好处,但是我们可以使用它以一种可读的方式逐个构成我们的程序。例如,我们可以简单地创建一个变换流来报告进度,而不用监听上面的数据事件,并用另一个.pipe()调用替换.on()调用:
|
|
该reportProgress流是一个简单的直通流,但它将进度标准化。注意我如何使用callback()函数中的第二个参数来推送transform()方法中的数据。这相当于推动数据。
结合流的应用是无止境的。例如,如果我们需要在我们gzip之前或之后加密文件,我们需要做的就是按照我们需要的确切顺序来管理另一个转换流。我们可以使用Node的加密模块:
|
|
上面的脚本压缩然后加密传递的文件,只有具有秘密的人可以使用输出的文件。我们无法使用正常的解压缩实用程序解压缩此文件,因为它已加密。
为了能够解压缩上面脚本压缩的任何东西,我们需要以相反的顺序使用相反的加密和zlib流,这很简单:
|
|
假设传递的文件是压缩版本,上面的代码将创建一个从它的读取流,将其传输到crypto createDecipher()流(使用相同的密钥),将其输出管道输入到zlib createGunzip()流中,然后将文件写回到没有扩展部分的文件中。
小结
了解了这些 Stream 的内部机制,对我们后续深入理解上层代码有很大的促进作用,特别希望初学 Node.js 的同学花点时间进来看看。