Skip to content

Instantly share code, notes, and snippets.

@johnsonjo4531
Last active June 2, 2022 20:04
Show Gist options
  • Save johnsonjo4531/932f50dde3ec1d719cb89dd6579ef84b to your computer and use it in GitHub Desktop.
Save johnsonjo4531/932f50dde3ec1d719cb89dd6579ef84b to your computer and use it in GitHub Desktop.
Pingpong: a coroutine-like library
import { pingpong } from "./pingpong";
/** Returns a promise that resolves once `ms` milliseconds have elapsed. */
function sleep(ms: number) {
  return new Promise((wake) => {
    setTimeout(wake, ms);
  });
}
// Demo: the coroutine body emits an incrementing counter once per second and
// lets the outside world overwrite that counter by sending it a number.
(async () => {
  const controller = pingpong<number, { done: true } | number>(async function ({
    receive,
    send
  }) {
    let num = 0;
    // Shared stop flag so that both background loops end together.
    let stopped = false;
    const stop = () => {
      stopped = true;
    };

    // Ticker: emit the next counter value every second until stopped.
    void (async () => {
      while (!stopped) {
        await sleep(1000);
        if (stopped) return;
        // A rejected send means the channel can no longer accept values, so
        // treat it as a request to stop rather than leaving it unhandled.
        void send(++num).catch(stop);
      }
    })();

    // Receiver: adopt every number sent from outside and echo it back.
    void (async () => {
      while (!stopped) {
        const value = await receive();
        // `undefined` means nothing more will arrive; any non-number is the
        // `{ done: true }` sentinel. Either way there is nothing left to do.
        if (value === undefined || typeof value !== "number") {
          stop();
          return;
        }
        num = value;
        void send(num).catch(stop);
      }
    })();
  });

  console.clear();
  for await (const item of controller) {
    // `break` (not `return`) so the summary logging below still runs.
    if (!item) break;
    if (item === 10) {
      // Jump the counter ahead; the coroutine echoes the new value back.
      void controller.send(100);
    } else if (item > 110) {
      break;
    }
    console.log(item);
  }
  console.log("done");
  console.log(await controller.result);
})();
/**
 * Signature of the coroutine body that is handed to `pingpong`.
 *
 * The body is called with a single object of channel controls and returns a
 * promise that settles when the body itself has finished.
 *
 * `InternalSend` is the type of the values the body emits through `send`;
 * `InternalReceive` is the type of the values it obtains through `receive`.
 */
type PingPongGenType<InternalSend, InternalReceive> = (controllers: {
// Resolves with the next incoming value. The type also allows `undefined`;
// presumably that signals that no further value is available — confirm
// against the `pingpong` implementation.
receive: () => Promise<InternalReceive | undefined>;
// Returns an async generator typed to yield incoming values (or `undefined`).
receiveAll: () => AsyncGenerator<
Awaited<InternalReceive> | undefined,
void,
any
>;
// Completion hook; resolves with no value.
return: () => Promise<void>;
// Failure hook; typed `Promise<never>`, i.e. it is never expected to resolve
// successfully.
throw: (err: any) => Promise<never>;
// Emits one value of type `InternalSend`; resolves with no value.
send: (output: InternalSend) => Promise<void>;
}) => Promise<void>;
/**
 * Yields every chunk read from `stream` until the reader reports `done`.
 *
 * Exit hooks:
 * - `onExit.throw(err)` is awaited when reading fails (or when the consumer
 *   throws into the generator). It is expected to throw or reject; whatever
 *   it raises propagates to the consumer.
 * - `onExit.return()` is always awaited last, whether the stream ended, the
 *   consumer stopped early, or an error occurred.
 *
 * The hooks are awaited rather than `return`ed: a `return` inside `finally`
 * replaces any in-flight exception, which previously caused read errors to
 * be silently discarded.
 *
 * @param stream - Reader to pull chunks from. It is neither released nor
 *   cancelled here.
 * @param onExit - Hooks invoked on failure (`throw`) and on every exit
 *   (`return`).
 */
async function* consumeStream<T>(
  stream: ReadableStreamDefaultReader<T>,
  onExit: {
    throw: (err?: any) => Promise<never> | never;
    return: () => Promise<void> | void;
  }
) {
  try {
    while (true) {
      const next = await stream.read();
      if (next.done) break;
      yield next.value;
    }
  } catch (err) {
    await onExit.throw(err);
  } finally {
    await onExit.return();
  }
}
/**
 * Creates a two-way channel between a coroutine body (`generator`) and the
 * caller.
 *
 * Two identity `TransformStream`s carry the traffic, one per direction:
 * values the body passes to its `send` come out of the returned controller's
 * `receive` / `receiveAll` / `for await`, and values passed to the
 * controller's `send` come out of the body's `receive` / `receiveAll`.
 *
 * Shutdown (`return()`, `throw()`, or leaving either `receiveAll` loop)
 * closes both directions and discards anything that was sent but not yet
 * received. Once shut down:
 * - `receive()` on either side resolves with `undefined`;
 * - `send()` on either side rejects, as the underlying writer is closed.
 *
 * @param generator - Coroutine body; it is invoked immediately with its
 *   channel controls.
 * @returns A controller object that is also async-iterable. Its `result`
 *   property is the promise returned by `generator`.
 */
export function pingpong<InternalSend, InternalReceive>(
  generator: PingPongGenType<InternalSend, InternalReceive>
) {
  type ExternalSend = InternalReceive;
  type ExternalReceive = InternalSend;

  // body -> caller
  const internalStream = new TransformStream<InternalSend, InternalSend>();
  // caller -> body
  const externalStream = new TransformStream<ExternalSend, ExternalSend>();

  const externalStreamReader = externalStream.readable.getReader();
  const externalStreamWriter = externalStream.writable.getWriter();
  const internalStreamReader = internalStream.readable.getReader();
  const internalStreamWriter = internalStream.writable.getWriter();

  // Memoised shutdown so that concurrent or repeated calls share one run.
  let shutdown: Promise<void> | undefined;

  /** Reads and drops every remaining chunk until the reader reports `done`. */
  async function discardRemaining<T>(
    reader: ReadableStreamDefaultReader<T>
  ): Promise<void> {
    for await (const _ of consumeStream(reader, {
      async throw(err) {
        throw err;
      },
      async return() {}
    })) {
      // Chunks are intentionally discarded.
    }
  }

  /** Closes both writable sides and drains both readable sides. */
  async function closeAndDrain(): Promise<void> {
    // Closing the writers is what lets the drains finish: without it the
    // readers never report `done` and shutdown would wait forever. The
    // drains in turn consume queued writes so that the closes can complete.
    // `allSettled` keeps shutdown best-effort if a stream is already errored.
    await Promise.allSettled([
      internalStreamWriter.close(),
      externalStreamWriter.close(),
      discardRemaining(internalStreamReader),
      discardRemaining(externalStreamReader)
    ]);
    // The reader locks are deliberately kept: the streams are private to
    // this closure, and releasing them would turn later `receive()` calls
    // into errors instead of a clean `undefined`.
  }

  function cleanup(): Promise<void> {
    if (shutdown === undefined) {
      shutdown = closeAndDrain();
    }
    return shutdown;
  }

  /**
   * Yields each value from `reader` until it is done, then shuts the channel
   * down. A read error still propagates after the shutdown has run.
   */
  async function* receiveFrom<T>(reader: ReadableStreamDefaultReader<T>) {
    try {
      while (true) {
        const next = await reader.read();
        if (next.done) return;
        yield next.value;
      }
    } finally {
      await cleanup();
    }
  }

  const result = generator({
    async receive() {
      return (await externalStreamReader.read()).value;
    },
    receiveAll: () => receiveFrom(externalStreamReader),
    async return() {
      await cleanup();
    },
    async throw(err: any) {
      await cleanup();
      throw err;
    },
    async send(output) {
      return internalStreamWriter.write(output);
    }
  });

  return {
    /** Next value sent by the body, or `undefined` once the channel is shut down. */
    async receive(): Promise<ExternalReceive | undefined> {
      return (await internalStreamReader.read()).value;
    },
    /** Sends a value to the body. */
    send(output: ExternalSend) {
      return externalStreamWriter.write(output);
    },
    /** Iterates over values sent by the body; shuts the channel down on exit. */
    receiveAll() {
      return receiveFrom(internalStreamReader);
    },
    /** Shuts the channel down. */
    async return() {
      await cleanup();
    },
    /** Shuts the channel down, then rethrows `err`. */
    async throw(err: any) {
      await cleanup();
      throw err;
    },
    [Symbol.asyncIterator]() {
      return this.receiveAll();
    },
    result
  } as const;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment