Websphere MQ как источник данных для Apache Spark Streaming

Я изучал возможности Websphere MQ в качестве источника данных для потоковой передачи искр, потому что это необходимо в одном из наших вариантов использования. Я узнал, что MQTT — это протокол, который поддерживает связь из структур данных MQ, но, поскольку я новичок в потоковой передаче Мне нужно несколько рабочих примеров для того же. Кто-нибудь пытался подключить MQ к потоковой передаче искры. Пожалуйста, придумайте лучший способ сделать это.


person tom    schedule 25.05.2015    source источник
comment
Голосование за закрытие как не по теме, поскольку оно не соответствует правилам вопросов Stack Overflow. Я бы предложил задать эти общие вопросы по архитектуре и возможностям на mqseries.net или на одном из других онлайн-форумов MQ.   -  person T.Rob    schedule 25.05.2015
comment
Я думаю, что это может быть просто проблема формулировки. Вместо расплывчатого я изучал эту вещь. Какое решение лучше всего? вы можете задать прямой вопрос. Как читать данные из Websphere MQ через Apache Spark? Если вы знаете больше о стороне вопроса Websphere MQ, вы можете добавить дополнительную информацию об этом. Поддерживает ли он SQL? Как вы обычно запрашиваете его? Какие клиенты существуют для него? Тогда кто-то, кто знает Spark, возможно, сможет вам помочь.   -  person Daniel Darabos    schedule 15.08.2015


Ответы (2)


Итак, я размещаю здесь рабочий код для CustomMQReceiver, который подключается к Websphere MQ и считывает данные:

public class CustomMQReciever extends Receiver<String> { String host = null;
int port = -1;
String qm=null;
String qn=null;
String channel=null;
transient Gson gson=new Gson();
transient MQQueueConnection qCon= null;

Enumeration enumeration =null;

public CustomMQReciever(String host , int port, String qm, String channel, String qn) {
    super(StorageLevel.MEMORY_ONLY_2());
    this.host = host;
    this.port = port;
    this.qm=qm;
    this.qn=qn;
    this.channel=channel;

}

public void onStart() {
    // Start the thread that receives data over a connection
    new Thread()  {
        @Override public void run() {
            try {
                initConnection();
                receive();
            }
            catch (JMSException ex)
            {
                ex.printStackTrace();
            }
        }
    }.start();
}
public void onStop() {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself isStopped() returns false
}

 /** Create a MQ connection and receive data until receiver is stopped */
private void receive() {
  System.out.print("Started receiving messages from MQ");

    try {

    JMSMessage receivedMessage= null;

        while (!isStopped() && enumeration.hasMoreElements() )
        {

            receivedMessage= (JMSMessage) enumeration.nextElement();
            String userInput = convertStreamToString(receivedMessage);
            //System.out.println("Received data :'" + userInput + "'");
            store(userInput);
        }

        // Restart in an attempt to connect again when server is active again
        //restart("Trying to connect again");

        stop("No More Messages To read !");
        qCon.close();
        System.out.println("Queue Connection is Closed");

    }
    catch(Exception e)
    {
        e.printStackTrace();
        restart("Trying to connect again");
    }
    catch(Throwable t) {
        // restart if there is any other error
        restart("Error receiving data", t);
    }
    }

  public void initConnection() throws JMSException
{
    MQQueueConnectionFactory conFactory= new MQQueueConnectionFactory();
    conFactory.setHostName(host);
    conFactory.setPort(port);
    conFactory.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP);
    conFactory.setQueueManager(qm);
    conFactory.setChannel(channel);


    qCon= (MQQueueConnection) conFactory.createQueueConnection();
    MQQueueSession qSession=(MQQueueSession) qCon.createQueueSession(false, 1);
    MQQueue queue=(MQQueue) qSession.createQueue(qn);
    MQQueueBrowser browser = (MQQueueBrowser) qSession.createBrowser(queue);
    qCon.start();

    enumeration= browser.getEnumeration();
   }

 @Override
public StorageLevel storageLevel() {
    return StorageLevel.MEMORY_ONLY_2();
}
}
person tom    schedule 18.08.2015
comment
Я реализовал то же самое, но на данный момент только один исполнитель использует сообщение. Знаете ли вы какой-нибудь подход к тому, чтобы несколько исполнителей потребляли сообщения параллельно? А также как вы справляетесь со сценариями отказа? Если потоковое приложение периодически останавливается, как вы можете не потерять сообщения? - person sparker; 21.07.2018
comment
@sparker Я думаю, что в подходе, основанном на приемнике, у вас есть один приемник, а затем создается несколько исполнителей для параллельной обработки полученных данных. Если вам нужна истинная степень параллелизма, я бы выбрал подход Spark + Kafka без приемника (прямой). Обработка неисправностей может выполняться в режиме искрового потока с помощью регулярных контрольных точек. - person tom; 26.09.2018

Я считаю, что вы можете использовать JMS для подключения к Websphere MQ, а Apache Camel можно использовать для подключения к Websphere MQ. Вы можете создать собственный Receiver следующим образом (обратите внимание, что этот шаблон также можно использовать без JMS):

class JMSReceiver(topicName: String, cf: String, jndiProviderURL: String)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) with Serializable  {
  //Transient as this will get passed to the Workers from the Driver
  @transient
  var camelContextOption: Option[DefaultCamelContext] = None

  def onStart() = {
    camelContextOption = Some(new DefaultCamelContext())
    val camelContext = camelContextOption.get
    val env = new Properties()
    env.setProperty("java.naming.factory.initial", "???")
    env.setProperty("java.naming.provider.url", jndiProviderURL)
    env.setProperty("com.webmethods.jms.clientIDSharing", "true")
    val namingContext = new InitialContext(env);  //using the properties file to create context

    //Lookup Connection Factory
    val connectionFactory = namingContext.lookup(cf).asInstanceOf[javax.jms.ConnectionFactory]
    camelContext.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory))

    val builder = new RouteBuilder() {
        def configure() = {
          from(s"jms://topic:$topicName?jmsMessageType=Object&clientId=$clientId&durableSubscriptionName=${topicName}_SparkDurable&maxConcurrentConsumers=10")
            .process(new Processor() {
            def process(exchange: Exchange) = {
              exchange.getIn.getBody match {
                case s: String => store(s)
              }
            }
          })
        }
      }
    }
    builders.foreach(camelContext.addRoutes)
    camelContext.start()
  }

  def onStop() = if(camelContextOption.isDefined) camelContextOption.get.stop()
}

Затем вы можете создать DStream ваших событий следующим образом:

val myDStream = ssc.receiverStream(new JMSReceiver("MyTopic", "MyContextFactory", "MyJNDI"))
person Patrick McGloin    schedule 18.08.2015
comment
Ответ полезен для меня, но я искал запись результата обратно в Websphere MQ. Может ли кто-нибудь предоставить решение для него. Спасибо - person Darshan Manek; 03.12.2018
comment
Привет @DarshanManek - я думаю, тебе стоит задать новый вопрос или поискать ответ. Это довольно сильно отличается от приемной части. - person Patrick McGloin; 03.12.2018