Как получить сообщения из топика Kafka?
Написал типичную функцию для получения и обработки сообщений из топика Kafka. В качестве инструментов использую язык Python и модуль Confluent Kafka. При использовании метода poll() одного из consumer'ов возвращаемое значение всегда None. Реализация представлена ниже:
def consume_from_kafka_topic(topic):
conf = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'my_consumer_group',
'auto.offset.reset': 'earliest'
}
consumer = Consumer(conf)
consumer.subscribe([topic]) # Подписываемся на указанный топик
try:
while True:
msg = consumer.poll(1.0) # Получаем сообщение из топика
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
print(f'Ошибка при получении сообщения: {msg.error().str()}')
break
else:
value = msg.value().decode('utf-8') # Декодируем значение сообщения
print(f'Получено сообщение: {value}')
except KeyboardInterrupt:
pass
finally:
consumer.close()
В логах ошибок нет, к брокеру подключение есть, так как отправка сообщений через producer'ов работает. Через kafdrop в топике отображаются 14 сообщений. Где была допущена ошибка и в чем может быть проблема?