视频1 视频21 视频41 视频61 视频文章1 视频文章21 视频文章41 视频文章61 推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37 推荐39 推荐41 推荐43 推荐45 推荐47 推荐49 关键词1 关键词101 关键词201 关键词301 关键词401 关键词501 关键词601 关键词701 关键词801 关键词901 关键词1001 关键词1101 关键词1201 关键词1301 关键词1401 关键词1501 关键词1601 关键词1701 关键词1801 关键词1901 视频扩展1 视频扩展6 视频扩展11 视频扩展16 文章1 文章201 文章401 文章601 文章801 文章1001 资讯1 资讯501 资讯1001 资讯1501 标签1 标签501 标签1001 关键词1 关键词501 关键词1001 关键词1501 专题2001 知道1 知道21 知道41 知道61 知道81 知道101 知道121 知道141 知道161 知道181 知道201 知道221 知道241 知道261 知道281
问答文章1 问答文章501 问答文章1001 问答文章1501 问答文章2001 问答文章2501 问答文章3001 问答文章3501 问答文章4001 问答文章4501 问答文章5001 问答文章5501 问答文章6001 问答文章6501 问答文章7001 问答文章7501 问答文章8001 问答文章8501 问答文章9001 问答文章9501
Node.js pipe实现源码解析
2020-11-27 22:32:55 责编:小采
文档

从前面两篇文章,我们了解到。想要把 Readable 的数据写到 Writable,就必须先手动的将数据读入内存,然后写入 Writable。换句话说,每次传递数据时,都需要写如下的模板代码

readable.on('readable', (err) => {
 if(err) throw err

 writable.write(readable.read())
})

为了方便使用,Node.js 提供了 pipe() 方法,让我们可以优雅的传递数据

readable.pipe(writable)

现在,就让我们来看看它是如何实现的吧

pipe

首先需要先调用 Readable 的 pipe() 方法

// lib/_stream_readable.js

Readable.prototype.pipe = function(dest, pipeOpts) {
 var src = this;
 var state = this._readableState;

 // 记录 Writable
 switch (state.pipesCount) {
 case 0:
 state.pipes = dest;
 break;
 case 1:
 state.pipes = [state.pipes, dest];
 break;
 default:
 state.pipes.push(dest);
 break;
 }
 state.pipesCount += 1;

 // ...

 src.once('end', endFn);

 dest.on('unpipe', onunpipe);
 
 // ...

 dest.on('drain', ondrain);

 // ...

 src.on('data', ondata);

 // ...

 // 保证 error 事件触发时,onerror 首先被执行
 prependListener(dest, 'error', onerror);

 // ...

 dest.once('close', onclose);
 
 // ...

 dest.once('finish', onfinish);

 // ...

 // 触发 Writable 的 pipe 事件
 dest.emit('pipe', src);

 // 将 Readable 改为 flow 模式
 if (!state.flowing) {
 debug('pipe resume');
 src.resume();
 }

 return dest;
};

执行 pipe() 函数时,首先将 Writable 记录到 state.pipes 中,然后绑定相关事件,最后如果 Readable 不是 flow 模式,就调用 resume() 将 Readable 改为 flow 模式

传递数据

Readable 从数据源获取到数据后,触发 data 事件,执行 ondata()

ondata() 相关代码:

// lib/_stream_readable.js

 // 防止在 dest.write(chunk) 内调用 src.push(chunk) 造成 awaitDrain 重复增加,awaitDrain 不能清零,Readable 卡住的情况
 // 详情见 https://github.com/nodejs/node/issues/7278
 var increasedAwaitDrain = false;
 function ondata(chunk) {
 debug('ondata');
 increasedAwaitDrain = false;
 var ret = dest.write(chunk);
 if (false === ret && !increasedAwaitDrain) {
 // 防止在 dest.write() 内调用 src.unpipe(dest),导致 awaitDrain 不能清零,Readable 卡住的情况
 if (((state.pipesCount === 1 && state.pipes === dest) ||
 (state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1)
 ) && 
 !cleanedUp) {
 debug('false write response, pause', src._readableState.awaitDrain);
 src._readableState.awaitDrain++;
 increasedAwaitDrain = true;
 }
 // 进入 pause 模式
 src.pause();
 }
 }

在 ondata(chunk) 函数内,通过 dest.write(chunk) 将数据写入 Writable

此时,在 _write() 内部可能会调用 src.push(chunk) 或使其 unpipe,这会导致 awaitDrain 多次增加,不能清零,Readable 卡住

当不能再向 Writable 写入数据时,Readable 会进入 pause 模式,直到所有的 drain 事件触发

触发 drain 事件,执行 ondrain()

// lib/_stream_readable.js

 var ondrain = pipeOnDrain(src);

 function pipeOnDrain(src) {
 return function() {
 var state = src._readableState;
 debug('pipeOnDrain', state.awaitDrain);
 if (state.awaitDrain)
 state.awaitDrain--;
 // awaitDrain === 0,且有 data 
 if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
 state.flowing = true;
 flow(src);
 }
 };
 }

每个 drain 事件触发时,都会减少 awaitDrain,直到 awaitDrain 为 0。此时,调用 flow(src),使 Readable 进入 flow 模式

到这里,整个数据传递循环已经建立,数据会顺着循环源源不断的流入 Writable,直到所有数据写入完成

unpipe

不管写入过程中是否出现错误,最后都会执行 unpipe()

// lib/_stream_readable.js

// ...

 function unpipe() {
 debug('unpipe');
 src.unpipe(dest);
 }

// ...

Readable.prototype.unpipe = function(dest) {
 var state = this._readableState;
 var unpipeInfo = { hasUnpiped: false };

 // 啥也没有
 if (state.pipesCount === 0)
 return this;

 // 只有一个
 if (state.pipesCount === 1) {
 if (dest && dest !== state.pipes)
 return this;
 // 没有指定就 unpipe 所有
 if (!dest)
 dest = state.pipes;

 state.pipes = null;
 state.pipesCount = 0;
 state.flowing = false;
 if (dest)
 dest.emit('unpipe', this, unpipeInfo);
 return this;
 }

 // 没有指定就 unpipe 所有
 if (!dest) {
 var dests = state.pipes;
 var len = state.pipesCount;
 state.pipes = null;
 state.pipesCount = 0;
 state.flowing = false;

 for (var i = 0; i < len; i++)
 dests[i].emit('unpipe', this, unpipeInfo);
 return this;
 }

 // 找到指定 Writable,并 unpipe
 var index = state.pipes.indexOf(dest);
 if (index === -1)
 return this;

 state.pipes.splice(index, 1);
 state.pipesCount -= 1;
 if (state.pipesCount === 1)
 state.pipes = state.pipes[0];

 dest.emit('unpipe', this, unpipeInfo);

 return this;
};

Readable.prototype.unpipe() 函数会根据 state.pipes 属性和 dest 参数,选择执行策略。最后会触发 dest 的 unpipe 事件

unpipe 事件触发后,调用 onunpipe(),清理相关数据

// lib/_stream_readable.js

 function onunpipe(readable, unpipeInfo) {
 debug('onunpipe');
 if (readable === src) {
 if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
 unpipeInfo.hasUnpiped = true;
 // 清理相关数据
 cleanup();
 }
 }
 }

End

在整个 pipe 的过程中,Readable 是主动方 ( 负责整个 pipe 过程:包括数据传递、unpipe 与异常处理 ),Writable 是被动方 ( 只需要触发 drain 事件 )

总结一下 pipe 的过程:

  • 首先执行 readbable.pipe(writable),将 readable 与 writable 对接上
  • 当 readable 中有数据时,readable.emit('data'),将数据写入 writable
  • 如果 writable.write(chunk) 返回 false,则进入 pause 模式,等待 drain 事件触发
  • drain 事件全部触发后,再次进入 flow 模式,写入数据
  • 不管数据写入完成或发生中断,最后都会调用 unpipe()
  • unpipe() 调用 Readable.prototype.unpipe(),触发 dest 的 unpipe 事件,清理相关数据
  • 参考:

    https://github.com/nodejs/node/blob/master/lib/_stream_readable.js

    https://github.com/nodejs/node/blob/master/lib/_stream_writable.js

    下载本文
    显示全文
    专题