Как передать в топик Kafka финальный groupBy()

Рейтинг: 1Ответов: 0Опубликовано: 10.03.2023

Прошу Вас помочь разобраться с KafkaStreams.
Есть код, который делает обычный groupBy():

public static void createGroupByStream(final StreamsBuilder builder) {   
    KStream<String, Person> stream = builder.stream("kafka-stream-input", Consumed.with(Serdes.String(), CustomSerdes.PersonSerde()));  
      stream  
            .groupByKey()  
            .reduce((item, second) -> {  
                item.setCount(item.getCount() + second.getCount());  
                return item;  
            } )  
            .toStream()  
            .to("kafka-stream-output", Produced.with(Serdes.String(), CustomSerdes.PersonSerde()));  
}

Класс Person:

public class Person extends GroupingEntity {  
    private String name;  
    private int count;
    }

Класс CustomSerdes:

public final class CustomSerdes {  
 
    public static Serde<Person> PersonSerde() {  
        JsonSerializer<Person> serializer = new JsonSerializer<>();  
        JsonDeserializer<Person> deserializer = new JsonDeserializer<>(Person.class);  
        return Serdes.serdeFrom(serializer, deserializer);  
    }

На данный момент данные обновляются в потоке, т.е. в топике "kafka-stream-output" хранится вся история обновления поля count для каждого Person.
Мне нужно, чтобы в выходном топике (в примере - это "kafka-stream-output"), информация появилась только тогда, когда все сообщения из топика "kafka-stream-input" были обработаны. Т.е. был только финальный groupBy(). Можете подсказать, как это сделать?

Ответы

Ответов пока нет.