ZeroMQ push/pull и поток чтения nodejs

Я пытаюсь прочитать какой-то файл, открыв поток чтения и отправив фрагменты файла через ZMQ другому процессу для их использования. Поток работает так, как должен, однако, когда я запускаю работника, он не видит отправленные данные.

Я пытался отправлять данные через сокет каждые 500 мс, а не в обратном вызове, и когда я запускаю работника, он собирает все предыдущие фрагменты данных:

sender = zmq.socket('push')
setInterval(() ->
  console.log('sending work');
  sender.send('some work')
, 500)

receiver = zmq.socket("pull")
receiver.on "message", (msg) ->
  console.log('work is here: %s', msg.toString())

Выходы:

sending work
sending work
sending work
sending work
sending work
// here I start the worker
sending work
work is here: some work
work is here: some work
work is here: some work
work is here: some work
work is here: some work
work is here: some work
sending work
work is here: some work
sending work
work is here: some work
sending work
work is here: some work

Итак, когда рабочий запускается, он начинает с извлечения всех предыдущих данных, а затем извлекает их каждый раз, когда поступает что-то новое. Это не применяется, когда я делаю это:

readStream = fs.createReadStream("./data/pg2701.txt", {'bufferSize': 100 * 1024})
readStream.on "data", (data) ->
  console.log('sending work');
  sender.send('some work'); // I'd send 'data' if it worked..

В этом сценарии рабочий процесс вообще не извлекает никаких данных. Должны ли такие сокеты создавать очередь или нет? Что мне здесь не хватает?


person tupaja    schedule 07.11.2012    source источник
comment
Какие-нибудь сообщения об ошибках, или это просто не работает?   -  person floatingLomas    schedule 08.11.2012
comment
он просто не работает, воркер запускается правильно, все выглядит так, как будто это нормальное поведение.. не так ли?   -  person tupaja    schedule 08.11.2012


Ответы (1)


Да, push-сокет блокируется до тех пор, пока не будет достигнут HWM, а отправлять некому. Возможно, отправитель еще не привязался, попробуйте что-то вроде этого:

sender.bind('address', function(err) {
  if (err) throw err;
  console.log('sender bound!');

  // the readStream code.
}

также в вашем примере кода отсутствует connect, я уверен, что он там, но, возможно, вы его забыли.

person balazs    schedule 08.11.2012