Достать сообщение из kafka по timestamp
Пытаюсь вычитать сообщение по timestamp. На методе .seek вылетает исключение "No current assignment for partition...". consumer подписывается на топик методом .subscribe() в consumerFactory. long offset находит хорошо. В чем может быть проблема?
KafkaConsumer<String, String> consumer = consumerFactory.getConsumer(topicName);
Map<TopicPartition, Long> timestamps = new HashMap<>();
TopicPartition topicPartition = new TopicPartition(topicName, 0);
timestamps.put(new TopicPartition(topicName, 0), targetTimestamp);
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
//offsets такой {chat_68fa03b2-39cb-4fce-ad27-4e1a9276b453-0=(timestamp=1692192922209, leaderEpoch=0, offset=128)}
OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition);
if (offsetAndTimestamp != null) {
long offset = offsetAndTimestamp.offset();
consumer.seek(topicPartition, offset); //тут java.lang.IllegalStateException: No current assignment for partition chat_68fa03b2-39cb-4fce-ad27-4e1a9276b453-0
} else {
}
getConsumer выглядит так:
public KafkaConsumer getConsumer(String topicName) {
Properties props = new Properties();
props.put("bootstrap.servers", kafkaBrokerUrl);
props.put("group.id", topicName);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topicName));
return consumer;
}