Skip to content

Instantly share code, notes, and snippets.

@fmoraesmeli
Created October 31, 2018 16:04
Show Gist options
  • Save fmoraesmeli/9d39f9696b3920ac7c822499cef7cbd4 to your computer and use it in GitHub Desktop.
Save fmoraesmeli/9d39f9696b3920ac7c822499cef7cbd4 to your computer and use it in GitHub Desktop.
import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
@Slf4j
public final class Zipper {
private static final Integer DEFAULT_PACKET_MINIMUM_SIZE = 24 * 1024 * 8;
private final DefaultDataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
private final WrappedDataBufferOutputStream wrappedDataBufferOutputStream =
new WrappedDataBufferOutputStream(dataBufferFactory.allocateBuffer());
private final ZipOutputStream zos = new ZipOutputStream(wrappedDataBufferOutputStream);
private final Integer packetMinimumSize;
@Getter
private final Flux<DataBuffer> zipStream;
public Zipper(final Integer packetMinimumSize, final Publisher<Entry> fileStream) {
this.packetMinimumSize = packetMinimumSize;
this.zipStream = Flux.from(fileStream)
.switchIfEmpty(Flux.error(new EmptySourceException()))
.handle(this::processEntry)
.concatWith(lastOutputMono());
}
public Zipper(final Publisher<Entry> fileStream) {
this(DEFAULT_PACKET_MINIMUM_SIZE, fileStream);
}
private void processEntry(final Entry entry, final SynchronousSink<DataBuffer> sink) {
writeEntryBytes(entry);
if(wrappedDataBufferOutputStream.getCurrentDataBuffer().readableByteCount() >= packetMinimumSize) {
sink.next(getOutput());
}
}
private DataBuffer getOutput() {
final DataBuffer dataBuffer = wrappedDataBufferOutputStream.getCurrentDataBuffer();
wrappedDataBufferOutputStream.newDataBuffer(dataBufferFactory.allocateBuffer());
return dataBuffer;
}
private void writeEntryBytes(final Entry entry) {
final ZipEntry zipEntry = new ZipEntry(entry.name);
uncheckedRun(() -> {
zos.putNextEntry(zipEntry);
zos.write(entry.content);
zos.closeEntry();
});
log.debug("written entry: {}", entry.name);
}
private Mono<DataBuffer> lastOutputMono() {
return Mono.fromSupplier(() -> {
uncheckedRun(zos::close);
return wrappedDataBufferOutputStream.getCurrentDataBuffer();
});
}
private void uncheckedRun(final CheckedIORunnable fn) {
try {
fn.run();
} catch (IOException io) {
throw new ZipIOException(io);
}
}
@FunctionalInterface
private interface CheckedIORunnable {
void run() throws IOException;
}
@EqualsAndHashCode(callSuper = true)
private static final class WrappedDataBufferOutputStream extends OutputStream {
@Getter
private DataBuffer currentDataBuffer;
private OutputStream dataBufferOutputStream;
private WrappedDataBufferOutputStream(final DataBuffer dataBuffer) {
newDataBuffer(dataBuffer);
}
@Override
public void write(int b) throws IOException {
dataBufferOutputStream.write(b);
}
private void newDataBuffer(final DataBuffer dataBuffer) {
this.currentDataBuffer = dataBuffer;
this.dataBufferOutputStream = currentDataBuffer.asOutputStream();
}
}
@Value
public static final class Entry {
private final String name;
private final byte[] content;
}
@EqualsAndHashCode(callSuper = true)
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public static final class ZipIOException extends RuntimeException {
@Getter
private final IOException inner;
}
public static final class EmptySourceException extends RuntimeException {}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment