Отправка метаданных вместе с потоком Akka

Вот мой предыдущий вопрос: Отправлять данные из InputStream через поток Akka/Spring< /а>

Мне удалось отправить сжатый и зашифрованный файл через поток Akka. Теперь я ищу способ транспортировки метаданных вместе с данными, в основном с именем файла и хэшем (контрольной суммой).

Моя текущая идея состоит в том, чтобы использовать функцию Flow.prepend и вставлять метаданные перед данными следующим образом:

  1. имя файла, которое может различаться по размеру, но всегда заканчивается нулевым байтом

  2. хэш фиксированного размера (контрольная сумма)

  3. данные

Затем, на приемной стороне, мне пришлось бы дважды использовать Flow.takeWhile - один раз для чтения имени файла и второй раз для чтения хэша, а затем просто читать данные. Это не очень похоже на элегантное решение, плюс, если в будущем я захочу добавить больше метаданных, это станет еще хуже.

Я заметил метод Flow.named, однако в документации сказано только:

Add a ``name`` attribute to this Flow.

и я не знаю, как это использовать (и можно ли через него передать имя файла).

Вопрос: есть ли лучшая идея для передачи метаданных вместе с данными через поток Akka, чем указано выше?

РЕДАКТИРОВАТЬ: Прикрепление моего рисунка с идеей. .com/R7g49.png" alt="идея">


person spam    schedule 25.02.2017    source источник


Ответы (1)


Я думаю, что добавление метаданных имеет смысл. Простым подходом может быть добавление метаданных в начале с использованием того же кадра, который вы используете для отправки данных.

Принимающая сторона должна знать, сколько блоков метаданных существует, и использовать эту информацию для ее разделения. См. пример ниже.

// client end
filenameSrc
  .concat(hashSrc)
  .concat(dataSrc)
  .via(Framing.delimiter(ByteString("\n"), Int.MaxValue, allowTruncation = true))
  .via(Tcp().outgoingConnection(???, ???))
  .runForeach{ ??? }

// server end
val printMetadata =
  Flow.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
    import GraphDSL.Implicits._

    val metadataSink = Sink.foreach(println)
    val bcast = builder.add(Broadcast[ByteString](2))

    bcast.out(0).take(2) ~> metadataSink

    FlowShape(bcast.in, bcast.out(1).drop(2).outlet)
  })

val handler =
  Framing.delimiter(ByteString("\n"), Int.MaxValue)
  .via(printMetadata)
  .via(???)

Это только один из многих возможных подходов к решению этой проблемы. Но какое бы решение вы ни выбрали, получатель должен знать, как извлечь метаданные из необработанного потока байтов, который он считывает по протоколу TCP.

person Stefano Bonetti    schedule 25.02.2017
comment
Я предполагаю, что невозможно добавить поток ByteString к моему собственному объекту, который будет содержать метаданные? - person spam; 26.02.2017
comment
хорошо, если вы собираетесь транслировать через Tcp, на принимающей стороне вы всегда обнаружите поток необработанных байтов (т.е. ByteString, если вы используете Akka Streams) - person Stefano Bonetti; 26.02.2017
comment
Хммм... как насчет этого: я храню свои метаданные в собственном классе/объекте, затем сериализую их в json (в строку), затем кодирую с использованием base64 (чтобы не было нулевых байтов) и добавляю данные с этим base64 плюс null как разделитель? - person spam; 26.02.2017
comment
@spam обратите внимание, что Framing.delimiter также поддерживает поле длины, поэтому вы можете добавлять длину перед каждым фрагментом данных. Я лично предпочел бы этот подход, так как он несколько более надежен. - person Vladimir Matveev; 26.02.2017
comment
Но я не хочу использовать Framing.delimiter для всего потока — я просто хочу брать метаданные с самого начала. - person spam; 26.02.2017
comment
Пожалуйста, взгляните на мой рисунок идеи, который я прикрепил к вопросу. - person spam; 26.02.2017