# 九.Stream
前言
- 流是一组有序的,有起点和终点的字节数据传输手段
- 它不关心文件的整体内容,只关注是否从文件中读到了数据,以及读到数据之后的处理
- 流是一个抽象接口,被 Node 中的很多对象所实现。比如 HTTP 服务器 request 和 response 对象都是流
# 1.可读流
实现了stream.Readable
接口的对象,将对象数据读取为流数据,当监听 data 事件后,开始发射数据
fs.createReadStream = function(path, options) {
return new ReadStream(path, options)
}
utils.inherits(ReadStream, Readable)
2
3
4
创建可读流
var rs = fs.createReadStream(path, [options])
1- 1.path 读取文件的路径
- 2.options
- flags 打开文件要做的操作,默认为
r
- encoding 默认为 null
- start 开始读取的索引位置
- end 结束肚脐的索引位置
- highWaterMark 读取缓存区默认的大小 64kb
- flags 打开文件要做的操作,默认为
提示
如果指定 utf8 编码 highWaterMark 要大于 3 个字节
监听 data 事件
流切换到流动模式,数据会被尽可能快的读出
rs.on("data", function(data) { console.log(data) })
1
2
3监听 end 事件
该事件会在读完数据后被触发
rs.on("end", function() { console.log("读取完成") })
1
2
3监听 error 事件
rs.on("error", function(err) { console.log(err) })
1
2
3监听 open 事件
rs.on("open", function() { console.log("open") })
1
2
3监听 close 事件
rs.on("close", function() { console.log("close") })
1
2
3设置编码
与指定{encoding:'utf8'}效果相同,设置编码
rs.setEncoding("utf8")
1暂停和恢复触发 data
通过 pause()方法和 resume()方法
rs.on("data", function(data) { rs.pause() console.log(data) }) setTimeout(function() { rs.resume() }, 2000)
1
2
3
4
5
6
7
# 2.可写流
实现了 stream.Writable 接口的对象将流数据写入到对象中
fs.createWriteStream = function(path, options) {
return new WriteStream(path, options)
}
util.inherits(WriteStream, Writable)
2
3
4
创建可写流
var ws = fs.createWriteStream(path, [options])
1- 1.path 写入的文件路径
- 2.options
- flags 打开文件要做的操作,默认为
w
- encoding 默认为 utf8
- highWaterMark 写入缓存区的默认大小 16kb
- flags 打开文件要做的操作,默认为
write 方法
ws.write(chunk, [encoding], [callback])
1- 1.chunk 写入的数据 buffer/string
- 2.encoding 编码格式 chunk 为字符串时有用,可选
- 3.callback 写入成功后的回调
返回值为布尔值,系统缓存区满时为 false,未满时为 true
end 方法
ws.end(chunk, [encoding], [callback])
1表明接下来没有数据要被写入 writable 通过传入可选的 chunk 和 encoding 参数,可以在关闭流之前再写入一段数据,如果传入了可选的 callback 函数,它将作为
finish
事件的回调函数drain 方法
当一个流不处在 drain 的状态,对 write()的调用会缓存数据块,并且返回 false。一旦所有当前缓存的数据都排空了(被操作系统接受来进行输出),那么
drain
事件就会被触发建议,一旦 write() 返回 false,在
drain
事件触发前,不能写入任何数据块
let fs = require('fs')
let ws = fs.createWriteStream(./2.txt,{
flags:'w',
encoding:'utf8',
highWaterMark:3
})
let i = 10
function wrire(){
let flag = true
while(i && flag){
flag = ws.write('1')
i--
console.log(flag)
}
write()
ws.on('drain',()=>{
console.log('drain')
write()
})
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
- finish 方法
在调用了 stream.end()方法,且缓冲区数据都已经传给底层系统之后,finish
事件将被触发。
var write = fs.createWriteStream("./2.txt")
for (let i = 0; i < 100; i++) {
writer.write(`hello,${i}!\n`)
}
writer.end("结束\n")
writer.on("finish", () => {
console.log("所有的写入已经完成!")
})
2
3
4
5
6
7
8
# 3.pipe 方法
- pipe 方法的原理
var fs = require("fs")
var ws = fs.createWriteStream("./2.txt")
var rs = fs.createReadStream("./1.txt")
rs.on("data", function(data) {
var flag = ws.write(data)
if (!flag) rs.pause()
})
ws.on("drain", function() {
rs.resume()
})
rs.on("end", function() {
ws.end()
})
2
3
4
5
6
7
8
9
10
11
12
13
- pipt 用法
// readStream.pipe(writeStream)
var from = fs.createReadStream("./1.txt")
var to = fs.createWriteStream("./2.txt")
from.pipe(to)
2
3
4
将数据的滞留量限制到一个可接受的水平,以使得不同速度的来源和目标不会淹没可用内存
unpipe 用法
readable.unpipe()方法将之前通过 stream.pipe()方法绑定的流分离
如果 destination 没有传入,则所有绑定的流都会被分离
let fs = require("fs")
var from = fs.createReadStream("./1.txt")
var to = fs.createWriteStream("./2.txt")
from.pipe(to)
setTimeout(() => {
console.log("关闭向2.txt的写入")
from.unpipe(writable)
console.log("手工关闭文件流")
to.end()
}, 1000)
2
3
4
5
6
7
8
9
10
- cork
调用 writable.cork()方法将强制所有写入数据都存放到内存中的缓冲区里。直到调用stream.uncork()
或stream.end()
方法时,缓冲区里的数据才会被输出。
- uncork
writable.uncork()
将输出在stream.cork()
方法被调用之后缓冲在内存中的所有数据
stream.cork()
stream.write("1")
stream.write("2")
process.nextTick(() => stream.uncork())
2
3
4
# 4.简单实现
# 4.1 可读流的简单实现
let fs = require('fs')
let ReadStream = require('./ReadStream')
let rs = ReadStream('./1.txt',{
flags:'r',
encoding:'utf8'.
start:3,
end:7,
highWaterMark:3
})
rs.on('open',function(){
console.log('open')
})
rs.on('data',function(data){
console.log(data)
})
rs.on('end',function(){
console.log('end')
})
rs.on('close',function(){
console.log('close')
})
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
let fs = require("fs")
let EventEmitter = require("events")
class WriteStream extends EventEmitter {
constructor(path, options) {
super(path, options)
this.path = path
this.fd = options.fd
this.flags = options.flags || "r"
this.encoding = options.encoding
this.start = options.start || 0
this.pos = this.start
this.end = options.end
this.flowing = false
this.autoClose = true
this.highWaterMark = options.highWaterMark || 64 * 1024
this.buffer = Buffer.alloc(this.highWaterMark)
this.length = 0
this.on("newListener", (type, listener) => {
if (type == "data") {
this.flowing = true
this.read()
}
})
this.on("end", () => {
if (this.autoClose) {
this.destroy()
}
})
this.open()
}
read() {
if (typeof this.fd != "number") {
return this.once("open", () => this.read())
}
let n = this.end
? Math.min(this.end - this.pos, this.highWaterMark)
: this.highWaterMark
fs.read(this.fd, this.buffer, 0, n, this.pos, (err, bytesRead) => {
if (err) {
return
}
if (bytesRead) {
let data = this.buffer.slice(0, bytesRead)
data = this.encoding ? data.toString(this.encoding) : data
this.emit("data", data)
this.pos += bytesRead
if (this.end && this.pos > this.end) {
return this.emit("end")
}
if (this.flowing) this.read()
} else {
this.emit("end")
}
})
}
open() {
fs.open(this.path, this.flags, this.mode, (err, fd) => {
if (err) return this.emit("error", err)
this.fd = fd
this.emit("open", fd)
})
}
end() {
if (this.autoClose) {
this.destroy()
}
}
destroy() {
fs.close(this.fd, () => {
this.emit("close")
})
}
}
module.exports = WriteStream
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# 4.2 可写流的简单实现
let fs = require("fs")
let FileWriteStream = require("./FileWriteStream")
let ws = FileWriteStream("./2.txt", {
flags: "w",
encoding: "utf8",
highWaterMark: 3,
})
let i = 10
function write() {
let flag = true
while (i && flag) {
flag = ws.write(
"1",
"utf8",
(function(i) {
return function() {
console.log(i)
}
})(i)
)
i--
console.log(flag)
}
}
write()
ws.on("drain", () => {
console.log("drain")
write()
})
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
let fs = require("fs")
let EventEmitter = require("events")
class WriteStream extends EventEmitter {
constructor(path, options) {
super(path, options)
this.path = path
this.fd = options.fd
this.flags = options.flags || "w"
this.mode = options.mode || 0o666
this.encoding = options.encoding
this.start = options.start || 0
this.pos = this.start
this.writing = false
this.autoClose = true
this.highWaterMark = options.highWaterMark || 16 * 1024
this.buffers = []
this.length = 0
this.open()
}
open() {
fs.open(this.path, this.flags, this.mode, (err, fd) => {
if (err) return this.emit("error", err)
this.fd = fd
this.emit("open", fd)
})
}
write(chunk, encoding, cb) {
if (typeof encoding == "function") {
cb = encoding
encoding = null
}
chunk = Buffer.isBuffer(chunk)
? chunk
: Buffer.from(chunk, this.encoding || "utf8")
let len = chunk.length
this.length += len
let ret = this.length < this.highWaterMark
if (this.writing) {
this.buffers.push({
chunk,
encoding,
cb,
})
} else {
this.writing = true
this._write(chunk, encoding, this.clearBuffer.bind(this))
}
return ret
}
_write(chunk, encoding, cb) {
if (typeof this.fd != "number") {
return this.once("open", () => this._write(chunk, encoding, cb))
}
fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, written) => {
if (err) {
if (this.autoClose) {
this.destroy()
}
return this.emit("error", err)
}
this.length -= written
this.pos += written
cb && cb()
})
}
clearBuffer() {
let data = this.buffers.shift()
if (data) {
this._write(data.chunk, data.encoding, this.clearBuffer.bind(this))
} else {
this.writing = false
this.emit("drain")
}
}
end() {
if (this.autoClose) {
this.emit("end")
this.destroy()
}
}
destroy() {
fs.close(this.fd, () => {
this.emit("close")
})
}
}
module.exports = WriteStream
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# 4.3 pipe
let fs = require("fs")
let ReadStream = require("./ReadStream")
let rs = ReadStream("./1.txt", {
flags: "r",
encoding: "utf8",
highWaterMark: 3,
})
let FileWriteStream = require("./WriteStream")
let ws = FileWriteStream("./2.txt", {
flags: "w",
encoding: "utf8",
highWaterMark: 3,
})
rs.pipe(ws)
2
3
4
5
6
7
8
9
10
11
12
13
14
ReadStream.prototype.pipe = function(dest) {
this.on("data", (data) => {
let flag = dest.write(data)
if (!flag) {
this.pause()
}
})
dest.on("drain", () => {
this.resume()
})
this.on("end", () => {
dest.end()
})
}
ReadStream.prototype.pause = function() {
this.flowing = false
}
ReadStream.prototype.resume = function() {
this.flowing = true
this.read()
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 4.4 暂停模式
let fs = require("fs")
let ReadStream2 = require("./ReadStream2")
let rs = new ReadStream2("./1.txt", {
start: 3,
end: 8,
encoding: "utf8",
highWaterMark: 3,
})
rs.on("readable", function() {
console.log("readable")
console.log("rs.buffer.length", rs.length)
let d = rs.read(1)
console.log(d)
console.log("rs.buffer.length", rs.length)
setTimeout(() => {
console.log("rs.buffer.length", rs.length)
}, 500)
})
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18