@mitch-seymour
Created December 23, 2020 22:57
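import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

// Topic-level overrides applied to the state store's changelog topic
Map<String, String> topicConfigs = new HashMap<>();
topicConfigs.put("segment.bytes", "536870912"); // 512 MiB log segments
topicConfigs.put("min.cleanable.dirty.ratio", "0.3"); // compact sooner than the 0.5 default

StreamsBuilder builder = new StreamsBuilder();
// No Consumed serdes are passed, so the configured default serdes must handle byte[] keys and String values
KStream<byte[], String> stream = builder.stream("patient-events");

// Count records per key; the changelog topic backing "counts" is created with topicConfigs
KTable<byte[], Long> counts =
    stream
        .groupByKey()
        .count(
            Materialized.<byte[], Long, KeyValueStore<Bytes, byte[]>>as("counts")
                .withKeySerde(Serdes.ByteArray())
                .withValueSerde(Serdes.Long())
                .withLoggingEnabled(topicConfigs));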