Как можно непрерывно материализовать потоки Akka?

Я использую Akka Streams в Scala для опроса из очереди AWS SQS с помощью AWS Java SDK. Я создал ActorPublisher, который удаляет сообщения из очереди через двухсекундный интервал:

class SQSSubscriber(name: String) extends ActorPublisher[Message] {
  implicit val materializer = ActorMaterializer()

  val schedule = context.system.scheduler.schedule(0 seconds, 2 seconds, self, "dequeue")

  val client = new AmazonSQSClient()
  client.setRegion(RegionUtils.getRegion("us-east-1"))
  val url = client.getQueueUrl(name).getQueueUrl

  val MaxBufferSize = 100
  var buf = Vector.empty[Message]

  override def receive: Receive = {
    case "dequeue" =>
      val messages = iterableAsScalaIterable(client.receiveMessage(new ReceiveMessageRequest(url).getMessages).toList
      messages.foreach(self ! _)
    case message: Message if buf.size == MaxBufferSize =>
      log.error("The buffer is full")
    case message: Message =>
      if (buf.isEmpty && totalDemand > 0)
        onNext(message)
      else {
        buf :+= message
        deliverBuf()
      }
    case Request(_) =>
      deliverBuf()
    case Cancel =>
      context.stop(self)
  }

  @tailrec final def deliverBuf(): Unit =
    if (totalDemand > 0) {
      if (totalDemand <= Int.MaxValue) {
        val (use, keep) = buf.splitAt(totalDemand.toInt)
        buf = keep
        use foreach onNext
      } else {
        val (use, keep) = buf.splitAt(Int.MaxValue)
        buf = keep
        use foreach onNext
        deliverBuf()
      }
    }
}

В моем приложении я также пытаюсь запустить поток с интервалом в 2 секунды:

val system = ActorSystem("system")
val sqsSource = Source.actorPublisher[Message](SQSSubscriber.props("queue-name"))
val flow = Flow[Message]
  .map { elem => system.log.debug(s"${elem.getBody} (${elem.getMessageId})"); elem }
  .to(Sink.ignore)

system.scheduler.schedule(0 seconds, 2 seconds) {
  flow.runWith(sqsSource)(ActorMaterializer()(system))
}

Однако, когда я запускаю свое приложение, я получаю java.util.concurrent.TimeoutException: Futures timed out after [20000 milliseconds] и последующие уведомления о недоставке, что вызвано ActorMaterializer.

Есть ли рекомендуемый подход для постоянной материализации потока Akka?


person David Caseria    schedule 12.09.2015    source источник
comment
Я не могу сейчас это протестировать, но я не уверен, что могу использовать более одного экземпляра ActorMaterializer. Вы используете один экземпляр внутри ActorPublisher, а другой - для всего потока.   -  person Mariusz Nosiński    schedule 14.09.2015
comment
В итоге я использовал Akka-Camel, поскольку у него хорошая интеграция с SQS, которая выполнила все, что мне нужно было (github.com/fzakaria/Akka-Camel-SQS).   -  person David Caseria    schedule 22.09.2015
comment
Есть ли причина, по которой вам нужно постоянно использовать другого ActorPublisher каждые 2 секунды? Основываясь на приведенном примере кода, было бы намного проще использовать одного и того же издателя ...   -  person Ramón J Romero y Vigil    schedule 23.11.2015


Ответы (1)


Я не думаю, что вам нужно создавать новый ActorPublisher каждые 2 секунды. Это кажется излишним и расточительным. Кроме того, я не думаю, что ActorPublisher необходим. Судя по тому, что я могу сказать о коде, в вашей реализации будет постоянно расти количество потоков, запрашивающих одни и те же данные. Каждый Message от клиента будет обрабатываться N разными потоками акка и, что еще хуже, N со временем будет расти.

Итератор для запросов с бесконечным циклом

Вы можете добиться того же поведения от вашего ActorPublisher, используя scala Iterator. Можно создать итератор, который непрерывно опрашивает клиента:

//setup the client
val client = {
  val sqsClient = new AmazonSQSClient()
  sqsClient setRegion (RegionUtils getRegion "us-east-1")
  sqsClient
}

val url = client.getQueueUrl(name).getQueueUrl

//single query
def queryClientForMessages : Iterable[Message] = iterableAsScalaIterable {
  client receiveMessage (new ReceiveMessageRequest(url).getMessages)
}

def messageListIteartor : Iterator[Iterable[Message]] = 
  Iterator continually messageListStream

//messages one-at-a-time "on demand", no timer pushing you around
def messageIterator() : Iterator[Message] = messageListIterator flatMap identity

Эта реализация запрашивает клиента только тогда, когда все предыдущие Сообщения были использованы, и, следовательно, является действительно реактивным. Нет необходимости отслеживать буфер фиксированного размера. Вашему решению нужен буфер, потому что создание сообщений (через таймер) не связано с потреблением сообщений (через println). В моей реализации создание и потребление тесно связаны через противодавление.

Источник потока Akka

Затем вы можете использовать эту функцию-генератор Iterator для подачи потока akka Source:

def messageSource : Source[Message, _] = Source fromIterator messageIterator

Формирование потока

И, наконец, вы можете использовать этот Источник для выполнения println (в качестве примечания: ваше значение flow фактически равно Sink с Flow + Sink = Sink). Используя значение flow из вопроса:

messageSource runWith flow

Один акка Stream обрабатывает все сообщения.

person Ramón J Romero y Vigil    schedule 23.11.2015