Предаване на данни от дете към родител в nodejs

Имам родителски процес на nodejs, който стартира друг дъщерен процес на nodejs. Дъщерният процес изпълнява някаква логика и след това връща резултата на родителя. Резултатът е голям и се опитвам да използвам канали за комуникация, както е предложено в документацията за метода child.send() (който между другото работи добре).

Бих искал някой да предложи как правилно да изградя този комуникационен канал. Искам да мога да изпращам данни от родител на дете, както и да мога да изпращам данни от дете на родител. Започнах го малко, но е непълен (изпраща съобщение само от родител към дете) и извежда грешка.

Код на родителския файл:

var child_process = require('child_process');

var opts = {
    stdio: [process.stdin, process.stdout, process.stderr, 'pipe']
};
var child = child_process.spawn('node', ['./b.js'], opts);

require('streamifier').createReadStream('test 2').pipe(child.stdio[3]);

Код на дъщерен файл:

var fs =  require('fs');

// read from it
var readable = fs.createReadStream(null, {fd: 3});

var chunks = []; 

readable.on('data', function(chunk) {
    chunks.push(chunk);
});

readable.on('end', function() {
    console.log(chunks.join().toString());
})

Горният код отпечатва очаквания резултат ("тест 2") заедно със следната грешка:

events.js:85
      throw er; // Unhandled 'error' event
            ^
Error: shutdown ENOTCONN
    at exports._errnoException (util.js:746:11)
    at Socket.onSocketFinish (net.js:232:26)
    at Socket.emit (events.js:129:20)
    at finishMaybe (_stream_writable.js:484:14)
    at afterWrite (_stream_writable.js:362:3)
    at _stream_writable.js:349:9
    at process._tickCallback (node.js:355:11)
    at Function.Module.runMain (module.js:503:11)
    at startup (node.js:129:16)
    at node.js:814:3

Пълен отговор:

Код на родител:

var child_process = require('child_process');

var opts = {
    stdio: [process.stdin, process.stdout, process.stderr, 'pipe', 'pipe']
};
var child = child_process.spawn('node', ['./b.js'], opts);

child.stdio[3].write('First message.\n', 'utf8', function() {
    child.stdio[3].write('Second message.\n', 'utf8', function() {

    });
}); 

child.stdio[4].pipe(process.stdout);

Код на детето:

var fs =  require('fs');

// read from it
var readable = fs.createReadStream(null, {fd: 3});

readable.pipe(process.stdout);
fs.createWriteStream(null, {fd: 4}).write('Sending a message back.');

person Tomasz Rakowski    schedule 09.03.2015    source източник
comment
Можете ли да включите грешката, която хвърля, моля?   -  person Tom Hallam    schedule 09.03.2015


Отговори (3)


Вашият код работи, но като използвате пакета streamifier за създаване на поток за четене от низ, вашият комуникационен канал се затваря автоматично след предаването на този низ, което е причината да получите грешка ENOTCONN.

За да можете да изпращате множество съобщения през потока, обмислете използването на .write в него. Можете да наричате това толкова често, колкото желаете:

child.stdio[3].write('First message.\n');
child.stdio[3].write('Second message.\n');

Ако искате да използвате този метод за изпращане на множество отделни съобщения (което според мен е така въз основа на вашата забележка за използване на child.send() преди), е добра идея да използвате някакъв разделителен символ, за да можете да разделяте съобщенията, когато потокът се чете в детето. В горния пример използвах нови редове за това. Полезен пакет за подпомагане на това разделяне е event-stream.

Сега, за да създадете друг комуникационен канал от детето в родителя, просто добавете още един 'pipe' към вашето stdio.

Можете да му напишете в детето:

fs.createWriteStream(null, {fd: 4}).write('Sending a message back.');

И прочетете от него в родителя:

child.stdio[4].pipe(process.stdout);

Това ще отпечата „Изпращане на обратно съобщение“. към конзолата.

person Jasper Woudenberg    schedule 09.03.2015
comment
Благодаря ви за вашия принос. Бих искал обаче да мога да комуникирам и в двете посоки и евентуално многократно. Бих предпочел да използвам stdout/stderr за обработка на друг тип данни, затова използвах „тръбата“ там. - person Tomasz Rakowski; 09.03.2015
comment
Напълно пренаписах отговора си, за да отговаря по-добре на въпроса, който зададохте. Кажете ми дали това отговаря на въпроса ви :). - person Jasper Woudenberg; 10.03.2015
comment
Благодаря ви отново за приноса. Това ми помогна да реша проблема си! - person Tomasz Rakowski; 11.03.2015

Сблъсках се със същия проблем и използвах опцията {end:false}, за да коригирам грешката. За съжаление приетият отговор работи само при обработка на дискретни записи на кратки количества данни. В случай, че имате много данни (а не само кратки съобщения), трябва да се справите с контрола на потока и използването на .write() не е най-доброто. За сценарии като този (големи трансфери на данни) е по-добре да използвате функцията .pipe(), както е първоначално във вашия код, за да управлявате контрола на потока.

Грешката се появява, защото четливият поток във вашия родителски процес се опитва да прекрати и затвори записваемата входна тръба на потока на вашия дъщерен процес. Трябва да използвате опцията {end: false} в канала на родителския процес:

Оригинален код: require('streamifier').createReadStream('test 2').pipe(child.stdio[3]);

Предложена модификация: require('streamifier').createReadStream('test 2').pipe(child.stdio[3], {end:false});

Вижте подробности тук от документацията на NodeJs: https://nodejs.org/dist/latest-v5.x/docs/api/stream.html#stream_readable_pipe_destination_options

Надявам се това да помогне на някой друг, който се сблъсква с този проблем.

person Ashish    schedule 17.02.2016
comment
Благодаря ви за този принос. Ще го тествам по-късно и ако работи, ще маркирам отговора ви като приет! - person Tomasz Rakowski; 17.02.2016

Можете да направите това с fork()

Току-що реших това за себе си...fork() е версията на spawn от по-високо ниво и се препоръчва да се използва fork() вместо spawn() като цяло.

ако използвате опцията {silent:true}, stdio ще бъде прехвърлен към родителския процес

          const cp = require('child_process');

          const n = cp.fork(<path>, args, {
              cwd: path.resolve(__dirname),
              detached: true,
           });

          n.stdout.setEncoding('utf8');

          // here we can listen to the stream of data coming from the child process:
          n.stdout.on('data', (data) => {
            ee.emit('data',data);
          });

          //you can also listen to other events emitted by the child process
          n.on('error', function (err) {
            console.error(err.stack);
            ee.emit('error', err);
          });

          n.on('message', function (msg) {
            ee.emit('message', msg);
          });

          n.on('uncaughtException', function (err) {
            console.error(err.stack);
            ee.emit('error', err);
          });


          n.once('exit', function (err) {
             console.error(err.stack);
             ee.emit('exit', err);
          });
person Alexander Mills    schedule 13.01.2016