Skip to content

Instantly share code, notes, and snippets.

@hackergrrl
Last active February 1, 2019 23:50
Show Gist options
  • Save hackergrrl/b8da1258c84b0c0988b9c0c5f68f67c6 to your computer and use it in GitHub Desktop.
Save hackergrrl/b8da1258c84b0c0988b9c0c5f68f67c6 to your computer and use it in GitHub Desktop.
var stream = require('stream')
module.exports = function (id) {
// opts.allowHalfDuplex is _very_ important! Otherwise ending the readable
// half of the duplex will leave the writable side open!
var counter = new stream.Duplex({allowHalfOpen:false})
counter.processed = []
var payloadsLeft = 3
counter._read = function () {
if (!payloadsLeft) return this.push(null)
var payload = JSON.stringify({ sender: id, left: payloadsLeft })
var prefix = Buffer.alloc(4)
prefix.writeUInt32LE(payload.length, 0)
this.push(prefix)
this.push(payload)
payloadsLeft--
}
var expected = 0
var accum = Buffer.alloc(0)
counter._write = function (chunk, enc, next) {
accum = Buffer.concat([accum, chunk])
tryParse()
next()
}
function tryParse () {
// haven't recv'd prefix len yet
if (!expected && accum.length < 4) return
// read prefix length
if (!expected) {
expected = accum.readUInt32LE(0)
accum = accum.slice(4)
}
// read next chunk
if (accum.length >= expected) {
var buf = accum.slice(0, expected)
var value = JSON.parse(buf.toString())
counter.processed.push(value)
accum = accum.slice(expected)
expected = 0
tryParse()
}
}
// exposes '.processed' so you can examine the payloads received
return counter
}
var counterDuplex = require('./counter')
// Build two counters and wire them into a loop: everything `a` emits is
// written to `b`, and everything `b` emits is written back to `a`.
var a = counterDuplex('a')
var b = counterDuplex('b')
a.pipe(b)
b.pipe(a)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment