Я изучал возможности Websphere MQ в качестве источника данных для потоковой передачи искр, потому что это необходимо в одном из наших вариантов использования. Я узнал, что MQTT — это протокол, который поддерживает связь из структур данных MQ, но, поскольку я новичок в потоковой передаче Мне нужно несколько рабочих примеров для того же. Кто-нибудь пытался подключить MQ к потоковой передаче искры. Пожалуйста, придумайте лучший способ сделать это.
Websphere MQ как источник данных для Apache Spark Streaming
comment
Голосование за закрытие как не по теме, поскольку оно не соответствует правилам вопросов Stack Overflow. Я бы предложил задать эти общие вопросы по архитектуре и возможностям на mqseries.net или на одном из других онлайн-форумов MQ.
- person T.Rob   schedule 25.05.2015
comment
Я думаю, что это может быть просто проблема формулировки. Вместо расплывчатого я изучал эту вещь. Какое решение лучше всего? вы можете задать прямой вопрос. Как читать данные из Websphere MQ через Apache Spark? Если вы знаете больше о стороне вопроса Websphere MQ, вы можете добавить дополнительную информацию об этом. Поддерживает ли он SQL? Как вы обычно запрашиваете его? Какие клиенты существуют для него? Тогда кто-то, кто знает Spark, возможно, сможет вам помочь.
- person Daniel Darabos   schedule 15.08.2015
Ответы (2)
Итак, я размещаю здесь рабочий код для CustomMQReceiver, который подключается к Websphere MQ и считывает данные:
public class CustomMQReciever extends Receiver<String> { String host = null;
int port = -1;
String qm=null;
String qn=null;
String channel=null;
transient Gson gson=new Gson();
transient MQQueueConnection qCon= null;
Enumeration enumeration =null;
public CustomMQReciever(String host , int port, String qm, String channel, String qn) {
super(StorageLevel.MEMORY_ONLY_2());
this.host = host;
this.port = port;
this.qm=qm;
this.qn=qn;
this.channel=channel;
}
public void onStart() {
// Start the thread that receives data over a connection
new Thread() {
@Override public void run() {
try {
initConnection();
receive();
}
catch (JMSException ex)
{
ex.printStackTrace();
}
}
}.start();
}
public void onStop() {
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself isStopped() returns false
}
/** Create a MQ connection and receive data until receiver is stopped */
private void receive() {
System.out.print("Started receiving messages from MQ");
try {
JMSMessage receivedMessage= null;
while (!isStopped() && enumeration.hasMoreElements() )
{
receivedMessage= (JMSMessage) enumeration.nextElement();
String userInput = convertStreamToString(receivedMessage);
//System.out.println("Received data :'" + userInput + "'");
store(userInput);
}
// Restart in an attempt to connect again when server is active again
//restart("Trying to connect again");
stop("No More Messages To read !");
qCon.close();
System.out.println("Queue Connection is Closed");
}
catch(Exception e)
{
e.printStackTrace();
restart("Trying to connect again");
}
catch(Throwable t) {
// restart if there is any other error
restart("Error receiving data", t);
}
}
public void initConnection() throws JMSException
{
MQQueueConnectionFactory conFactory= new MQQueueConnectionFactory();
conFactory.setHostName(host);
conFactory.setPort(port);
conFactory.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP);
conFactory.setQueueManager(qm);
conFactory.setChannel(channel);
qCon= (MQQueueConnection) conFactory.createQueueConnection();
MQQueueSession qSession=(MQQueueSession) qCon.createQueueSession(false, 1);
MQQueue queue=(MQQueue) qSession.createQueue(qn);
MQQueueBrowser browser = (MQQueueBrowser) qSession.createBrowser(queue);
qCon.start();
enumeration= browser.getEnumeration();
}
@Override
public StorageLevel storageLevel() {
return StorageLevel.MEMORY_ONLY_2();
}
}
person
tom
schedule
18.08.2015
Я реализовал то же самое, но на данный момент только один исполнитель использует сообщение. Знаете ли вы какой-нибудь подход к тому, чтобы несколько исполнителей потребляли сообщения параллельно? А также как вы справляетесь со сценариями отказа? Если потоковое приложение периодически останавливается, как вы можете не потерять сообщения?
- person sparker; 21.07.2018
@sparker Я думаю, что в подходе, основанном на приемнике, у вас есть один приемник, а затем создается несколько исполнителей для параллельной обработки полученных данных. Если вам нужна истинная степень параллелизма, я бы выбрал подход Spark + Kafka без приемника (прямой). Обработка неисправностей может выполняться в режиме искрового потока с помощью регулярных контрольных точек.
- person tom; 26.09.2018
Я считаю, что вы можете использовать JMS для подключения к Websphere MQ, а Apache Camel можно использовать для подключения к Websphere MQ. Вы можете создать собственный Receiver следующим образом (обратите внимание, что этот шаблон также можно использовать без JMS):
class JMSReceiver(topicName: String, cf: String, jndiProviderURL: String)
extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) with Serializable {
//Transient as this will get passed to the Workers from the Driver
@transient
var camelContextOption: Option[DefaultCamelContext] = None
def onStart() = {
camelContextOption = Some(new DefaultCamelContext())
val camelContext = camelContextOption.get
val env = new Properties()
env.setProperty("java.naming.factory.initial", "???")
env.setProperty("java.naming.provider.url", jndiProviderURL)
env.setProperty("com.webmethods.jms.clientIDSharing", "true")
val namingContext = new InitialContext(env); //using the properties file to create context
//Lookup Connection Factory
val connectionFactory = namingContext.lookup(cf).asInstanceOf[javax.jms.ConnectionFactory]
camelContext.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory))
val builder = new RouteBuilder() {
def configure() = {
from(s"jms://topic:$topicName?jmsMessageType=Object&clientId=$clientId&durableSubscriptionName=${topicName}_SparkDurable&maxConcurrentConsumers=10")
.process(new Processor() {
def process(exchange: Exchange) = {
exchange.getIn.getBody match {
case s: String => store(s)
}
}
})
}
}
}
builders.foreach(camelContext.addRoutes)
camelContext.start()
}
def onStop() = if(camelContextOption.isDefined) camelContextOption.get.stop()
}
Затем вы можете создать DStream ваших событий следующим образом:
val myDStream = ssc.receiverStream(new JMSReceiver("MyTopic", "MyContextFactory", "MyJNDI"))
person
Patrick McGloin
schedule
18.08.2015
Ответ полезен для меня, но я искал запись результата обратно в Websphere MQ. Может ли кто-нибудь предоставить решение для него. Спасибо
- person Darshan Manek; 03.12.2018
Привет @DarshanManek - я думаю, тебе стоит задать новый вопрос или поискать ответ. Это довольно сильно отличается от приемной части.
- person Patrick McGloin; 03.12.2018