Skip to content

Instantly share code, notes, and snippets.

@nybblr
Last active July 13, 2022 03:40
Show Gist options
  • Save nybblr/3af62797052c42f7090b4f8614b5e157 to your computer and use it in GitHub Desktop.
Save nybblr/3af62797052c42f7090b4f8614b5e157 to your computer and use it in GitHub Desktop.
3 examples of using Async Generators and Async Iteration in JavaScript!
// Build a Promise that is fulfilled once `ms` milliseconds have elapsed
var timer = function(ms) {
  return new Promise(function(done) {
    setTimeout(done, ms);
  });
};
// Endless async counter: emits 0, 1, 2, ... and waits a random
// delay (under one second) before each emission
var source = async function*() {
  for (let counter = 0; ; counter++) {
    const pause = Math.random() * 1000;
    await timer(pause);
    yield counter;
  }
};
// Lazily run `transform` over every value produced by `stream`,
// yielding the results in the order the inputs arrive
var map = async function*(stream, transform) {
  for await (const item of stream) {
    const mapped = transform(item);
    yield mapped;
  }
};
// Wire the pieces together: log the square of every number
// that source() emits, as soon as it arrives
var run = async function() {
  const squares = map(source(), n => n * n);
  for await (const square of squares) {
    console.log(square);
  }
};
run();
// => 0
// => 1
// => 4
// => 9
// ...
// Wrap the next occurrence of `event` on `emitter` in a Promise.
// The listener detaches itself before the Promise is fulfilled
// with the event object, so it fires at most once.
var oncePromise = (emitter, event) =>
  new Promise(resolve => {
    const listener = payload => {
      emitter.removeEventListener(event, listener);
      resolve(payload);
    };
    emitter.addEventListener(event, listener);
  });
// Add an async iterator to all WebSockets
WebSocket.prototype[Symbol.asyncIterator] = async function*() {
while(this.readyState !== 3) {
yield (await oncePromise(this, 'message')).data;
}
};
// Open a socket to the local server and log each message it delivers
var run = async () => {
  const socket = new WebSocket('ws://localhost:3000/');
  for await (const payload of socket) {
    console.log(payload);
  }
};
run();
// => "hello"
// => "sandwich"
// => "otters"
// ...
// Build the click pipeline stage by stage:
// body clicks -> clicks on links only -> drop repeats of the same target
// -> tag each with a running id -> throttle to one per 500ms -> log
var run = async () => {
  let nextId = 0;
  const body = document.querySelector('body');
  const allClicks = streamify('click', body);
  const linkClicks = filter(allClicks, e => e.target.matches('a'));
  const changedTargets = distinct(linkClicks, e => e.target);
  const numbered = map(changedTargets, e => [nextId++, e]);
  const paced = throttle(numbered, 500);
  subscribe(paced, pair => {
    const [id, click] = pair;
    console.log(id);
    console.log(click);
    click.preventDefault();
  });
};
// Expose the `event` occurrences on `element` as an endless async stream.
// Each step waits for the next occurrence and yields its event object.
var streamify = async function*(event, element) {
  for (;;) {
    const occurrence = await oncePromise(element, event);
    yield occurrence;
  }
};
// Return a Promise for the next `event` fired on `emitter`.
// The handler unregisters itself first, then fulfills the
// Promise with the event object it received.
var oncePromise = (emitter, event) =>
  new Promise(fulfill => {
    const onFire = received => {
      emitter.removeEventListener(event, onFire);
      fulfill(received);
    };
    emitter.addEventListener(event, onFire);
  });
// Let through only those events for which `test` returns a truthy value
var filter = async function*(stream, test) {
  for await (const candidate of stream) {
    if (!test(candidate)) {
      continue;
    }
    yield candidate;
  }
};
// Replace each event of the stream with the result of `transform(event)`
var map = async function*(stream, transform) {
  for await (const incoming of stream) {
    const outgoing = transform(incoming);
    yield outgoing;
  }
};
// Only pass along an event if more than `delay` milliseconds have elapsed
// since the previous event that was passed along. The first event always
// passes; events that arrive too soon are dropped, not postponed.
//
// stream - async (or sync) iterable of events
// delay  - minimum gap in milliseconds between two passed events
var throttle = async function*(stream, delay) {
  // Stays undefined until the first event has been passed. Compared against
  // undefined rather than by truthiness so that a timestamp of 0 (a clock
  // starting at the epoch, as fake timers often do) counts as a real time.
  let lastTime;
  for await (const event of stream) {
    const thisTime = Date.now();
    if (lastTime === undefined || thisTime - lastTime > delay) {
      lastTime = thisTime;
      yield event;
    }
  }
};
// Default extractor: compare events by their own identity
var identity = e => e;
// Only pass along events that differ from the last one.
//
// stream  - async (or sync) iterable of events
// extract - maps an event to the value used for the comparison (strict
//           inequality); defaults to the event itself
var distinct = async function*(stream, extract = identity) {
  // Track explicitly whether anything has been seen yet, so that a first
  // event whose extracted value is `undefined` is not mistaken for a repeat.
  let hasLast = false;
  let lastVal;
  for await (const event of stream) {
    const thisVal = extract(event);
    if (!hasLast || thisVal !== lastVal) {
      hasLast = true;
      lastVal = thisVal;
      yield event;
    }
  }
};
// Drain the stream, handing every event to `callback` as it arrives.
// The returned Promise settles only when the stream itself ends.
var subscribe = async (stream, callback) => {
  for await (const delivered of stream) {
    callback(delivered);
  }
};
// Start the click pipeline. This call has to stay below the helper
// definitions: they are function expressions assigned to `var`s, so they
// are still undefined until their assignments have executed.
run();
@tracker1
Copy link

tracker1 commented Apr 2, 2017

Okay... I'm pretty sure this debounce works as expected: it lets the initial event through, and then passes the last event once interval ms have passed. I find that for events such as resize, scroll, and drag/zoom, a debounce like the one below is the most appropriate filter.

https://gist.github.com/tracker1/fb103312c276a585bdfa4565427692cb

// Consume an async stream, invoking `callback` once per event.
// Resolves (with no value) after the stream has been exhausted.
const destreamify = async (stream, callback) => {
  for await (const received of stream) {
    callback(received);
  }
};

// Debounce a stream: the first event is passed along immediately; after that
// the most recent event received is passed along each time `interval` ms
// elapse, and a quiet interval re-arms the "pass immediately" behaviour.
//
// NOTE: this is a synchronous generator (`function*`) that yields promises.
// The same pending promise is yielded on every `next()` call until it has
// been resolved, so a consumer must await each yielded value before asking
// for the next one (e.g. by iterating with `for await`).
// NOTE: the generator never completes, even if `stream` ends, and an event
// whose value is `undefined` is treated as "no event".
//
// stream   - async iterable of events; consumed in the background via destreamify
// interval - delay in milliseconds between passed events
const debounce = function*(stream, interval) {
  let first;     // is this first event?  will pass
  let lastEvent; // the last event raised
  let deferred;  // deferred promise instance
  let resolve;   // resolve method for deferred promise

  // reset internal state - create new deferred/resolve
  const reset = (isFirst) => {
    first = isFirst;
    lastEvent = undefined;
    deferred = new Promise(r => resolve = r);
  };

  // handle event resolution
  // (called directly for a "first" event, and by the timer after each interval)
  const passEvent = () => {
    // if no event to pass
    // (nothing arrived during the last interval, so the next event passes immediately)
    if (lastEvent === undefined) {
      first = true; // reset first state
      return;
    }

    // capture both values before reset() overwrites them
    const event = lastEvent; // handle to event to pass
    const res = resolve; // handle to resolve for current deferred
    reset(false); // reset and create next deferred
    setTimeout(passEvent, interval); // debounce timer
    res(event); // resolve current deferred
  };

  reset(true); // set initial state & deferred
  // start consuming the source stream; the returned promise is not awaited
  destreamify(stream, (event) => {
    lastEvent = event; // reference event
    if (first) passEvent(); // if first run, pass it through
  });

  // yield deferred results
  // (`deferred` is re-read on every iteration, so each next() sees the current promise)
  while (true) {
    yield deferred;
  }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment