Я использую буфер протокола для отправки потока данных в Apache Flink. У меня два класса. один - производитель, а другой - потребитель. Producer - это класс потока Java, который считывает данные из сокета, и Protobuf десериализует их, а затем я сохраняю его в своем BlockingQueue. Consumer - это класс, который реализует SourceFunction во Flink. Я тестировал эту программу с использованием:
DataStream<Event.MyEvent> stream = env.fromCollection(queue);
вместо настраиваемого источника, и он отлично работает. Но когда я пытаюсь использовать класс SourceFunction, он выдает следующее исключение:
Caused by: java.lang.RuntimeException: Unable to find proto buffer class
at com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775)
...
Caused by: java.lang.ClassNotFoundException: event.Event$MyEvent
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
...
И в другой попытке я смешал оба класса в один (класс, реализующий SourceFunction). Я получаю данные из сокета, десериализирую их с помощью protobuf и сохраняю их в BlockingQueue, а затем сразу после этого читаю из BlockingQueue. Мой код тоже отлично работает с этим подходом.
Но я хочу использовать два отдельных класса (многопоточность), но это вызывает исключение. Я пытаюсь решить эту проблему за последние 2 дня, а также много искал, но безуспешно. Любая помощь будет приложена.
Режиссер:
public class Producer implements Runnable {
Boolean running = true;
Socket socket = null, bufferSocket = null;
PrintStream ps = null;
BlockingQueue<Event.MyEvent> queue;
final int port;
public Producer(BlockingQueue<Event.MyEvent> queue, int port){
this.port = port;
this.queue = queue;
}
@Override
public void run() {
try {
socket = new Socket("127.0.0.1", port);
bufferSocket = new Socket(InetAddress.getLocalHost(), 6060);
ps = new PrintStream(bufferSocket.getOutputStream());
while (running) {
queue.put(Event.MyEvent.parseDelimitedFrom(socket.getInputStream()));
ps.println("Items in Queue: " + queue.size());
}
}catch (Exception e){
e.printStackTrace();
}
}
}
Потребитель:
public class Consumer implements SourceFunction<Event.MyEvent> {
Boolean running = true;
BlockingQueue<Event.MyEvent> queue;
Event.MyEvent event;
public Consumer(BlockingQueue<Event.MyEvent> queue){
this.queue = queue;
}
@Override
public void run(SourceContext<Event.MyEvent> sourceContext) {
try {
while (running) {
event = queue.take();
sourceContext.collect(event);
}
}catch (Exception e){
e.printStackTrace();
}
}
@Override
public void cancel() {
running = false;
}
}
Event.MyEvent - мой класс protobuf. Я использую версию 2.6.1, и я скомпилировал классы с v2.6.1. Я дважды проверил версии, чтобы убедиться, что проблема не в этом. Класс Producer работает нормально. Я тестировал это как с Flink v1.1.3, так и с v1.1.4. Я запускаю его в локальном режиме.
РЕДАКТИРОВАТЬ: ответ был включен в вопрос, опубликован отдельно и удален здесь.
ОБНОВЛЕНИЕ 28 декабря 2016 г.
... Но мне все еще любопытно. Что вызывает эту ошибку? Это ошибка во Flink или я что-то делаю не так?
...