Достать сообщение из kafka по timestamp

Рейтинг: -2Ответов: 1Опубликовано: 17.08.2023

Пытаюсь вычитать сообщение по timestamp. На методе .seek вылетает исключение "No current assignment for partition...". consumer подписывается на топик методом .subscribe() в consumerFactory. long offset находит хорошо. В чем может быть проблема?

KafkaConsumer<String, String> consumer = consumerFactory.getConsumer(topicName);
Map<TopicPartition, Long> timestamps = new HashMap<>();
TopicPartition topicPartition = new TopicPartition(topicName, 0);
timestamps.put(new TopicPartition(topicName, 0), targetTimestamp);
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
//offsets такой {chat_68fa03b2-39cb-4fce-ad27-4e1a9276b453-0=(timestamp=1692192922209, leaderEpoch=0, offset=128)}
OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition);
if (offsetAndTimestamp != null) {
    long offset = offsetAndTimestamp.offset();
    consumer.seek(topicPartition, offset); //тут java.lang.IllegalStateException: No current assignment for partition chat_68fa03b2-39cb-4fce-ad27-4e1a9276b453-0
} else {

}

getConsumer выглядит так:

    public KafkaConsumer getConsumer(String topicName) {
        Properties props = new Properties();
        props.put("bootstrap.servers", kafkaBrokerUrl);
        props.put("group.id", topicName);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topicName));
        return consumer;
    }

Ответы

▲ 1

Ваш код может даже будет работать случайно, если subscribe выделит Вашему консюмеру партишн 0. Просто Вы пытаетесь все время работать только с 0-вым партишном изза TopicPartition(topicName, 0), а надо спросить у консюмера какие партишны к нему привязаны:

Set<TopicPartition> topicPartitions = consumer.assignment()

Ахтунг: может вернуть пустое множество, см javadoc:

[...] If topic subscription was used, then this will give the set of topic partitions currently assigned to the consumer (which may be none if the assignment hasn't happened yet, or the partitions are in the process of getting reassigned).