53 lines
		
	
	
		
			1.1 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
			
		
		
	
	
			53 lines
		
	
	
		
			1.1 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
| const { Readable } = require('streamx')
 | |
| 
 | |
| module.exports = function (s, forks = 2) {
 | |
|   const streams = new Array(forks)
 | |
|   const status = new Array(forks).fill(true)
 | |
| 
 | |
|   let ended = false
 | |
| 
 | |
|   for (let i = 0; i < forks; i++) {
 | |
|     streams[i] = new Readable({
 | |
|       read (cb) {
 | |
|         const check = !status[i]
 | |
|         status[i] = true
 | |
|         if (check && allReadable()) s.resume()
 | |
|         cb(null)
 | |
|       }
 | |
|     })
 | |
|   }
 | |
| 
 | |
|   s.on('end', function () {
 | |
|     ended = true
 | |
|     for (const stream of streams) stream.push(null)
 | |
|   })
 | |
| 
 | |
|   s.on('error', function (err) {
 | |
|     for (const stream of streams) stream.destroy(err)
 | |
|   })
 | |
| 
 | |
|   s.on('close', function () {
 | |
|     if (ended) return
 | |
|     for (const stream of streams) stream.destroy()
 | |
|   })
 | |
| 
 | |
|   s.on('data', function (data) {
 | |
|     let needsPause = false
 | |
|     for (let i = 0; i < streams.length; i++) {
 | |
|       if (!(status[i] = streams[i].push(data))) {
 | |
|         needsPause = true
 | |
|       }
 | |
|     }
 | |
|     if (needsPause) s.pause()
 | |
|   })
 | |
| 
 | |
|   return streams
 | |
| 
 | |
|   function allReadable () {
 | |
|     for (let j = 0; j < status.length; j++) {
 | |
|       if (!status[j]) return false
 | |
|     }
 | |
|     return true
 | |
|   }
 | |
| }
 |