Передача данных от дочернего к родительскому в nodejs

У меня есть родительский процесс nodejs, который запускает другой дочерний процесс nodejs. Дочерний процесс выполняет некоторую логику, а затем возвращает результат родительскому процессу. Вывод большой, и я пытаюсь использовать каналы для связи, как это предлагается в документации для метода child.send() (который, кстати, отлично работает).

Хотелось бы, чтобы кто-нибудь подсказал, как правильно построить этот канал связи. Я хочу иметь возможность отправлять данные от родителя к ребенку, а также иметь возможность отправлять данные от ребенка к родителю. Я немного запустил его, но он неполный (отправляет сообщение только от родителя к дочернему) и выдает ошибку.

Код родительского файла:

var child_process = require('child_process');

var opts = {
    stdio: [process.stdin, process.stdout, process.stderr, 'pipe']
};
var child = child_process.spawn('node', ['./b.js'], opts);

require('streamifier').createReadStream('test 2').pipe(child.stdio[3]);

Код дочернего файла:

var fs =  require('fs');

// read from it
var readable = fs.createReadStream(null, {fd: 3});

var chunks = []; 

readable.on('data', function(chunk) {
    chunks.push(chunk);
});

readable.on('end', function() {
    console.log(chunks.join().toString());
})

Приведенный выше код выводит ожидаемый результат («тест 2») вместе со следующей ошибкой:

events.js:85
      throw er; // Unhandled 'error' event
            ^
Error: shutdown ENOTCONN
    at exports._errnoException (util.js:746:11)
    at Socket.onSocketFinish (net.js:232:26)
    at Socket.emit (events.js:129:20)
    at finishMaybe (_stream_writable.js:484:14)
    at afterWrite (_stream_writable.js:362:3)
    at _stream_writable.js:349:9
    at process._tickCallback (node.js:355:11)
    at Function.Module.runMain (module.js:503:11)
    at startup (node.js:129:16)
    at node.js:814:3

Полный ответ:

Родительский код:

var child_process = require('child_process');

var opts = {
    stdio: [process.stdin, process.stdout, process.stderr, 'pipe', 'pipe']
};
var child = child_process.spawn('node', ['./b.js'], opts);

child.stdio[3].write('First message.\n', 'utf8', function() {
    child.stdio[3].write('Second message.\n', 'utf8', function() {

    });
}); 

child.stdio[4].pipe(process.stdout);

Детский код:

var fs =  require('fs');

// read from it
var readable = fs.createReadStream(null, {fd: 3});

readable.pipe(process.stdout);
fs.createWriteStream(null, {fd: 4}).write('Sending a message back.');

person Tomasz Rakowski    schedule 09.03.2015    source источник
comment
Можете ли вы включить ошибку, которую он выдает, пожалуйста?   -  person Tom Hallam    schedule 09.03.2015


Ответы (3)


Ваш код работает, но при использовании пакета streamifier для создания потока чтения из строки ваш канал связи автоматически закрывается после передачи этой строки, из-за чего вы получаете ошибку ENOTCONN.

Чтобы иметь возможность отправлять несколько сообщений по потоку, рассмотрите возможность использования в нем .write. Вы можете вызывать это так часто, как хотите:

child.stdio[3].write('First message.\n');
child.stdio[3].write('Second message.\n');

Если вы хотите использовать этот метод для отправки нескольких отдельных сообщений (что, как я полагаю, имеет место на основе вашего замечания об использовании child.send() ранее), рекомендуется использовать какой-либо символ-разделитель, чтобы иметь возможность разделить сообщения при чтении потока в ребенке. В приведенном выше примере я использовал для этого символы новой строки. Полезным пакетом для помощи в этом разделении является event-stream.

Теперь, чтобы создать еще один канал связи из дочернего в родительский, просто добавьте еще один «канал» в свой stdio.

Вы можете написать ему в дочернем:

fs.createWriteStream(null, {fd: 4}).write('Sending a message back.');

И читаем из него в родительском:

child.stdio[4].pipe(process.stdout);

Это напечатает «Отправка сообщения обратно». к консоли.

person Jasper Woudenberg    schedule 09.03.2015
comment
Спасибо за ваш вклад. Однако я хотел бы иметь возможность общаться в обоих направлениях и, возможно, несколько раз. Я бы предпочел использовать stdout/stderr для обработки других типов данных, поэтому я использовал там «канал». - person Tomasz Rakowski; 09.03.2015
comment
Я полностью переписал свой ответ, чтобы он лучше соответствовал заданному вами вопросу. Дайте мне знать, если это ответит на ваш вопрос :). - person Jasper Woudenberg; 10.03.2015
comment
Еще раз спасибо за вклад. Это помогло мне решить мою проблему! - person Tomasz Rakowski; 11.03.2015

Я столкнулся с той же проблемой и использовал параметр {end:false}, чтобы исправить ошибку. К сожалению, принятый ответ работает только при обработке дискретной записи коротких объемов данных. Если у вас много данных (а не только короткие сообщения), вам нужно управлять потоком, и использование .write() не самое лучшее. Для подобных сценариев (передача больших объемов данных) лучше использовать функцию .pipe(), как изначально в вашем коде, для управления потоком.

Ошибка возникает из-за того, что доступный для чтения поток в вашем родительском процессе пытается завершить и закрыть доступный для записи поток ввода вашего дочернего процесса. Вы должны использовать опцию {end: false} в родительском канале процесса:

Исходный код: require('streamifier').createReadStream('test 2').pipe(child.stdio[3]);

Предлагаемая модификация: require('streamifier').createReadStream('test 2').pipe(child.stdio[3], {end:false});

См. подробности здесь из документации NodeJs: https://nodejs.org/dist/latest-v5.x/docs/api/stream.html#stream_readable_pipe_destination_options

Надеюсь, это поможет кому-то еще столкнуться с этой проблемой.

person Ashish    schedule 17.02.2016
comment
Спасибо за этот вклад. Я проверю это позже, и если это сработает, я отмечу ваш ответ как принятый! - person Tomasz Rakowski; 17.02.2016

Вы можете сделать это с помощью fork()

Я только что решил это для себя ... fork() - это версия спавна более высокого уровня, и в целом рекомендуется использовать fork() вместо spawn().

если вы используете опцию {silent:true}, stdio будет передаваться родительскому процессу

          const cp = require('child_process');

          const n = cp.fork(<path>, args, {
              cwd: path.resolve(__dirname),
              detached: true,
           });

          n.stdout.setEncoding('utf8');

          // here we can listen to the stream of data coming from the child process:
          n.stdout.on('data', (data) => {
            ee.emit('data',data);
          });

          //you can also listen to other events emitted by the child process
          n.on('error', function (err) {
            console.error(err.stack);
            ee.emit('error', err);
          });

          n.on('message', function (msg) {
            ee.emit('message', msg);
          });

          n.on('uncaughtException', function (err) {
            console.error(err.stack);
            ee.emit('error', err);
          });


          n.once('exit', function (err) {
             console.error(err.stack);
             ee.emit('exit', err);
          });
person Alexander Mills    schedule 13.01.2016