Map<String, String> topicConfigs = new HashMap<>();
topicConfigs.put("segment.bytes", "536870912");
topicConfigs.put("min.cleanable.dirty.ratio", "0.3");
StreamsBuilder builder = new StreamsBuilder();
KStream<byte[], String> stream = builder.stream("patient-events");
KTable<byte[], Long> counts =
stream
.groupByKey()
.count(
Materialized.<byte[], Long, KeyValueStore<Bytes, byte[]>>as("counts")
.withKeySerde(Serdes.ByteArray())
.withValueSerde(Serdes.Long())
.withLoggingEnabled(topicConfigs));
Created
December 23, 2020 22:57
-
-
Save mitch-seymour/4faae20f3a2a7f48b5226fa80f1008d3 to your computer and use it in GitHub Desktop.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment