博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
node stream
阅读量:5914 次
发布时间:2019-06-19

本文共 11537 字,大约阅读时间需要 38 分钟。

Stream 是一个抽象接口,对http 服务器发起请求的request 对象就是一个 Stream,还有stdout(标准输出)。 Node.js,Stream 有四种流类型:

Readable - 可读操作。

Writable - 可写操作。

Duplex - 可读可写操作.

Transform - 操作被写入数据,然后读出结果。

所有的 Stream 对象都是 EventEmitter 的实例。常用的事件有:

data - 当有数据可读时触发。

end - 没有更多的数据可读时触发。

error - 在接收和写入过程中发生错误时触发。

finish - 所有数据已被写入到底层系统时触发。

let fs = require('fs');let path = require('path');// 返回的是一个可读流对象let rs = fs.createReadStream(path.join(__dirname, '1.txt'), {    flags: 'r', // 文件的操作是读取操作    encoding: 'utf8', // 默认是null null代表的是buffer    autoClose: true, // 读取完毕后自动关闭    highWaterMark: 3, // 默认是64k  64*1024b    start: 0,    end: 3 // 包前又包后});rs.setEncoding('utf8');rs.on('open', function() {    console.log('文件打开了');});rs.on('close', function() {    console.log('关闭');});rs.on('error',function (err) {    console.log(err);});rs.on('data',function(data) { // 暂停模式 -> 流动模式    console.log(data);    rs.pause(); // 暂停方法 表示暂停读取,暂停data事件触发});setInterval(function() {   rs.resume(); //恢复data时间的触发}, 3000);rs.on('end',function() {    console.log('end')});复制代码

实现了stream.Readable接口的对象,将对象数据读取为流数据,当监听data事件后,开始发射数据

let EventEmitter = require('events');let fs = require('fs');class ReadStream extends EventEmitter {    constructor(path, options) {        super();        this.path = path;        this.flags = options.flags || 'r';        this.autoClose = options.autoClose || true;        this.highWaterMark = options.highWaterMark|| 64*1024;        this.start = options.start||0;        this.end = options.end;        this.encoding = options.encoding || null        this.open();//打开文件 fd        this.flowing = null; // null就是暂停模式            this.buffer = Buffer.alloc(this.highWaterMark);        this.pos = this.start; // pos 读取的位置 可变 start不变的        this.on('newListener', (eventName,callback) => {            if (eventName === 'data') {                // 相当于用户监听了data事件                this.flowing  = true;                // 监听了 就去读                this.read(); // 去读内容了            }        })    }        read(){        // 此时文件还没打开呢        if (typeof this.fd !== 'number') {            return this.once('open', () => this.read())        }        let howMuchToRead = this.end ? Math.min(this.highWaterMark, this.end - this.pos+1) : this.highWaterMark;        fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (err,bytesRead) => {            if (bytesRead > 0) {                this.pos += bytesRead;                let data = this.encoding ? this.buffer.slice(0, bytesRead).toString(this.encoding) : this.buffer.slice(0, bytesRead);                this.emit('data', data);                // 当读取的位置 大于了末尾 就是读取完毕了                if(this.pos > this.end){                    this.emit('end');                    this.destroy();                }                if(this.flowing) { // 流动模式继续触发                    this.read();                 }            }else{                this.emit('end');                this.destroy();            }        });    }        resume() {        this.flowing = true;        this.read();    }        pause() {        this.flowing = false;    }        destroy() {        // 先判断有没有fd 有关闭文件 触发close事件        if(typeof this.fd === 'number') {            fs.close(this.fd, () => {                this.emit('close');            });            return;        }        this.emit('close'); // 销毁    };        open() {        // copy 先打开文件        fs.open(this.path, this.flags, (err,fd) => {            if (err) {                this.emit('error', err);                if (this.autoClose) { // 是否自动关闭                    this.destroy();                }                return;            }            this.fd = fd; // 保存文件描述符            this.emit('open'); // 文件打开了        });    }}module.exports = ReadStream;复制代码

流的缓存 Writable 和 Readable 流都会将数据存储到内部的缓冲器(buffer)中。这些缓冲器可以 通过相应的 writable._writableState.getBuffer() 或 readable._readableState.buffer 来获取。

缓冲器的大小取决于传递给流构造函数的 highWaterMark 选项。 对于普通的流, highWaterMark 选项指定了总共的字节数。对于工作在对象模式的流, highWaterMark 指定了对象的总数。

当可读流的实现调用stream.push(chunk)方法时,数据被放到缓冲器中。如果流的消费者没有调用stream.read()方法, 这些数据会始终存在于内部队列中,直到被消费。

当内部可读缓冲器的大小达到 highWaterMark 指定的阈值时,流会暂停从底层资源读取数据,直到当前 缓冲器的数据被消费 (也就是说, 流会在内部停止调用 readable._read() 来填充可读缓冲器)。

可写流通过反复调用 writable.write(chunk) 方法将数据放到缓冲器。 当内部可写缓冲器的总大小小于 highWaterMark 指定的阈值时, 调用 writable.write() 将返回true。 一旦内部缓冲器的大小达到或超过 highWaterMark ,调用 writable.write() 将返回 false 。

stream API 的关键目标, 尤其对于 stream.pipe() 方法, 就是限制缓冲器数据大小,以达到可接受的程度。这样,对于读写速度不匹配的源头和目标,就不会超出可用的内存大小。

Duplex 和 Transform 都是可读写的。 在内部,它们都维护了 两个 相互独立的缓冲器用于读和写。 在维持了合理高效的数据流的同时,也使得对于读和写可以独立进行而互不影响。

function computeNewHighWaterMark(n) {  n--;  n |= n >>> 1;  n |= n >>> 2;  n |= n >>> 4;  n |= n >>> 8;  n |= n >>> 16;  n++; return n;}read(n) { // 想取1个    if (n > this.length) {        // 更改缓存区大小  读取五个就找 2的几次放最近的        this.highWaterMark = computeNewHighWaterMark(n)        this.emittedReadable = true;        this._read();    }    // 如果n>0 去缓存区中取吧    let buffer = null;    let index = 0; // 维护buffer的索引的    let flag = true;    if (n > 0 && n <= this.length) { // 读的内容 缓存区中有这么多        // 在缓存区中取 [[2,3],[4,5,6]]        buffer = Buffer.alloc(n); // 这是要返回的buffer        let buf;        while (flag && (buf = this.buffers.shift())) {            for (let i = 0; i < buf.length; i++) {                buffer[index++] = buf[i];                if (index === n) { // 拷贝够了 不需要拷贝了                    flag = false;                    this.length -= n;                    let bufferArr = buf.slice(i+1); // 取出留下的部分                    // 如果有剩下的内容 在放入到缓存中                    if (bufferArr.length > 0) {                        this.buffers.unshift(bufferArr);                    }                    break;                }            }        }    }    // 当前缓存区 小于highWaterMark时在去读取    if (this.length == 0) {        this.emittedReadable = true;    }    if (this.length < this.highWaterMark) {        if (!this.reading) {            this.reading = true;            this._read(); // 异步的        }    }    return buffer;}复制代码

完整的代码

let fs = require('fs');let EventEmitter = require('events');function computeNewHighWaterMark(n) {      n--;      n |= n >>> 1;      n |= n >>> 2;      n |= n >>> 4;      n |= n >>> 8;      n |= n >>> 16;      n++;     return n;}  class ReadStream extends EventEmitter {    constructor(path, options) {        super();        this.path = path;        this.highWaterMark = options.highWaterMark || 64 * 1024;        this.autoClose = options.autoClose || true;        this.start = 0;        this.end = options.end;        this.flags = options.flags || 'r';        this.buffers = []; // 缓存区         this.pos = this.start;        this.length = 0; // 缓存区大小        this.emittedReadable = false;        this.reading = false; // 不是正在读取的        this.open();        this.on('newListener', (eventName) => {            if (eventName === 'readable') {                this.read();            }        })    }        read(n) {         if (n > this.length){            // 更改缓存区大小  读取五个就找 2的几次放最近的            this.highWaterMark = computeNewHighWaterMark(n)            this.emittedReadable = true;            this._read();        }        // 如果n>0 去缓存区中取吧        let buffer = null;        let index = 0; // 维护buffer的索引的        let flag = true;        if (n > 0 && n <= this.length) { // 读的内容 缓存区中有这么多            // 在缓存区中取 [[2,3],[4,5,6]]            buffer = Buffer.alloc(n); // 这是要返回的buffer            let buf;            while (flag && (buf = this.buffers.shift())) {                for (let i = 0; i < buf.length; i++) {                    buffer[index++] = buf[i];                    if(index === n){ // 拷贝够了 不需要拷贝了                        flag = false;                        this.length -= n;                        let bufferArr = buf.slice(i+1); // 取出留下的部分                        // 如果有剩下的内容 在放入到缓存中                        if(bufferArr.length > 0) {                            this.buffers.unshift(bufferArr);                        }                        break;                    }                }            }        }        // 当前缓存区 小于highWaterMark时在去读取        if (this.length == 0) {            this.emittedReadable = true;        }        if (this.length < this.highWaterMark) {            if(!this.reading){                this.reading = true;                this._read(); // 异步的            }        }        return buffer;    }        // 封装的读取的方法    _read() {        // 当文件打开后在去读取        if (typeof this.fd !== 'number') {            return this.once('open', () => this._read());        }        // 上来我要喝水 先倒三升水 []        let buffer = Buffer.alloc(this.highWaterMark);        fs.read(this.fd, buffer, 0, buffer.length, this.pos, (err, bytesRead) => {            if (bytesRead > 0) {                // 默认读取的内容放到缓存区中                this.buffers.push(buffer.slice(0, bytesRead));                this.pos += bytesRead; // 维护读取的索引                this.length += bytesRead;// 维护缓存区的大小                this.reading = false;                // 是否需要触发readable事件                if (this.emittedReadable) {                    this.emittedReadable = false; // 下次默认不触发                    this.emit('readable');                }            } else {                this.emit('end');                this.destroy();            }        })    }    destroy() {        if (typeof this.fd !== 'number') {            return this.emit('close')        }        fs.close(this.fd, () => {            this.emit('close')        })    }    open() {        fs.open(this.path, this.flags, (err, fd) => {            if (err) {                this.emit('error', err);                if (this.autoClose) {                    this.destroy();                }                return            }            this.fd = fd;            this.emit('open');        });    }}module.exports = ReadStream;复制代码

LineReader 最后,结合上面所说的暂停模式readable,我们来实现一个行读取器的例子,我们先定义好一个行读取器类和它的测试代码,它实现的功能就是我们通过创建一个LineReader对象并传入要读取的文件,然后监听line事件,在每次读取到一行数据时就会触发line的回调函数。

// LineReader 行读取器let fs = require('fs');let EventEmitter = require('events');let path = require('path');class LineReader extends EventEmitter {}let lineReader = new LineReader(path.join(__dirname, './2.txt'));lineReader.on('line', function (data) {    console.log(data); // abc , 123 , 456 ,678})复制代码

可写流

var stream = require('stream');var util = require('util');util.inherits(Writer, stream.Writable);let stock = [];function Writer(opt) {    stream.Writable.call(this, opt);}Writer.prototype._write = function(chunk, encoding, callback) {    setTimeout(()=>{        stock.push(chunk.toString('utf8'));        console.log("增加: " + chunk);        callback();    },500)};var w = new Writer();for (var i=1; i<=5; i++){    w.write("项目:" + i, 'utf8');}w.end("结束写入",function(){    console.log(stock);});管道流 管道提供了一个输出流到输入流的机制。通常我们用于从一个流中获取数据并将数据传递到另外一个流中。const stream = require('stream')var index = 0;const readable = stream.Readable({    highWaterMark: 2,    read: function () {        process.nextTick(() => {            console.log('push', ++index)            this.push(index+'');        })    }})const writable = stream.Writable({    highWaterMark: 2,    write: function (chunk, encoding, next) {        console.log('写入:', chunk.toString())    }})readable.pipe(writable);复制代码

双工流 可读可写

const {Duplex} = require('stream');const inoutStream = new Duplex({    write(chunk, encoding, callback) {        console.log(chunk.toString());        callback();    },    read(size) {        this.push((++this.index)+'');        if (this.index > 3) {            this.push(null);        }    }});inoutStream.index = 0;process.stdin.pipe(inoutStream).pipe(process.stdout);复制代码

转载于:https://juejin.im/post/5ac81f9a6fb9a028b617c271

你可能感兴趣的文章
jboss7 添加虚拟目录 上传文件路径
查看>>
CRT/LCD/VGA Information and Timing
查看>>
C# PPT 为形状设置三维效果
查看>>
Android DecorView浅析
查看>>
C 双向链表
查看>>
hdu 5452(树链刨分)
查看>>
LVM Linear vs Striped Logical Volumes
查看>>
Mysql主从备份和SQL语句的备份
查看>>
DEDECMS之三 首页、列表页怎么调用文章内容
查看>>
iOS开发多线程篇 09 —NSOperation简单介绍
查看>>
WINDOWS下调用GetTokenInformation的奇怪之处--两次调用
查看>>
HDU 5813 Elegant Construction 构造
查看>>
Tomcat就是个容器,一种软件
查看>>
php结合redis实现高并发下的抢购、秒杀功能
查看>>
统计服务连接状况
查看>>
Android事件总线(三)otto用法全解析
查看>>
js数组实现不重复插入数据
查看>>
[译]使用 Siesta 处理 Swift 网络请求
查看>>
Android 中的子线程解析
查看>>
aidl跨进程通讯
查看>>