193 lines
		
	
	
		
			4.3 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
			
		
		
	
	
			193 lines
		
	
	
		
			4.3 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
| const { Duplex, pipeline } = require('streamx')
 | |
| 
 | |
| module.exports = class Composer extends Duplex {
 | |
|   constructor (opts) {
 | |
|     super(opts)
 | |
| 
 | |
|     this._writable = null
 | |
|     this._readable = null
 | |
|     this._isPipeline = false
 | |
|     this._pipelineMissing = 2
 | |
| 
 | |
|     this._writeCallback = null
 | |
|     this._finalCallback = null
 | |
| 
 | |
|     this._ondata = this._pushData.bind(this)
 | |
|     this._onend = this._pushEnd.bind(this, null)
 | |
|     this._ondrain = this._continueWrite.bind(this, null)
 | |
|     this._onfinish = this._maybeFinal.bind(this)
 | |
|     this._onerror = this.destroy.bind(this)
 | |
|     this._onclose = this.destroy.bind(this, null)
 | |
|   }
 | |
| 
 | |
|   static pipeline (...streams) {
 | |
|     const c = new Composer()
 | |
|     c.setPipeline(...streams)
 | |
|     return c
 | |
|   }
 | |
| 
 | |
|   static duplexer (ws = null, rs = null) {
 | |
|     const c = new Composer()
 | |
|     c.setWritable(ws)
 | |
|     c.setReadable(rs)
 | |
|     return c
 | |
|   }
 | |
| 
 | |
|   setPipeline (first, ...streams) {
 | |
|     const all = Array.isArray(first) ? first : [first, ...streams]
 | |
| 
 | |
|     this._isPipeline = true
 | |
|     this.setWritable(all[0])
 | |
|     this.setReadable(all[all.length - 1])
 | |
| 
 | |
|     pipeline(all, (err) => {
 | |
|       if (err) this.destroy(err)
 | |
|     })
 | |
| 
 | |
|     return this
 | |
|   }
 | |
| 
 | |
|   setReadable (rs) {
 | |
|     if (this._readable) {
 | |
|       this._readable.removeListener('data', this._ondata)
 | |
|       this._readable.removeListener('end', this._onend)
 | |
|       this._readable.removeListener('error', this._onerror)
 | |
|       this._readable.removeListener('close', this._onclose)
 | |
|     }
 | |
| 
 | |
|     if (rs === null) {
 | |
|       this._readable = null
 | |
|       this.push(null)
 | |
|       this.resume()
 | |
|       return
 | |
|     }
 | |
| 
 | |
|     this._readable = rs
 | |
|     this._readable.on('data', this._ondata)
 | |
|     this._readable.on('end', this._onend)
 | |
|     this._readable.on('error', this._onerror)
 | |
|     this._readable.on('close', this._onclose)
 | |
| 
 | |
|     if (this.destroying && this._readable.destroy) {
 | |
|       this._readable.destroy()
 | |
|     }
 | |
| 
 | |
|     return this
 | |
|   }
 | |
| 
 | |
|   setWritable (ws) {
 | |
|     if (this._writable) {
 | |
|       this._writable.removeListener('drain', this._ondrain)
 | |
|       this._writable.removeListener('finish', this._onfinish)
 | |
|       this._writable.removeListener('error', this._onerror)
 | |
|       this._writable.removeListener('close', this._onclose)
 | |
|     }
 | |
| 
 | |
|     if (ws === null) {
 | |
|       this._writable = null
 | |
|       this._continueWrite(null)
 | |
|       this.end()
 | |
|       return
 | |
|     }
 | |
| 
 | |
|     this._writable = ws
 | |
|     this._writable.on('drain', this._ondrain)
 | |
|     this._writable.on('finish', this._onfinish)
 | |
|     this._writable.on('error', this._onerror)
 | |
|     this._writable.on('close', this._onclose)
 | |
| 
 | |
|     if (this.destroying && this._writable.destroy) {
 | |
|       this._writable.destroy()
 | |
|     }
 | |
| 
 | |
|     return this
 | |
|   }
 | |
| 
 | |
|   _read (cb) {
 | |
|     if (this._readable !== null) {
 | |
|       this._readable.resume()
 | |
|     }
 | |
| 
 | |
|     cb(null)
 | |
|   }
 | |
| 
 | |
|   _pushData (data) {
 | |
|     if (this.push(data) === false && this._readable !== null) {
 | |
|       this._readable.pause()
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   _pushEnd () {
 | |
|     if (this._isPipeline) {
 | |
|       this.on('end', this._decrementPipeline.bind(this))
 | |
|     }
 | |
|     this.push(null)
 | |
|     if (this._readable !== null) {
 | |
|       this._readable.removeListener('close', this._onclose)
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   _decrementPipeline () {
 | |
|     if (--this._pipelineMissing === 0) this._continueFinal(null)
 | |
|   }
 | |
| 
 | |
|   _maybeFinal () {
 | |
|     if (this._writable !== null) {
 | |
|       this._writable.removeListener('close', this._onclose)
 | |
|     }
 | |
| 
 | |
|     if (this._isPipeline) this._decrementPipeline()
 | |
|     else this._continueFinal(null)
 | |
|   }
 | |
| 
 | |
|   _continueFinal (err) {
 | |
|     if (this._finalCallback === null) return
 | |
| 
 | |
|     const cb = this._finalCallback
 | |
|     this._finalCallback = null
 | |
|     cb(err)
 | |
|   }
 | |
| 
 | |
|   _continueWrite (err) {
 | |
|     if (this._writeCallback === null) return
 | |
|     const cb = this._writeCallback
 | |
|     this._writeCallback = null
 | |
|     cb(err)
 | |
|   }
 | |
| 
 | |
|   _predestroy () {
 | |
|     if (this._writable !== null && this._writable.destroy) this._writable.destroy()
 | |
|     if (this._readable !== null && this._readable.destroy) this._readable.destroy()
 | |
|     this._continueWrite(new Error('Stream destroyed'))
 | |
|     this._continueFinal(new Error('Stream destroyed'))
 | |
|   }
 | |
| 
 | |
|   _writev (datas, cb) {
 | |
|     if (this._writable === null) {
 | |
|       return cb(null)
 | |
|     }
 | |
| 
 | |
|     let flushed = true
 | |
| 
 | |
|     for (const data of datas) {
 | |
|       flushed = this._writable.write(data)
 | |
|     }
 | |
| 
 | |
|     if (!flushed) {
 | |
|       this._writeCallback = cb
 | |
|       return
 | |
|     }
 | |
| 
 | |
|     cb(null)
 | |
|   }
 | |
| 
 | |
|   _final (cb) {
 | |
|     if (this._writable === null) {
 | |
|       return cb(null)
 | |
|     }
 | |
| 
 | |
|     this._finalCallback = cb
 | |
|     this._writable.end()
 | |
|   }
 | |
| }
 |