流(Stream)是 Node.js 中处理流式数据的抽象接口,广泛应用于文件系统、网络通信、数据压缩等场景。流可以读取或写入数据块,而不必一次性将所有数据加载到内存中,因此非常适合处理大文件或实时数据。本章将详细介绍 Node.js 的四种流类型、pipe 方法的使用、背压处理以及如何自定义流。
流是一组有序、有起点和终点的字节数据序列。在 Node.js 中,流是 EventEmitter 的实例,可以触发事件(如 data、end、error)。根据数据流向,流分为四种类型:
fs.createReadStream()。fs.createWriteStream()。zlib.createGzip()。可读流有两种模式:流动模式(flowing)和暂停模式(paused)。默认处于暂停模式,可以通过监听 data 事件切换到流动模式,或使用 read() 方法手动读取。
const fs = require('fs');
// 创建可读流
const readStream = fs.createReadStream('input.txt', 'utf8');
// 流动模式:监听 data 事件
readStream.on('data', (chunk) => {
console.log(`读取到 ${chunk.length} 字节`);
});
readStream.on('end', () => {
console.log('读取完成');
});
readStream.on('error', (err) => {
console.error('读取错误:', err);
});
const readStream = fs.createReadStream('input.txt', 'utf8');
readStream.on('readable', () => {
let chunk;
while ((chunk = readStream.read()) !== null) {
console.log(`读取到: ${chunk.length} 字节`);
}
});
可写流通过 write() 方法写入数据,最后调用 end() 结束写入。
const fs = require('fs');
const writeStream = fs.createWriteStream('output.txt');
writeStream.write('第一行\n');
writeStream.write('第二行\n');
writeStream.end('最后一行');
writeStream.on('finish', () => {
console.log('写入完成');
});
writeStream.on('error', (err) => {
console.error('写入错误:', err);
});
pipe() 是可读流上的方法,用于将可读流的数据传输到可写流,自动管理数据流速(背压)。这是处理流最简洁的方式。
const fs = require('fs');
const readStream = fs.createReadStream('source.txt');
const writeStream = fs.createWriteStream('dest.txt');
readStream.pipe(writeStream);
writeStream.on('finish', () => {
console.log('复制完成');
});
pipe 方法返回目标流,因此可以链式调用,适用于多个转换流:
const fs = require('fs');
const zlib = require('zlib');
fs.createReadStream('source.txt')
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream('source.txt.gz'))
.on('finish', () => console.log('压缩完成'));
当可写流的写入速度慢于可读流的读取速度时,会发生背压。可读流会自动暂停读取,直到可写流缓冲区释放。pipe() 方法自动处理背压,但手动操作时需要关注 write() 的返回值。
const readStream = fs.createReadStream('largefile.txt');
const writeStream = fs.createWriteStream('copy.txt');
readStream.on('data', (chunk) => {
const canWrite = writeStream.write(chunk);
if (!canWrite) {
readStream.pause(); // 暂停读取
writeStream.once('drain', () => readStream.resume()); // 等待 drain 事件
}
});
readStream.on('end', () => writeStream.end());
通过继承相应的流基类,可以创建自定义流。
const { Readable } = require('stream');
class Counter extends Readable {
constructor(max) {
super();
this.max = max;
this.index = 1;
}
_read() {
if (this.index <= this.max) {
this.push(String(this.index++));
} else {
this.push(null); // 结束
}
}
}
const counter = new Counter(5);
counter.on('data', chunk => console.log(chunk.toString()));
const { Writable } = require('stream');
class Logger extends Writable {
_write(chunk, encoding, callback) {
console.log('写入:', chunk.toString());
callback(); // 必须调用
}
}
const logger = new Logger();
logger.write('Hello ');
logger.write('World');
logger.end();
const { Transform } = require('stream');
class UpperCaseTransform extends Transform {
_transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
}
const upper = new UpperCaseTransform();
process.stdin.pipe(upper).pipe(process.stdout);
除了 data、end、error、finish、drain,还有一些重要事件:
readable:当流中有数据可读时触发(暂停模式)。pipe/unpipe:当流被 pipe 或取消 pipe 时触发。close:当流或其底层资源关闭时触发。默认情况下,流处理的是 Buffer 或字符串。通过设置 objectMode: true,可以让流处理 JavaScript 对象。
const { Transform } = require('stream');
const objStream = new Transform({
objectMode: true,
transform(chunk, encoding, callback) {
this.push({ ...chunk, timestamp: Date.now() });
callback();
}
});
const http = require('http');
const fs = require('fs');
http.createServer((req, res) => {
const stream = fs.createReadStream('bigfile.zip');
stream.pipe(res);
}).listen(3000);
注意:需处理客户端中断连接的情况,监听 req 的 close 事件停止读取。
流基于事件,必须监听 error 事件。使用 pipe 时,错误不会自动传播,需手动处理。
const readStream = fs.createReadStream('nonexistent.txt');
const writeStream = fs.createWriteStream('output.txt');
readStream.on('error', (err) => {
console.error('读错误:', err);
writeStream.end(); // 确保写流正常关闭
});
readStream.pipe(writeStream);
pipe 方法,它自动处理背压和数据流控制。对于复杂场景,可以组合多个 pipe 调用,并监听目标流的 finish 事件。
流是 Node.js 高性能数据处理的核心。通过本章学习,你应该掌握了可读流、可写流、双工流、转换流的基本用法,理解了 pipe 的强大和背压的处理方式。在接下来的章节中,我们将探讨 Buffer 模块,它与流紧密相关,用于处理二进制数据。