First adaptation pass
This commit is contained in:
+20
-6
@@ -1,11 +1,13 @@
|
||||
const { EventEmitter } = require('events')
|
||||
const { EventEmitter } = require('events-universal')
|
||||
const STREAM_DESTROYED = new Error('Stream was destroyed')
|
||||
const PREMATURE_CLOSE = new Error('Premature close')
|
||||
|
||||
const queueTick = require('queue-tick')
|
||||
const FIFO = require('fast-fifo')
|
||||
const TextDecoder = require('text-decoder')
|
||||
|
||||
// if we do a future major, expect queue microtask to be there always, for now a bit defensive
|
||||
const qmt = typeof queueMicrotask === 'undefined' ? fn => global.process.nextTick(fn) : queueMicrotask
|
||||
|
||||
/* eslint-disable no-multi-spaces */
|
||||
|
||||
// 29 bits used total (4 from shared, 14 from read, and 11 from write)
|
||||
@@ -98,6 +100,7 @@ const READ_READABLE_STATUS = OPEN_STATUS | READ_EMIT_READABLE | READ_QUEUED | RE
|
||||
const SHOULD_NOT_READ = OPEN_STATUS | READ_ACTIVE | READ_ENDING | READ_DONE | READ_NEEDS_PUSH | READ_READ_AHEAD
|
||||
const READ_BACKPRESSURE_STATUS = DESTROY_STATUS | READ_ENDING | READ_DONE
|
||||
const READ_UPDATE_SYNC_STATUS = READ_UPDATING | OPEN_STATUS | READ_NEXT_TICK | READ_PRIMARY
|
||||
const READ_NEXT_TICK_OR_OPENING = READ_NEXT_TICK | OPENING
|
||||
|
||||
// Combined write state
|
||||
const WRITE_PRIMARY_STATUS = OPEN_STATUS | WRITE_FINISHING | WRITE_DONE
|
||||
@@ -232,7 +235,7 @@ class WritableState {
|
||||
updateNextTick () {
|
||||
if ((this.stream._duplexState & WRITE_NEXT_TICK) !== 0) return
|
||||
this.stream._duplexState |= WRITE_NEXT_TICK
|
||||
if ((this.stream._duplexState & WRITE_UPDATING) === 0) queueTick(this.afterUpdateNextTick)
|
||||
if ((this.stream._duplexState & WRITE_UPDATING) === 0) qmt(this.afterUpdateNextTick)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -417,10 +420,16 @@ class ReadableState {
|
||||
else this.updateNextTick()
|
||||
}
|
||||
|
||||
updateNextTickIfOpen () {
|
||||
if ((this.stream._duplexState & READ_NEXT_TICK_OR_OPENING) !== 0) return
|
||||
this.stream._duplexState |= READ_NEXT_TICK
|
||||
if ((this.stream._duplexState & READ_UPDATING) === 0) qmt(this.afterUpdateNextTick)
|
||||
}
|
||||
|
||||
updateNextTick () {
|
||||
if ((this.stream._duplexState & READ_NEXT_TICK) !== 0) return
|
||||
this.stream._duplexState |= READ_NEXT_TICK
|
||||
if ((this.stream._duplexState & READ_UPDATING) === 0) queueTick(this.afterUpdateNextTick)
|
||||
if ((this.stream._duplexState & READ_UPDATING) === 0) qmt(this.afterUpdateNextTick)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -728,12 +737,12 @@ class Readable extends Stream {
|
||||
}
|
||||
|
||||
push (data) {
|
||||
this._readableState.updateNextTick()
|
||||
this._readableState.updateNextTickIfOpen()
|
||||
return this._readableState.push(data)
|
||||
}
|
||||
|
||||
unshift (data) {
|
||||
this._readableState.updateNextTick()
|
||||
this._readableState.updateNextTickIfOpen()
|
||||
return this._readableState.unshift(data)
|
||||
}
|
||||
|
||||
@@ -1134,6 +1143,10 @@ function isReadStreamx (stream) {
|
||||
return isStreamx(stream) && stream.readable
|
||||
}
|
||||
|
||||
function isDisturbed (stream) {
|
||||
return (stream._duplexState & OPENING) !== OPENING || (stream._duplexState & ACTIVE_OR_TICKING) !== 0
|
||||
}
|
||||
|
||||
function isTypedArray (data) {
|
||||
return typeof data === 'object' && data !== null && typeof data.byteLength === 'number'
|
||||
}
|
||||
@@ -1159,6 +1172,7 @@ module.exports = {
|
||||
isStreamx,
|
||||
isEnded,
|
||||
isFinished,
|
||||
isDisturbed,
|
||||
getStreamError,
|
||||
Stream,
|
||||
Writable,
|
||||
|
||||
Reference in New Issue
Block a user