Как получить сообщения из топика Kafka?

Рейтинг: 1Ответов: 1Опубликовано: 07.08.2023

Написал типичную функцию для получения и обработки сообщений из топика Kafka. В качестве инструментов использую язык Python и модуль Confluent Kafka. При использовании метода poll() одного из consumer'ов возвращаемое значение всегда None. Реализация представлена ниже:

def consume_from_kafka_topic(topic):
    conf = {
        'bootstrap.servers': 'localhost:9092',  
        'group.id': 'my_consumer_group', 
        'auto.offset.reset': 'earliest'
    }

    consumer = Consumer(conf)
    consumer.subscribe([topic])  # Подписываемся на указанный топик

    try:
        while True:
            msg = consumer.poll(1.0)  # Получаем сообщение из топика
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                else:
                    print(f'Ошибка при получении сообщения: {msg.error().str()}')
                    break
            else:
                value = msg.value().decode('utf-8')  # Декодируем значение сообщения
                print(f'Получено сообщение: {value}')

    except KeyboardInterrupt:
        pass

    finally:
        consumer.close()

В логах ошибок нет, к брокеру подключение есть, так как отправка сообщений через producer'ов работает. Через kafdrop в топике отображаются 14 сообщений. Где была допущена ошибка и в чем может быть проблема?

Ответы

▲ 1

Как оказалось, метод poll() считывает сообщения, отправленные только после момента создания consumer'a. Для считывания всех сообщений из топика пришлось вручную прописать смещение (offset) топика на 0. Добавив следующий код перед бесконечным циклом, мне удалось получить желаемый результат:

# Создаем объект TopicPartition с указанием топика, раздела и смещения
tp = TopicPartition(topic, partition, offset)

# Устанавливаем смещение для указанного топика и раздела
consumer.assign([tp])