Свързване на входен поток към изходен поток

актуализация в java9: https://docs.oracle.com/javase/9/docs/api/java/io/InputStream.html#transferTo-java.io.OutputStream-

Видях подобни теми, но не точно от това, което ми трябва.

Имам сървър, който основно ще приема информация от клиент, клиент A, и ще я препраща, байт по байт, към друг клиент, клиент B.

Бих искал да свържа моя входен поток на клиент A с моя изходен поток на клиент B. Възможно ли е това? Какви са начините да направите това?

Освен това тези клиенти си изпращат съобщения, които са донякъде чувствителни към времето, така че буферирането няма да свърши работа. Не искам буфер от да речем 500 и клиентът изпраща 499 байта и след това сървърът ми задържа препращането на 500 байта, защото не е получил последния байт за запълване на буфера.

В момента анализирам всяко съобщение, за да намеря дължината му, след това чета байтове с дължина и след това ги препращам. Реших (и тествах), че това би било по-добре от четене на байт и препращане на байт отново и отново, защото това би било много бавно. Също така не исках да използвам буфер или таймер поради причината, която посочих в последния си абзац — не искам съобщенията, чакащи много дълго време, да преминат, просто защото буферът не е пълен.

Какъв е добър начин да направите това?


person jbu    schedule 15.10.2009    source източник


Отговори (10)


Това, че използвате буфер, не означава, че потокът трябва да запълни този буфер. С други думи, това трябва да е наред:

public static void copyStream(InputStream input, OutputStream output)
    throws IOException
{
    byte[] buffer = new byte[1024]; // Adjust if you want
    int bytesRead;
    while ((bytesRead = input.read(buffer)) != -1)
    {
        output.write(buffer, 0, bytesRead);
    }
}

Това трябва да работи добре - по принцип read повикването ще блокира, докато има някои налични данни, но няма да изчака, докато всички са налични, за да запълни буфера. (Предполагам, че може и вярвам, че FileInputStream обикновено ще запълни буфера, но потокът, прикачен към сокет, е по-вероятно да ви даде данните веднага.)

Мисля, че си струва първо да опитате това просто решение.

person Jon Skeet    schedule 15.10.2009
comment
Да, мисля, че това изяснява нещата. Мисля, че се обърквах с readFully(), което изисква буферът да се запълни. - person jbu; 16.10.2009
comment
Опитах вашия код и също се опитах да прочета съобщение по съобщение, като прочетох дължината на съобщението, след което направих byte[] buf = length; inputstream.read(buf)....последният метод беше по-бърз и не съм сигурен защо. Изглежда, че изпълнява повече редове код, но е по-бързо. Почти 2 пъти по-бързо. - person jbu; 16.10.2009
comment
Освен това, тъй като сървърът ми има множество източници и един приемник, мисля, че трябва да чета съобщения в множеството източници, защото простото четене и препращане на буфери може да преплита съобщения между клиентите и да ги кодира. - person jbu; 16.10.2009
comment
@Jon Skeet Има ли някакъв начин да използвате това, без да знаете точния размер на масива от байтове, от който ще се нуждаете? - person Zibbobz; 29.10.2013
comment
@Zibbobz: Всеки размер на масива ще работи - колкото по-голям е, толкова по-малко четения ще са необходими, но толкова повече памет отнема, докато работи. Не е като да е действителната дължина на потока. - person Jon Skeet; 29.10.2013
comment
@Jon Skeet Това изглежда изхвърля NullPointerException, когато го опитам. Знаете ли дали е особено неподходящ за работа, да речем, с PDF файлове? Защото случайно знам, че това е, с което си имаме работа. - person Zibbobz; 29.10.2013
comment
@Zibbobz: Не, звучи сякаш input или output е нула. Трябва да можете да видите това лесно чрез отстраняване на грешки. - person Jon Skeet; 29.10.2013
comment
Не забравяйте flush() (и close()) на вашите потоци. - person sgibly; 14.11.2014
comment
@sgibly: това би било отговорност на обаждащия се. - person Jon Skeet; 14.11.2014
comment
@JonSkeet вярно за извикванията close(), но определено бих изчистил() изходния поток тук. Твърде често се забравя да се направи това. - person sgibly; 14.11.2014
comment
@sgibly: Добре, като се има предвид, че close() ще го изчисти така или иначе, лично аз не мисля, че си струва. Разбира се, ако вземете код като този, трябва да се чувствате много свободни да го добавите :) - person Jon Skeet; 14.11.2014
comment
@JonSkeet Прав си :-) Току-що се изгорих преди, като не използвах флъш, и виждам защо това може да се случи - stackoverflow.com/questions/2732260/ - person sgibly; 14.11.2014
comment
Между другото, договорът close() в API не казва нищо за промиване преди затваряне. Вижте docs.oracle.com/javase/8/docs /api/java/io/OutputStream.html, така че ако се случи нещо подобно, то е много специфично за изпълнението на изходния поток. - person sgibly; 14.11.2014
comment
@sgibly: Бих казал, че е лошо документиран, а не намерението е всеки да извика флъш... - person Jon Skeet; 15.11.2014

Какво ще кажете просто да използвате

void feedInputToOutput(InputStream in, OutputStream out) {
   IOUtils.copy(in, out);
}

и да приключим с това?

от jakarta apache commons i/o библиотека, която вече се използва от огромно количество проекти, така че вероятно вече имате буркана във вашия classpath.

person Dean Hiller    schedule 28.02.2012
comment
или просто използвайте самата функция, тъй като не е необходимо извикване на друга функция с точно същите параметри.... - person android developer; 03.02.2014
comment
да, аз лично това правя. Предполагам, че съм написал само името на допълнителния метод като документация, но не е необходимо. - person Dean Hiller; 04.02.2014
comment
От това, което мога да кажа, този метод блокира, докато не бъде прекаран входът while. Следователно това трябва да се направи в асинхронна нишка за задаващия въпроса. - person Jonah; 18.09.2015

За пълнота guava също има удобна помощна програма за това

ByteStreams.copy(input, output);
person JesusFreke    schedule 08.09.2013

JDK 9 добави InputStream#transferTo(OutputStream out) за тази функционалност.

person Warren Nocos    schedule 10.03.2017

Можете да използвате кръгъл буфер:

Код

// buffer all data in a circular buffer of infinite size
CircularByteBuffer cbb = new CircularByteBuffer(CircularByteBuffer.INFINITE_SIZE);
class1.putDataOnOutputStream(cbb.getOutputStream());
class2.processDataFromInputStream(cbb.getInputStream());


Зависимост от Maven

<dependency>
    <groupId>org.ostermiller</groupId>
    <artifactId>utils</artifactId>
    <version>1.07.00</version>
</dependency>


Подробности за режима

http://ostermiller.org/utils/CircularBuffer.html

person Stephan    schedule 25.11.2011

Асинхронен начин за постигане.

void inputStreamToOutputStream(final InputStream inputStream, final OutputStream out) {
    Thread t = new Thread(new Runnable() {

        public void run() {
            try {
                int d;
                while ((d = inputStream.read()) != -1) {
                    out.write(d);
                }
            } catch (IOException ex) {
                //TODO make a callback on exception.
            }
        }
    });
    t.setDaemon(true);
    t.start();
}
person Daniel De León    schedule 28.06.2012
comment
Това е за прехвърляне на данни от един поток към друг, без да блокира текущата ви нишка. - person Daniel De León; 05.03.2015

BUFFER_SIZE е размерът на патронниците за четене. Трябва да бъде > 1 kb и ‹ 10 MB.

private static final int BUFFER_SIZE = 2 * 1024 * 1024;
private void copy(InputStream input, OutputStream output) throws IOException {
    try {
        byte[] buffer = new byte[BUFFER_SIZE];
        int bytesRead = input.read(buffer);
        while (bytesRead != -1) {
            output.write(buffer, 0, bytesRead);
            bytesRead = input.read(buffer);
        }
    //If needed, close streams.
    } finally {
        input.close();
        output.close();
    }
}
person Leandro Latorre    schedule 28.04.2015
comment
Трябва да е много по-малко от 10 MB. Това е TCP, за който говорим. Всеки размер, по-голям от буфера за получаване на сокета, е напълно безсмислен и те се измерват в килобайти, а не в мегабайти. - person user207421; 15.05.2015

Използвайте org.apache.commons.io.IOUtils

InputStream inStream = new ...
OutputStream outStream = new ...
IOUtils.copy(inStream, outStream);

или copyLarge за размер >2 GB

person Adam111p    schedule 20.10.2017

Това е версия на Scala, която е чиста и бърза (без stackoverflow):

  import scala.annotation.tailrec
  import java.io._

  implicit class InputStreamOps(in: InputStream) {
    def >(out: OutputStream): Unit = pipeTo(out)

    def pipeTo(out: OutputStream, bufferSize: Int = 1<<10): Unit = pipeTo(out, Array.ofDim[Byte](bufferSize))

    @tailrec final def pipeTo(out: OutputStream, buffer: Array[Byte]): Unit = in.read(buffer) match {
      case n if n > 0 =>
        out.write(buffer, 0, n)
        pipeTo(out, buffer)
      case _ =>
        in.close()
        out.close()
    }
  }

Това позволява да се използва символ >, напр. inputstream > outputstream и също така предава персонализирани буфери/размери.

person pathikrit    schedule 17.09.2015
comment
Бихте ли предоставили подобна реализация на Java? - person Luchostein; 04.05.2016
comment
@Luchostein: Отговарях на бъгия отговор на Scala от Джордж Плигор по-долу - person pathikrit; 04.05.2016

В случай, че се интересувате от функционалност, това е функция, написана на Scala, показваща как можете да копирате входен поток в изходен поток, като използвате само vals (а не vars).

def copyInputToOutputFunctional(inputStream: InputStream, outputStream: OutputStream,bufferSize: Int) {
  val buffer = new Array[Byte](bufferSize);
  def recurse() {
    val len = inputStream.read(buffer);
    if (len > 0) {
      outputStream.write(buffer.take(len));
      recurse();
    }
  }
  recurse();
}

Имайте предвид, че това не се препоръчва да се използва в Java приложение с малко налична памет, защото с рекурсивна функция можете лесно да получите грешка при препълване на стека

person George Pligoropoulos    schedule 20.03.2013
comment
-1: Как едно рекурсивно решение на Scala е подходящо за въпрос на Java? - person tomlogic; 22.08.2013
comment
Рекурсивният метод е рекурсивен с опашка. Ако го анотирате с @tailrec, няма да имате проблеми с препълването на стека. - person Simão Martins; 17.09.2015
comment
Този отговор потвърждава, че всички програмисти на чиста Java страдат от натиска на своите шефове и се нуждаят от сериозно управление на гнева! - person George Pligoropoulos; 09.03.2016