Я использую группу потребителей pykafka с gevent, но результаты имеют повторяющиеся данные. показать мой код:
import gevent
from pykafka import KafkaClient
topic_name = 'test2'
bootstrap_servers = '192.168.199.228:9094,192.168.199.228:9092,192.168.199.228:9093'
group = 'test_g'
def get_consumer():
client = KafkaClient(hosts=bootstrap_servers, use_greenlets=True)
topic = client.topics[topic_name.encode()]
consumer = topic.get_simple_consumer(auto_commit_interval_ms=10000,
consumer_group=group.encode(),
auto_commit_enable=True,
)
return consumer
def worker(worker_id):
consumer = get_consumer()
for msg in consumer:
print('worker {} partition: {}, offset: {}'.format(worker_id, msg.partition, msg.offset))
if __name__ == '__main__':
tasks = [gevent.spawn(worker, *(i, )) for i in range(3)]
ret = gevent.joinall(tasks)
reulst: Кто-нибудь может сказать мне, как заставить его работать, разве pykafka не поддерживает gevent?