Как использовать потребителя группы pykafka с gevent?

Я использую группу потребителей pykafka с gevent, но результаты имеют повторяющиеся данные. показать мой код:

import gevent
from pykafka import KafkaClient

topic_name = 'test2'
bootstrap_servers = '192.168.199.228:9094,192.168.199.228:9092,192.168.199.228:9093'
group = 'test_g'


def get_consumer():
    client = KafkaClient(hosts=bootstrap_servers, use_greenlets=True)
    topic = client.topics[topic_name.encode()]

    consumer = topic.get_simple_consumer(auto_commit_interval_ms=10000,
                                     consumer_group=group.encode(),
                                     auto_commit_enable=True,
                                     )
    return consumer


def worker(worker_id):
    consumer = get_consumer()
    for msg in consumer:
        print('worker {} partition: {}, offset: {}'.format(worker_id, msg.partition, msg.offset))


if __name__ == '__main__':
    tasks = [gevent.spawn(worker, *(i, )) for i in range(3)]
    ret = gevent.joinall(tasks)

reulst: Кто-нибудь может сказать мне, как заставить его работать, разве pykafka не поддерживает gevent?


person eagle A    schedule 28.10.2017    source источник
comment
Можете ли вы уточнить, что вы подразумеваете под повторяющимися данными результатов? Pykafka поддерживает gevent, но его поддержка недостаточно протестирована, поэтому есть некоторые проблемы.   -  person Emmett Butler    schedule 28.02.2018
comment
@EmmettJ.Butler Я имею в виду, что каждая сопрограмма получает одни и те же данные.   -  person eagle A    schedule 13.03.2018


Ответы (1)


Держу пари, что эта проблема не имеет ничего общего с использованием вами gevent. Причина, по которой вы замечаете дублирование данных у потребителей, заключается в том, что вы используете SimpleConsumer вместо BalancedConsumer. SimpleConsumer не выполняет автоматическую балансировку — она просто потребляет всю тему с ее начального смещения. Таким образом, если у вас есть много экземпляров SimpleConsumer, работающих параллельно, как здесь, каждый из них будет потреблять всю тему со своего начального смещения. BalancedConsumer (topic.get_balanced_consumer(consumer_group='mygroup')), вероятно, то, что вам здесь нужно. Он использует алгоритм повторной балансировки потребителей, чтобы гарантировать, что потребители, работающие в одной и той же группе, не получают одни и те же сообщения. Чтобы это работало, ваша тема должна иметь как минимум столько же разделов, сколько процессов, которые вы ее используете. См. README pykafka и документацию для получения дополнительной информации.

person Emmett Butler    schedule 14.03.2018