Skip to content

Instantly share code, notes, and snippets.

@jkschneider
Last active June 4, 2022 22:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jkschneider/f2f3c1cc7aa6b69c2162b9fbcbcab286 to your computer and use it in GitHub Desktop.
Save jkschneider/f2f3c1cc7aa6b69c2162b9fbcbcab286 to your computer and use it in GitHub Desktop.
Turns a `Flux<DataBuffer>` into an `InputStream`
class DataBuffersInputStream extends InputStream {
volatile DataBuffer currentBuf;
volatile boolean complete;
BaseSubscriber<DataBuffer> subscriber = new BaseSubscriber<>() {
@Override
protected void hookOnNext(DataBuffer buf) {
releaseCurrentBuf();
currentBuf = buf;
((NettyDataBuffer) buf).retain();
synchronized (DataBuffersInputStream.this) {
DataBuffersInputStream.this.notify();
}
}
@Override
protected void hookFinally(SignalType type) {
releaseCurrentBuf();
complete = true;
synchronized (DataBuffersInputStream.this) {
DataBuffersInputStream.this.notify();
}
}
};
DataBuffersInputStream(Flux<DataBuffer> dataBuffers) {
dataBuffers.subscribe(subscriber);
}
@Override
public int read() throws IOException {
while (!complete) {
if (currentBuf == null || currentBuf.readableByteCount() == 0) {
try {
synchronized (this) {
subscriber.request(1);
wait();
}
} catch (InterruptedException e) {
throw new IOException(e);
}
} else {
return currentBuf.read();
}
}
return -1;
}
@Override
public void close() {
releaseCurrentBuf();
subscriber.cancel();
}
private void releaseCurrentBuf() {
if (currentBuf != null) {
DataBufferUtils.release(currentBuf);
currentBuf = null;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment