У меня одна тема и одна подписка с несколькими подписчиками. Сценарий моего приложения: я хочу обрабатывать сообщения от разных подписчиков с определенным количеством сообщений, которые нужно обрабатывать за раз. Сначала подразумевается, что обрабатывается 8 сообщений, затем, если обработка одного сообщения выполнена, то после подтверждения обработанного сообщения следующее сообщение должно быть взято из темы, при этом не должно быть дублирующихся сообщений ни у одного подписчика, и каждый раз 8 сообщений должны обрабатываться в фоновом режиме.
Для этого я использую метод синхронного извлечения с max_messages = 8, но следующее извлечение выполняется после завершения обработки всех сообщений. Итак, мы создали собственный планировщик, в котором в то же время 8 процессов должны работать в фоновом режиме и вытягивать по одному сообщению за раз, но все же после завершения обработки всех 8 сообщений доставляется следующее сообщение.
Вот мой код:
#!/usr/bin/env python3
import logging
import multiprocessing
import time
import sys
import random
from google.cloud import pubsub_v1
project_id = 'xyz'
subscription_name = 'abc'
NUM_MESSAGES = 4
ACK_DEADLINE = 50
SLEEP_TIME = 20
multiprocessing.log_to_stderr()
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)
def worker(msg):
logger.info("Received message:{}".format(msg.message.data))
random_sleep = random.randint(200,800)
logger.info("Received message:{} for {} sec".format(msg.message.data, random_sleep))
time.sleep(random_sleep)
def message_puller():
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_name)
while(True):
try:
response = subscriber.pull(subscription_path, max_messages=1)
message = response.received_messages[0]
msg = message
ack_id = message.ack_id
process = multiprocessing.Process(target=worker, args=(message,))
process.start()
while process.is_alive():
# `ack_deadline_seconds` must be between 10 to 600.
subscriber.modify_ack_deadline(subscription_path,[ack_id],ack_deadline_seconds=ACK_DEADLINE)
time.sleep(SLEEP_TIME)
# Final ack.
subscriber.acknowledge(subscription_path, [ack_id])
logger.info("Acknowledging message: {}".format(msg.message.data))
except Exception as e:
print (e)
continue
def synchronous_pull():
p = []
for i in range(0,NUM_MESSAGES):
p.append(multiprocessing.Process(target=message_puller))
for i in range(0,NUM_MESSAGES):
p[i].start()
for i in range(0,NUM_MESSAGES):
p[i].join()
if __name__ == '__main__':
synchronous_pull()
Также иногда subscriber.pull не извлекает никаких сообщений, даже если цикл while всегда равен True. Это дает мне ошибку, поскольку индекс списка (0) выходит за пределы допустимого диапазона. В заключение, что subscriber.pull не выводит сообщение, даже если сообщения находятся в теме, но через некоторое время он начинает вытягивать. Почему это так?
Я пробовал использовать асинхронное извлечение и управление потоком, но на нескольких подписчиках обнаружены повторяющиеся сообщения. Если какой-либо другой метод решит мою проблему, сообщите об этом mi. Заранее спасибо.