Skip to content

Instantly share code, notes, and snippets.

@constantology
Last active October 7, 2020 09:59
Show Gist options
  • Save constantology/0ec06de52ecd6e7d815dbce17d13a5a1 to your computer and use it in GitHub Desktop.
Save constantology/0ec06de52ecd6e7d815dbce17d13a5a1 to your computer and use it in GitHub Desktop.
Easy-peasy-lemon-squeezy iterating non-standard stream implementations...
// For non-standard streams — APIs that stream but the library exposes something else and not the underlying Node.js stream, e.g `xml-stream` and `xml-stream-saxjs`, — you can use the following to perform standard `for await of...` iteration over each item emitted in events.
const sleep = require('./sleep');
module.exports = async function* iterateNonStandardStream({
endEvent = 'end',
errorEvent = 'error',
itemEvent,
stream,
timeout = 900000
}) {
const queue = [];
let done = false;
let error;
const tid = setTimeout(() => {
error = new Error('TimeoutError: Iterating Non-Standard Stream.');
done = true;
}, timeout);
stream.on(itemEvent, (item) => queue.push(item));
stream.on(endEvent, () => {
done = true;
clearTimeout(tid);
});
stream.on(errorEvent, (e) => {
error = e;
done = true;
});
while (true) {
if (queue.length && !error) {
const item = queue.shift();
yield item;
}
else {
if (error) {
throw error;
}
if (done === true) {
break;
}
}
await sleep();
}
};
const SomeTypeOfNonStandardStream = require('./some-type-of-non-standard-stream');
const iterateNonStandardStream = require('./iterate-non-standard-stream');
(async () => {
const nonIterableStream = new SomeTypeOfNonStandardStream({...});
const stream = iterateNonStandardStream({
itemEvent: 'item',
stream: nonIterableStream
});
for await (const rawItem of stream) {
// Do something with each: `rawItem`
}
}());
module.exports = function sleep(ms = 0) {
return new Promise((res) => (ms > 0 ? setTimeout(res, ms) : setImmediate(res)));
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment