Синхронное извлечение Google Cloud pubsub python

У меня одна тема и одна подписка с несколькими подписчиками. Сценарий моего приложения: я хочу обрабатывать сообщения от разных подписчиков с определенным количеством сообщений, которые нужно обрабатывать за раз. Сначала подразумевается, что обрабатывается 8 сообщений, затем, если обработка одного сообщения выполнена, то после подтверждения обработанного сообщения следующее сообщение должно быть взято из темы, при этом не должно быть дублирующихся сообщений ни у одного подписчика, и каждый раз 8 сообщений должны обрабатываться в фоновом режиме.

Для этого я использую метод синхронного извлечения с max_messages = 8, но следующее извлечение выполняется после завершения обработки всех сообщений. Итак, мы создали собственный планировщик, в котором в то же время 8 процессов должны работать в фоновом режиме и вытягивать по одному сообщению за раз, но все же после завершения обработки всех 8 сообщений доставляется следующее сообщение.

Вот мой код:

    #!/usr/bin/env python3

    import logging
    import multiprocessing
    import time
    import sys
    import random
    from google.cloud import pubsub_v1

    project_id = 'xyz'
    subscription_name = 'abc'

    NUM_MESSAGES = 4
    ACK_DEADLINE = 50
    SLEEP_TIME = 20

    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)

    def worker(msg):
        logger.info("Received message:{}".format(msg.message.data))
        random_sleep = random.randint(200,800)
        logger.info("Received message:{} for {} sec".format(msg.message.data, random_sleep))
        time.sleep(random_sleep)

    def message_puller():
        subscriber = pubsub_v1.SubscriberClient()
        subscription_path = subscriber.subscription_path(project_id, subscription_name)
        while(True):
            try:
                response = subscriber.pull(subscription_path, max_messages=1)
                message = response.received_messages[0]
                msg = message
                ack_id = message.ack_id
                process = multiprocessing.Process(target=worker, args=(message,))
                process.start()
                while process.is_alive():
                    # `ack_deadline_seconds` must be between 10 to 600.
                    subscriber.modify_ack_deadline(subscription_path,[ack_id],ack_deadline_seconds=ACK_DEADLINE)
                    time.sleep(SLEEP_TIME)
                # Final ack.
                subscriber.acknowledge(subscription_path, [ack_id])
                logger.info("Acknowledging message: {}".format(msg.message.data))
    except Exception as e:
        print (e)
        continue

    def synchronous_pull():
        p = []
        for i in range(0,NUM_MESSAGES):
            p.append(multiprocessing.Process(target=message_puller))

        for i in range(0,NUM_MESSAGES):
            p[i].start()

        for i in range(0,NUM_MESSAGES):
            p[i].join()

    if __name__ == '__main__':
        synchronous_pull()

Также иногда subscriber.pull не извлекает никаких сообщений, даже если цикл while всегда равен True. Это дает мне ошибку, поскольку индекс списка (0) выходит за пределы допустимого диапазона. В заключение, что subscriber.pull не выводит сообщение, даже если сообщения находятся в теме, но через некоторое время он начинает вытягивать. Почему это так?

Я пробовал использовать асинхронное извлечение и управление потоком, но на нескольких подписчиках обнаружены повторяющиеся сообщения. Если какой-либо другой метод решит мою проблему, сообщите об этом mi. Заранее спасибо.


person Pradnya Shinde    schedule 10.10.2019    source источник
comment
Google PubSub гарантирует, что каждое сообщение будет доставлено хотя бы один раз. Бывают случаи, когда вы будете получать одни и те же сообщения более одного раза. В этом случае ваша программа должна демонстрировать идемпотентность stackoverflow.com / questions / 1077412 /.   -  person saintlyzero    schedule 10.10.2019


Ответы (1)


Google Cloud PubSub обеспечивает хотя бы один раз (документы). Это означает, что сообщения могут быть доставлены более одного раза. Чтобы решить эту проблему, вам необходимо сделать свою программу / систему идемпотентной

У вас есть несколько подписчиков, каждый из которых извлекает по 8 сообщений.
Чтобы избежать обработки одного и того же сообщения несколькими подписчиками, acknowledge сообщение, как только какой-либо подписчик извлечет это сообщение, и продолжит обработку, а не подтвердит его в конце, после всего обработка сообщения.

Кроме того, вместо непрерывного запуска основного сценария используйте sleep в течение некоторого постоянного времени, когда в очереди нет сообщений.

У меня был похожий код, в котором я использовал синхронное извлечение, за исключением того, что я не использовал параллельную обработку.

Вот код:

PubSubHandler - класс для обработки операций, связанных с Pubsub.

from google.cloud import pubsub_v1
from google.api_core.exceptions import DeadlineExceeded


class PubSubHandler:

    def __init__(self, subscriber_config):

        self.project_name = subscriber_config['PROJECT_NAME']
        self.subscriber_name = subscriber_config['SUBSCRIBER_NAME']

        self.subscriber = pubsub_v1.SubscriberClient()
        self.subscriber_path = self.subscriber.subscription_path(self.project_name,self.subscriber_name)


    def pull_messages(self,number_of_messages):

        try:
            response = self.subscriber.pull(self.subscriber_path, max_messages = number_of_messages)
            received_messages = response.received_messages
        except DeadlineExceeded as e:
            received_messages = []
            print('No messages caused error')
        return received_messages


    def ack_messages(self,message_ids):

        if len(message_ids) > 0:
            self.subscriber.acknowledge(self.subscriber_path, message_ids)
            return True

Utils - класс для служебных методов.

import json

class Utils:


    def __init__(self):
        pass


    def decoded_data_to_json(self,decoded_data):
        try:
            decoded_data = decoded_data.replace("'", '"')
            json_data = json.loads(decoded_data)
            return json_data
        except Exception as e:
            raise Exception('error while parsing json')


    def raw_data_to_utf(self,raw_data):
        try:
            decoded_data = raw_data.decode('utf8')
            return decoded_data
        except Exception as e:
            raise Exception('error converting to UTF')

Orcestrator - основной скрипт


import time
import json
import logging

from utils import Utils
from db_connection import DbHandler
from pub_sub_handler import PubSubHandler

class Orcestrator:

    def __init__(self):

        self.MAX_NUM_MESSAGES = 2
        self.SLEEP_TIME = 10
        self.util_methods = Utils()
        self.pub_sub_handler = PubSubHandler(subscriber_config)


    def main_handler(self):
        to_ack_ids = []
        pulled_messages = self.pub_sub_handler.pull_messages(self.MAX_NUM_MESSAGES)

        if len(pulled_messages) < 1:
            self.SLEEP_TIME = 1
            print('no messages in queue')
            return

        logging.info('messages in queue')
        self.SLEEP_TIME = 10

        for message in pulled_messages:
            raw_data = message.message.data
            try: 
                decoded_data = self.util_methods.raw_data_to_utf(raw_data)  
                json_data = self.util_methods.decoded_data_to_json(decoded_data)
                print(json_data)

            except Exception as e:
                logging.error(e)
            to_ack_ids.append(message.ack_id)

        if self.pub_sub_handler.ack_messages(to_ack_ids):
            print('acknowledged msg_ids')


if __name__ == "__main__":

    orecestrator = Orcestrator()
    print('Receiving data..')
    while True:
        orecestrator.main_handler()
        time.sleep(orecestrator.SLEEP_TIME)

person saintlyzero    schedule 10.10.2019
comment
Спасибо за ответ. Но в моем сценарии по теме около 100 сообщений, а время обработки каждого сообщения составляет от 4 до 5 часов. 1. Если я сначала подтверждаю, не обрабатывая, тогда следующие сообщения появятся у подписчика, также если какой-либо процесс выйдет из строя, тогда мы хотим, чтобы оно снова было обработано на следующем или том же подписчике. 2. Если процесс одного сообщения завершен, то по словам, мы должны вывести следующее сообщение. Так что в идеале обработка 8 сообщений будет выполняться одновременно. - person Pradnya Shinde; 10.10.2019
comment
@PradnyaShinde Если вы не подтверждаете сообщение перед его обработкой, то другой подписчик определенно поймает это же сообщение и начнет его обрабатывать. Чтобы избежать этого, вы можете сделать: 1. Fetch new message - ›2. Store the message (локально или в базе данных) -› 3. Ack message - ›4. Process message. Теперь, если процесс завершился неудачно, восстановите сохраненное сообщение и снова обработайте его или опубликуйте то же сообщение в очереди, чтобы оно могло быть выполнено другим подписчиком. - person saintlyzero; 10.10.2019
comment
Полностью согласен. Но согласно документу говорится, что если один подписчик вытащит сообщение, то другой подписчик не потянет его до тех пор, пока его ack_deadline не закончится. Итак, я использовал концепцию modify_ack_deadline, проверяя, запущен ли процесс или нет. В моем коде проблема с дублированием сообщения на нескольких подписчиках решена, но невозможно получить следующее сообщение, если один процесс завершен. Также я хочу понять, подходит ли для моего сценария управление потоком или нет? - person Pradnya Shinde; 11.10.2019
comment
@PradnyaShinde Да, расширение ack_deadline остановит повторную отправку того же сообщения. Но, я думаю, есть предел для дальнейшего расширения ack_deadline. Я не уверен, почему вы не можете вывести следующее сообщение после завершения процесса. - person saintlyzero; 12.10.2019
comment
@PradnyaShinde Мой подход к этой проблеме: 1. Вы можете использовать сельдерей, который инициализирует пул рабочих python и назначит сообщения каждому рабочему в зависимости от количества сообщений в очереди. Сельдерей позаботится о том, чтобы сообщение не осталось необработанным. - person saintlyzero; 12.10.2019
comment
@PradnyaShinde 2. Не выполняйте многопроцессорную обработку, вместо этого запустите несколько экземпляров вашей программы, таким образом вы сможете разделить обработку. Если родительский процесс выйдет из строя, он не откажется от всех своих дочерних процессов. Кроме того, подтвердите сообщение, как только вы начнете обработку, сохраните сообщение в каком-либо месте на диске, в случае любого сбоя повторно вставьте сообщение в очередь с диска, а также сохраните состояния процесса в БД. - person saintlyzero; 12.10.2019
comment
Спасибо за ответ. Несколько раз я наблюдал разное поведение pubsub. До тех пор, пока в одном экземпляре сообщение не обрабатывается, другой экземпляр не сможет извлечь следующее сообщение. Как эти два экземпляра зависят друг от друга? Срок подписки - 10 сек. - person Pradnya Shinde; 21.10.2019