Skip to content

Instantly share code, notes, and snippets.

@mxstbr
Forked from ForbesLindesay/iter.js
Last active March 13, 2018 09:14
Show Gist options
  • Save mxstbr/e490c116993d5c54b98e79093d2a05a6 to your computer and use it in GitHub Desktop.
Save mxstbr/e490c116993d5c54b98e79093d2a05a6 to your computer and use it in GitHub Desktop.
A reusable utility to turn a callback-based listener into an async iterable
// @flow
// Turn a callback-based listener into many async iterators without buffering
import { $$asyncIterator } from 'iterall';
// A callback-based subscription function: it receives a single-argument
// callback and is declared to return a Promise. `asyncify` calls it once,
// passing a callback that forwards every emitted value to the iterators;
// the returned Promise is not used by `asyncify`.
type Listener = ((arg: any) => void) => Promise<any>;
// Default `onError` handler: rethrows the error it receives, unchanged.
// The error is deliberately NOT wrapped in `new Error(err)`: that would
// stringify it (yielding a message like "Error: <original message>") and
// discard the original stack trace, error subclass and custom properties.
const defaultOnError = (err: Error) => {
  throw err;
};
// Options accepted by the iterator factory that `asyncify` returns.
// Both fields are optional; the factory may also be called with no argument.
type Options = {|
// Invoked with the error passed to the iterator's `throw()` method.
// Falls back to `defaultOnError` when omitted.
onError?: (err: Error) => void,
// Predicate applied to every emitted value; a value is only delivered to the
// iterator when no filter is set or the filter returns a truthy result.
filter?: (arg: any) => boolean,
|};
// Internal bookkeeping record for one iterator created by the factory.
type Watcher = {
// Optional predicate deciding whether an emitted value is delivered.
filter?: (arg: any) => boolean,
// Receives the iteration result (`{ done, value }`) for a matching value.
// Unset until the iterator starts waiting for a value via `next()`.
callback?: ({ done: boolean, value: any }) => void,
};
const asyncify = (listener: Listener) => {
let watchers: Array<Watcher> = [];
listener(value => {
watchers.forEach(watcher => {
if (watcher.callback && (!watcher.filter || watcher.filter(value))) {
watcher.callback({ done: false, value });
}
});
});
return ({ filter, onError = defaultOnError }: Options = {}) => {
let watcher: Watcher = { filter };
let watching = true;
const cleanup = () => {
if (watching) {
watching = false;
watchers = watchers.filter(w => w !== watcher);
}
};
try {
return {
next: () =>
new Promise(resolve => {
watcher.callback = resolve;
watchers.push(watcher);
}),
return: () => {
cleanup();
return Promise.resolve({ done: true });
},
throw: err => {
cleanup();
onError(err);
return Promise.reject(err);
},
[$$asyncIterator]() {
return this;
},
};
} catch (err) {
onError(err);
}
};
};
export default asyncify;
import asyncify from './asyncify';
// Standard callback-based listener: forwards every message reported by
// `onNewMessageInDb` to `cb`.
// NOTE(review): `onNewMessageInDb` is not defined in this snippet; it is
// presumably the application's database subscription helper.
const listenToNewMessages = (cb) => {
return onNewMessageInDb(message => cb(message));
};
// Subscribe to the listener once and get back a factory for async iterators.
const getMessageListener = asyncify(listenToNewMessages);
// Create an iterator that only receives messages accepted by `filter`.
// NOTE(review): `threadId` is assumed to be in scope at the call site.
const asyncIterator = getMessageListener({
filter: message => message.threadId === threadId,
onError: (err) => { /* handle errors */ },
});
// Each `next()` call waits for a message that passes the filter and resolves
// with `{ done: false, value: message }`.
// NOTE(review): `await` here assumes the snippet runs inside an async function.
await asyncIterator.next();
await asyncIterator.next();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment