Node.js: неправильные дополнительные байты были записаны при обратном давлении в доступном для записи потоке

Я сделал простую бинарную передачу с помощью Socket.io для передачи файла на сервер от клиента. Я думал, что это сработало, но я понял, что размер файла был другим. При ошибке writableStream.write я прикрепил обработчик события слива, чтобы ждать, пока его можно будет перезаписать, и продолжить запись, но каждый раз, когда происходит событие слива, размер файла увеличивается на время запуска события слива, каждый размер 10240 байт, который я устанавливается для каждой передачи фрагмента.

Прежде чем я напишу здесь код, мне нужно объяснить поток кода:

  1. Запрос клиента на загрузку файла
  2. Сервер создает пустой файл (создает доступный для записи поток) и разрешает передачу
  3. Клиент передает данные (чанк) до тех пор, пока он не закончится
  4. Сервер записывает фрагмент с доступным для записи потоком
  5. Клиент завершает передачу на всех отправленных
  6. Сервер закрывает доступный для записи поток.
  7. Сделанный!

Это код на стороне сервера:

var writeStream = null;
var fileSize = 0;
var wrote = 0;

socket.on('clientRequestFileTransfer', (fileInfo) => {
    console.log(`Client request file transfer: ${fileInfo.name}(${fileInfo.size})`);

    fileSize = fileInfo.size;
    wrote = 0;

    writeStream = fs.createWriteStream(__dirname + '/' + fileInfo.name);
    writeStream.on('close', () => {
        console.log('Write stream ended.');
    });

    console.log('File created.');
    socket.emit('serverGrantFileTransfer');
});

socket.on('clientSentChunk', (chunk) => {
    function write() {
        let writeDone = writeStream.write(chunk);

        if(!writeDone) {
            console.log('Back pressure!');
            return writeStream.once('drain', write);
        }
        else {
            wrote += chunk.length;
            console.log(`Wrote chunks: ${chunk.length} / ${wrote} / ${fileSize}`);
            socket.emit('serverRequestContinue');
        }
    }

    write();        
});
socket.on('clientFinishTransmission', () => {
    writeStream.end();
    console.log('Transmission complete!');
});

И это клиент (добавлен код для чтения бинарного файла):

var fileEl = document.getElementById('file');
fileEl.onchange = function() {
    var file = fileEl.files[0];
    if(!file) return;

    var socket = io('http://localhost:3000');

    socket.on('connect', function() {
        var fileReader = new FileReader();
        fileReader.onloadend = function() {
            var bin = fileReader.result;
            var chunkSize = 10240;
            var sent = 0;

            // make server knows the name and size of the file
            socket.once('serverGrantFileTransfer', () => {
                function beginTransfer() {
                    if(sent >= bin.byteLength) {
                        console.log('Transmission complete!');
                        socket.emit('clientFinishTransmission');
                        return;
                    }

                    var chunk = bin.slice(sent, sent + chunkSize);

                    socket.once('serverRequestContinue', beginTransfer);
                    socket.emit('clientSentChunk', chunk);

                    sent += chunk.byteLength;
                    console.log('Sent: ' + sent);
                }

                beginTransfer();
            });
            socket.emit('clientRequestFileTransfer', {
                name: file.name,
                size: file.size
            });

        };

        fileReader.readAsArrayBuffer(file);
    });
};

Я протестировал этот код с файлом размером 4 162 611 байт, и у него был 1 сбой записи (1 обратное давление). После загрузки я проверил размер созданного файла, и он составил 4 172 851 байт, что на 10240 байт больше исходного, и это размер куска (10240).

Иногда запись терпит неудачу 2 раза, чем размер на 20480 байт больше, чем исходный, что вдвое превышает размер отправленного мной чанка.

Я дважды проверил свой код обратного давления, но мне кажется, что все в порядке. Я использую Node v6.2.2 и Socket.io v1.6.0, протестированные в браузере Chrome. Есть ли что-то, что я пропустил? Или я неправильно понял обратное давление? Любой совет будет очень признателен.

ОБНОВЛЕНИЕ

Похоже, когда происходит обратное давление, он дважды записывает одни и те же данные (как я сказал в комментарии). Поэтому я изменил код следующим образом:

socket.on('clientSentChunk', (chunk) => {
    function write() {
        var writeDone = writeStream.write(chunk);
        wrote += chunk.length;

        if(!writeDone) {
            console.log('**************** Back pressure ****************');
            // writeStream.once('drain', write);
            // no rewrite, just continue transmission
            writeStream.once('drain', () => socket.emit('serverRequestContinue'));
        }
        else {
            console.log(`Wrote chunks: ${chunk.length} / ${wrote} / ${fileSize}`);
            socket.emit('serverRequestContinue');
        }
    }

    write();        
});

Это сработало. Я довольно запутан, потому что, когда записываемый поток не может записать, он не записывает данные в поток, но на самом деле нет. Кто-нибудь знает об этом?


person modernator    schedule 24.11.2016    source источник
comment
Я не уверен, но похоже, что когда возникает обратное давление, он записывает одни и те же данные дважды. Насколько я знаю, когда записываемый поток не может быть записан, он не будет записывать данные в поток. Меня неправильно поняли?   -  person modernator    schedule 24.11.2016


Ответы (1)


Я предполагаю, что проблема в том, как вы создаете объект bin в var bin = ...;. Не могли бы вы, пожалуйста, код здесь?

Обновить


Вот внутренний код:

var express = require('express');
var app     = express();
var server  = app.listen(80);
var io = require('socket.io');
var fs = require('fs');

io = io.listen(server);


app.use(express.static(__dirname + '/public'));

app.use(function(req, res, next) {
    //res.send({a:1})
    res.sendFile(__dirname + '/socket_test_index.html');
});




io.on('connection', function(client) {  
    console.log('Client connected...', client);

    client.on('join', function(data) {
        //console.log(data);
    });

    setInterval(()=>{
        client.emit('news', 'news from server');
    }, 10000)


});

io.of('/upload', function(client){
    console.log('upload')
    logic(client);
})



function logic(socket) {



    var writeStream = null;
    var fileSize = 0;
    var wrote = 0;

    socket.on('clientRequestFileTransfer', (fileInfo) => {
        console.log(`Client request file transfer: ${fileInfo.name}(${fileInfo.size})`);

        fileSize = fileInfo.size;
        wrote = 0;

        writeStream = fs.createWriteStream(__dirname + '/' + fileInfo.name);
        writeStream.on('close', () => {
            console.log('Write stream ended.');
        });

        console.log('File created.');
        socket.emit('serverGrantFileTransfer');
    });

    socket.on('clientSentChunk', (chunk) => {
        function write() {
            var writeDone = writeStream.write(chunk);

            if(!writeDone) {
                console.log('Back pressure!');
                return writeStream.once('drain', write);
            }
            else {
                wrote += chunk.length;
                console.log(`Wrote chunks: ${chunk.length} / ${wrote} / ${fileSize}`);
                socket.emit('serverRequestContinue');
            }
        }

        write();        
    });
    socket.on('clientFinishTransmission', () => {
        writeStream.end();
        console.log('Transmission complete!');
    });


    socket.emit('serverGrantFileTransfer', {});

}

Вот html-код:

<script src="/socket.io/socket.io.js"></script>
<script>
var socket = io('http://localhost');
socket.on('news', function (data) {
    console.log(data);
});


socket.on('connect', function(data) {
    socket.emit('join', 'Hello World from client');
});
</script>




<input type="file" id="file" />

<script>
var fileEl = document.getElementById('file');
fileEl.onchange = function() {
    var file = fileEl.files[0];
    if(!file) return;

    var socket = io('http://localhost/upload');

    socket.on('connect', function() {
        var fileReader = new FileReader();
        fileReader.onloadend = function() {
            var bin = fileReader.result;
            var chunkSize = 10240;
            var sent = 0;

            // make server knows the name and size of the file
            socket.once('serverGrantFileTransfer', () => {
                function beginTransfer() {
                    if(sent >= bin.byteLength) {
                        console.log('Transmission complete!');
                        socket.emit('clientFinishTransmission');
                        return;
                    }

                    var chunk = bin.slice(sent, sent + chunkSize);

                    socket.once('serverRequestContinue', beginTransfer);
                    socket.emit('clientSentChunk', chunk);

                    sent += chunk.byteLength;
                    console.log('Sent: ' + sent);
                }

                beginTransfer();
            });
            socket.emit('clientRequestFileTransfer', {
                name: file.name,
                size: file.size
            });

        };

        fileReader.readAsArrayBuffer(file);
    });
};

</script>

Скопируйте их в корневой каталог экспресс-проекта. Я тестировал с двумя изображениями и одним файлом .pdf. Всем им передаются одни и те же байты

person jiajianrong    schedule 24.11.2016
comment
Конечно, я только что добавил. - person modernator; 24.11.2016
comment
Странно, что ваш код очень хорошо работает на моем ноутбуке (Windows7, Node V4). Я вставлю свой тест выше. Пожалуйста, проверьте. - person jiajianrong; 24.11.2016
comment
Достаточно ли велики файлы, которые вы тестировали, чтобы возникло обратное давление? Кажется, ничем не отличается от моего кода. - person modernator; 24.11.2016
comment
Какой файл вы передавали? Я даже проверил с видеофайлом .wmv (26 246 026 байт) с именем китайского символа, и это удалось. - person jiajianrong; 24.11.2016
comment
Извините, я не знаю, что означает back pressure. - person jiajianrong; 24.11.2016
comment
Обратное давление, вкратце, означает, что внутренний буфер заполнен и не может записать в доступный для записи поток. Затем вы должны прекратить запись, пока поток не станет перезаписываемым. - person modernator; 24.11.2016
comment
Спасибо. Теперь мне ясно, что Backpressure не встречалось во всех моих тестах. попробую воссоздать. - person jiajianrong; 24.11.2016
comment
Хм.. Не знаю почему, но я изменил код, чтобы он не перезаписывался при обратном давлении, и это сработало. Я не знаю, почему это происходит. Я попробую этот пример с другой версией node.js. - person modernator; 24.11.2016