# 九.Stream

前言

  • 流是一组有序的,有起点和终点的字节数据传输手段
  • 它不关心文件的整体内容,只关注是否从文件中读到了数据,以及读到数据之后的处理
  • 流是一个抽象接口,被 Node 中的很多对象所实现。比如 HTTP 服务器 request 和 response 对象都是流

# 1.可读流

实现了stream.Readable接口的对象,将对象数据读取为流数据,当监听 data 事件后,开始发射数据

fs.createReadStream = function(path, options) {
  return new ReadStream(path, options)
}
utils.inherits(ReadStream, Readable)
1
2
3
4
  • 创建可读流

    var rs = fs.createReadStream(path, [options])
    
    1
    • 1.path 读取文件的路径
    • 2.options
      • flags 打开文件要做的操作,默认为r
      • encoding 默认为 null
      • start 开始读取的索引位置
      • end 结束肚脐的索引位置
      • highWaterMark 读取缓存区默认的大小 64kb

    提示

    如果指定 utf8 编码 highWaterMark 要大于 3 个字节

  • 监听 data 事件

    流切换到流动模式,数据会被尽可能快的读出

    rs.on("data", function(data) {
      console.log(data)
    })
    
    1
    2
    3
  • 监听 end 事件

    该事件会在读完数据后被触发

    rs.on("end", function() {
      console.log("读取完成")
    })
    
    1
    2
    3
  • 监听 error 事件

    rs.on("error", function(err) {
      console.log(err)
    })
    
    1
    2
    3
  • 监听 open 事件

    rs.on("open", function() {
      console.log("open")
    })
    
    1
    2
    3
  • 监听 close 事件

    rs.on("close", function() {
      console.log("close")
    })
    
    1
    2
    3
  • 设置编码

    与指定{encoding:'utf8'}效果相同,设置编码

    rs.setEncoding("utf8")
    
    1
  • 暂停和恢复触发 data

    通过 pause()方法和 resume()方法

    rs.on("data", function(data) {
      rs.pause()
      console.log(data)
    })
    setTimeout(function() {
      rs.resume()
    }, 2000)
    
    1
    2
    3
    4
    5
    6
    7

# 2.可写流

实现了 stream.Writable 接口的对象将流数据写入到对象中

fs.createWriteStream = function(path, options) {
  return new WriteStream(path, options)
}
util.inherits(WriteStream, Writable)
1
2
3
4
  • 创建可写流

    var ws = fs.createWriteStream(path, [options])
    
    1
    • 1.path 写入的文件路径
    • 2.options
      • flags 打开文件要做的操作,默认为w
      • encoding 默认为 utf8
      • highWaterMark 写入缓存区的默认大小 16kb
  • write 方法

    ws.write(chunk, [encoding], [callback])
    
    1
    • 1.chunk 写入的数据 buffer/string
    • 2.encoding 编码格式 chunk 为字符串时有用,可选
    • 3.callback 写入成功后的回调

      返回值为布尔值,系统缓存区满时为 false,未满时为 true

  • end 方法

    ws.end(chunk, [encoding], [callback])
    
    1

    表明接下来没有数据要被写入 writable 通过传入可选的 chunk 和 encoding 参数,可以在关闭流之前再写入一段数据,如果传入了可选的 callback 函数,它将作为finish事件的回调函数

  • drain 方法

  • 当一个流不处在 drain 的状态,对 write()的调用会缓存数据块,并且返回 false。一旦所有当前缓存的数据都排空了(被操作系统接受来进行输出),那么drain事件就会被触发

  • 建议,一旦 write() 返回 false,在drain事件触发前,不能写入任何数据块

let fs = require('fs')
let ws = fs.createWriteStream(./2.txt,{
  flags:'w',
  encoding:'utf8',
  highWaterMark:3
})
let i = 10
function wrire(){
  let flag = true
  while(i && flag){
    flag = ws.write('1')
    i--
    console.log(flag)
  }
  write()
  ws.on('drain',()=>{
    console.log('drain')
    write()
  })
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
  • finish 方法

在调用了 stream.end()方法,且缓冲区数据都已经传给底层系统之后,finish事件将被触发。

var write = fs.createWriteStream("./2.txt")
for (let i = 0; i < 100; i++) {
  writer.write(`hello,${i}!\n`)
}
writer.end("结束\n")
writer.on("finish", () => {
  console.log("所有的写入已经完成!")
})
1
2
3
4
5
6
7
8

# 3.pipe 方法

  • pipe 方法的原理
var fs = require("fs")
var ws = fs.createWriteStream("./2.txt")
var rs = fs.createReadStream("./1.txt")
rs.on("data", function(data) {
  var flag = ws.write(data)
  if (!flag) rs.pause()
})
ws.on("drain", function() {
  rs.resume()
})
rs.on("end", function() {
  ws.end()
})
1
2
3
4
5
6
7
8
9
10
11
12
13
  • pipt 用法
// readStream.pipe(writeStream)
var from = fs.createReadStream("./1.txt")
var to = fs.createWriteStream("./2.txt")
from.pipe(to)
1
2
3
4

将数据的滞留量限制到一个可接受的水平,以使得不同速度的来源和目标不会淹没可用内存

  • unpipe 用法

  • readable.unpipe()方法将之前通过 stream.pipe()方法绑定的流分离

  • 如果 destination 没有传入,则所有绑定的流都会被分离

let fs = require("fs")
var from = fs.createReadStream("./1.txt")
var to = fs.createWriteStream("./2.txt")
from.pipe(to)
setTimeout(() => {
  console.log("关闭向2.txt的写入")
  from.unpipe(writable)
  console.log("手工关闭文件流")
  to.end()
}, 1000)
1
2
3
4
5
6
7
8
9
10
  • cork

调用 writable.cork()方法将强制所有写入数据都存放到内存中的缓冲区里。直到调用stream.uncork()stream.end()方法时,缓冲区里的数据才会被输出。

  • uncork

writable.uncork()将输出在stream.cork()方法被调用之后缓冲在内存中的所有数据

stream.cork()
stream.write("1")
stream.write("2")
process.nextTick(() => stream.uncork())
1
2
3
4

# 4.简单实现

# 4.1 可读流的简单实现

let fs = require('fs')
let ReadStream = require('./ReadStream')
let rs = ReadStream('./1.txt',{
  flags:'r',
  encoding:'utf8'.
  start:3,
  end:7,
  highWaterMark:3
})
rs.on('open',function(){
  console.log('open')
})
rs.on('data',function(data){
  console.log(data)
})
rs.on('end',function(){
  console.log('end')
})
rs.on('close',function(){
  console.log('close')
})
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
let fs = require("fs")
let EventEmitter = require("events")

class WriteStream extends EventEmitter {
  constructor(path, options) {
    super(path, options)
    this.path = path
    this.fd = options.fd
    this.flags = options.flags || "r"
    this.encoding = options.encoding
    this.start = options.start || 0
    this.pos = this.start
    this.end = options.end
    this.flowing = false
    this.autoClose = true
    this.highWaterMark = options.highWaterMark || 64 * 1024
    this.buffer = Buffer.alloc(this.highWaterMark)
    this.length = 0
    this.on("newListener", (type, listener) => {
      if (type == "data") {
        this.flowing = true
        this.read()
      }
    })
    this.on("end", () => {
      if (this.autoClose) {
        this.destroy()
      }
    })
    this.open()
  }

  read() {
    if (typeof this.fd != "number") {
      return this.once("open", () => this.read())
    }
    let n = this.end
      ? Math.min(this.end - this.pos, this.highWaterMark)
      : this.highWaterMark
    fs.read(this.fd, this.buffer, 0, n, this.pos, (err, bytesRead) => {
      if (err) {
        return
      }
      if (bytesRead) {
        let data = this.buffer.slice(0, bytesRead)
        data = this.encoding ? data.toString(this.encoding) : data
        this.emit("data", data)
        this.pos += bytesRead
        if (this.end && this.pos > this.end) {
          return this.emit("end")
        }
        if (this.flowing) this.read()
      } else {
        this.emit("end")
      }
    })
  }

  open() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
      if (err) return this.emit("error", err)
      this.fd = fd
      this.emit("open", fd)
    })
  }

  end() {
    if (this.autoClose) {
      this.destroy()
    }
  }

  destroy() {
    fs.close(this.fd, () => {
      this.emit("close")
    })
  }
}

module.exports = WriteStream
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80

# 4.2 可写流的简单实现

let fs = require("fs")
let FileWriteStream = require("./FileWriteStream")
let ws = FileWriteStream("./2.txt", {
  flags: "w",
  encoding: "utf8",
  highWaterMark: 3,
})
let i = 10
function write() {
  let flag = true
  while (i && flag) {
    flag = ws.write(
      "1",
      "utf8",
      (function(i) {
        return function() {
          console.log(i)
        }
      })(i)
    )
    i--
    console.log(flag)
  }
}
write()
ws.on("drain", () => {
  console.log("drain")
  write()
})
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
let fs = require("fs")
let EventEmitter = require("events")
class WriteStream extends EventEmitter {
  constructor(path, options) {
    super(path, options)
    this.path = path
    this.fd = options.fd
    this.flags = options.flags || "w"
    this.mode = options.mode || 0o666
    this.encoding = options.encoding
    this.start = options.start || 0
    this.pos = this.start
    this.writing = false
    this.autoClose = true
    this.highWaterMark = options.highWaterMark || 16 * 1024
    this.buffers = []
    this.length = 0
    this.open()
  }

  open() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
      if (err) return this.emit("error", err)
      this.fd = fd
      this.emit("open", fd)
    })
  }

  write(chunk, encoding, cb) {
    if (typeof encoding == "function") {
      cb = encoding
      encoding = null
    }

    chunk = Buffer.isBuffer(chunk)
      ? chunk
      : Buffer.from(chunk, this.encoding || "utf8")
    let len = chunk.length
    this.length += len
    let ret = this.length < this.highWaterMark
    if (this.writing) {
      this.buffers.push({
        chunk,
        encoding,
        cb,
      })
    } else {
      this.writing = true
      this._write(chunk, encoding, this.clearBuffer.bind(this))
    }
    return ret
  }

  _write(chunk, encoding, cb) {
    if (typeof this.fd != "number") {
      return this.once("open", () => this._write(chunk, encoding, cb))
    }
    fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, written) => {
      if (err) {
        if (this.autoClose) {
          this.destroy()
        }
        return this.emit("error", err)
      }
      this.length -= written
      this.pos += written
      cb && cb()
    })
  }

  clearBuffer() {
    let data = this.buffers.shift()
    if (data) {
      this._write(data.chunk, data.encoding, this.clearBuffer.bind(this))
    } else {
      this.writing = false
      this.emit("drain")
    }
  }

  end() {
    if (this.autoClose) {
      this.emit("end")
      this.destroy()
    }
  }

  destroy() {
    fs.close(this.fd, () => {
      this.emit("close")
    })
  }
}

module.exports = WriteStream
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95

# 4.3 pipe

let fs = require("fs")
let ReadStream = require("./ReadStream")
let rs = ReadStream("./1.txt", {
  flags: "r",
  encoding: "utf8",
  highWaterMark: 3,
})
let FileWriteStream = require("./WriteStream")
let ws = FileWriteStream("./2.txt", {
  flags: "w",
  encoding: "utf8",
  highWaterMark: 3,
})
rs.pipe(ws)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
ReadStream.prototype.pipe = function(dest) {
  this.on("data", (data) => {
    let flag = dest.write(data)
    if (!flag) {
      this.pause()
    }
  })
  dest.on("drain", () => {
    this.resume()
  })
  this.on("end", () => {
    dest.end()
  })
}
ReadStream.prototype.pause = function() {
  this.flowing = false
}
ReadStream.prototype.resume = function() {
  this.flowing = true
  this.read()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

# 4.4 暂停模式

let fs = require("fs")
let ReadStream2 = require("./ReadStream2")
let rs = new ReadStream2("./1.txt", {
  start: 3,
  end: 8,
  encoding: "utf8",
  highWaterMark: 3,
})
rs.on("readable", function() {
  console.log("readable")
  console.log("rs.buffer.length", rs.length)
  let d = rs.read(1)
  console.log(d)
  console.log("rs.buffer.length", rs.length)
  setTimeout(() => {
    console.log("rs.buffer.length", rs.length)
  }, 500)
})
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18