Node.js 流与管道

流(Stream)是 Node.js 中处理流式数据的抽象接口,广泛应用于文件系统、网络通信、数据压缩等场景。流可以读取或写入数据块,而不必一次性将所有数据加载到内存中,因此非常适合处理大文件或实时数据。本章将详细介绍 Node.js 的四种流类型、pipe 方法的使用、背压处理以及如何自定义流。

1. 什么是流?

流是一组有序、有起点和终点的字节数据序列。在 Node.js 中,流是 EventEmitter 的实例,可以触发事件(如 dataenderror)。根据数据流向,流分为四种类型:

  • 可读流(Readable):提供数据的源头,例如 fs.createReadStream()
  • 可写流(Writable):数据写入的目标,例如 fs.createWriteStream()
  • 双工流(Duplex):既可读又可写,例如网络套接字(TCP socket)。
  • 转换流(Transform):在读写过程中可以修改或转换数据的双工流,例如 zlib.createGzip()

2. 可读流

可读流有两种模式:流动模式(flowing)和暂停模式(paused)。默认处于暂停模式,可以通过监听 data 事件切换到流动模式,或使用 read() 方法手动读取。

const fs = require('fs');

// 创建可读流
const readStream = fs.createReadStream('input.txt', 'utf8');

// 流动模式:监听 data 事件
readStream.on('data', (chunk) => {
  console.log(`读取到 ${chunk.length} 字节`);
});

readStream.on('end', () => {
  console.log('读取完成');
});

readStream.on('error', (err) => {
  console.error('读取错误:', err);
});

2.1 手动读取(暂停模式)

const readStream = fs.createReadStream('input.txt', 'utf8');

readStream.on('readable', () => {
  let chunk;
  while ((chunk = readStream.read()) !== null) {
    console.log(`读取到: ${chunk.length} 字节`);
  }
});

3. 可写流

可写流通过 write() 方法写入数据,最后调用 end() 结束写入。

const fs = require('fs');
const writeStream = fs.createWriteStream('output.txt');

writeStream.write('第一行\n');
writeStream.write('第二行\n');
writeStream.end('最后一行');

writeStream.on('finish', () => {
  console.log('写入完成');
});

writeStream.on('error', (err) => {
  console.error('写入错误:', err);
});

4. 管道:pipe 方法

pipe() 是可读流上的方法,用于将可读流的数据传输到可写流,自动管理数据流速(背压)。这是处理流最简洁的方式。

const fs = require('fs');
const readStream = fs.createReadStream('source.txt');
const writeStream = fs.createWriteStream('dest.txt');

readStream.pipe(writeStream);

writeStream.on('finish', () => {
  console.log('复制完成');
});

pipe 方法返回目标流,因此可以链式调用,适用于多个转换流:

const fs = require('fs');
const zlib = require('zlib');

fs.createReadStream('source.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('source.txt.gz'))
  .on('finish', () => console.log('压缩完成'));

5. 背压(Backpressure)

当可写流的写入速度慢于可读流的读取速度时,会发生背压。可读流会自动暂停读取,直到可写流缓冲区释放。pipe() 方法自动处理背压,但手动操作时需要关注 write() 的返回值。

const readStream = fs.createReadStream('largefile.txt');
const writeStream = fs.createWriteStream('copy.txt');

readStream.on('data', (chunk) => {
  const canWrite = writeStream.write(chunk);
  if (!canWrite) {
    readStream.pause(); // 暂停读取
    writeStream.once('drain', () => readStream.resume()); // 等待 drain 事件
  }
});

readStream.on('end', () => writeStream.end());

6. 自定义流

通过继承相应的流基类,可以创建自定义流。

6.1 自定义可读流

const { Readable } = require('stream');

class Counter extends Readable {
  constructor(max) {
    super();
    this.max = max;
    this.index = 1;
  }

  _read() {
    if (this.index <= this.max) {
      this.push(String(this.index++));
    } else {
      this.push(null); // 结束
    }
  }
}

const counter = new Counter(5);
counter.on('data', chunk => console.log(chunk.toString()));

6.2 自定义可写流

const { Writable } = require('stream');

class Logger extends Writable {
  _write(chunk, encoding, callback) {
    console.log('写入:', chunk.toString());
    callback(); // 必须调用
  }
}

const logger = new Logger();
logger.write('Hello ');
logger.write('World');
logger.end();

6.3 自定义转换流

const { Transform } = require('stream');

class UpperCaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
}

const upper = new UpperCaseTransform();
process.stdin.pipe(upper).pipe(process.stdout);

7. 流的事件

除了 dataenderrorfinishdrain,还有一些重要事件:

  • readable:当流中有数据可读时触发(暂停模式)。
  • pipe/unpipe:当流被 pipe 或取消 pipe 时触发。
  • close:当流或其底层资源关闭时触发。

8. 对象模式

默认情况下,流处理的是 Buffer 或字符串。通过设置 objectMode: true,可以让流处理 JavaScript 对象。

const { Transform } = require('stream');

const objStream = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    this.push({ ...chunk, timestamp: Date.now() });
    callback();
  }
});

9. 实用示例:HTTP 文件下载

const http = require('http');
const fs = require('fs');

http.createServer((req, res) => {
  const stream = fs.createReadStream('bigfile.zip');
  stream.pipe(res);
}).listen(3000);

注意:需处理客户端中断连接的情况,监听 reqclose 事件停止读取。

10. 错误处理

流基于事件,必须监听 error 事件。使用 pipe 时,错误不会自动传播,需手动处理。

const readStream = fs.createReadStream('nonexistent.txt');
const writeStream = fs.createWriteStream('output.txt');

readStream.on('error', (err) => {
  console.error('读错误:', err);
  writeStream.end(); // 确保写流正常关闭
});
readStream.pipe(writeStream);
最佳实践: 尽可能使用 pipe 方法,它自动处理背压和数据流控制。对于复杂场景,可以组合多个 pipe 调用,并监听目标流的 finish 事件。

小结

流是 Node.js 高性能数据处理的核心。通过本章学习,你应该掌握了可读流、可写流、双工流、转换流的基本用法,理解了 pipe 的强大和背压的处理方式。在接下来的章节中,我们将探讨 Buffer 模块,它与流紧密相关,用于处理二进制数据。