Как коннектор источника Solace tasks.max подключается к очереди?

Мы реализуем тему Kafka, которая представляет собой кластер из 3 машин. Мы собираемся извлечь из очереди Solace и задаемся вопросом, что делает параметр tasks.max и как он разделяет сообщения. У нас нет доступа к очереди утешения, и мы можем только позвонить и посмотреть, удаляем ли мы сообщения из буфера. любое понимание было бы здорово!! Текущая система в hornetQ и работает 5 потоков.

Окружающая среда:

  • Распределенный режим
  • 1 тема
  • 3 реплики
  • 5 разделов на машину

Solace-kafka-connector-source-master

Мы попробовали его в автономном режиме с одной задачей, и он работает. Поскольку мы не контролируем очередь утешения, мы можем видеть, удаляем ли мы сообщения.

{
    "name": "solaceSourceConnector",
    "config": {
        "connector.class":
"com.solace.source.connector.SolaceSourceConnector",
        "tasks.max": "1",
        "kafka.topic": "solacetest",
        "sol.host": "HOSTNAME",
        "sol.username": "USERNAME",
        "sol.password": "PASSWORD",
        "sol.vpn_name": "VPN IP",
        "sol.topics": "soltest, soltest1,solacetest2",
        "sol.queue": "testQ",
        "sol.message_processor_class": "com.solace.source.connector.msgProcessors.SolaceSampleKeyedMessageProcessor",
        "sol.generate_send_timestamps": "true",
        "sol.generate_rcv_timestamps": "true",
        "sol.sub_ack_window_size": "255",
        "sol.generate_sequence_numbers": "true",
        "sol.calculate_message_expiration": "true",
        "sol.subscriber_dto_override": "true",
        "sol.channel_properties.connect_retries": "-1",
        "sol.channel_properties.reconnect_retries": "-1",
        "sol.kafka_message_key": "DESTINATION",
        "sol.ssl_trust_store": "/opt/PKI/skeltonCA/heinz1.ts",
        "sol.ssl_trust_store_pasword": "sasquatch",
        "sol.ssl_trust_store_format": "JKS",
        "sol.ssl_key_store": "/opt/PKI/skeltonCA/heinz1.ks",
        "sol.ssl_key_store_password": "sasquatch",
        "sol.ssl_key_store_format": "JKS",
        "sol.ssl_key_store_normalized_format": "JKS",
        "sol.ssl_private_key_alias": "heinz1",
        "sol.ssl_private_key_password": "sasquatch"
    }

}

Я пытаюсь сделать так, чтобы мы не теряли сообщения в буфере.


person Geoffrey Jennings    schedule 04.04.2019    source источник


Ответы (1)


max.tasks 1 будет работать с эксклюзивной очередью, поскольку в такой очереди Solace может быть только один активный подписчик. Это также гарантирует сохранение порядка сообщений от Solace до Kafka.

Если Solace Queue является неэксклюзивной (то есть общей) очередью, то увеличение max.tasks определит верхнюю границу количества одновременных клиентов Solace, потребляющих из очереди. Это обеспечивает горизонтальное масштабирование с использованием нескольких потребителей из очереди для повышения пропускной способности. Однако Solace не гарантирует порядок сообщений между несколькими потребителями неэксклюзивной очереди, поэтому порядок сообщений может не сохраниться.

В любом случае сообщения не теряются, потому что они не подтверждаются обратно в Solace и удаляются до тех пор, пока они не будут успешно подтверждены как записанные в Kafka.

person Hans Jespersen    schedule 05.04.2019