Kotlin Flow выполняет два вызова API параллельно и собирает каждый результат по мере его поступления.

Я пытаюсь реализовать кеш-стратегию, а затем сетевую стратегию для моего вызова API, используя Kotlin Flows. Вот что я сейчас пытаюсь

flowOf(
 remoteDataSource.getDataFromCache() // suspending function returning Flow<Data>
   .catch { error -> Timber.e(error) },
 remoteDataSource.getDataFromServer() // suspending function returning Flow<Data>
).flattenConcat().collect {
  Timber.i("Response Received")
}

Проблема здесь в том, что collect вызывается только при возврате getDataFromServer. Я ожидаю, что через несколько миллисекунд я получу первое событие из кеша, а затем второе событие с сервера. В этом случае "Response Received"gets печатается дважды, но сразу один за другим.

В этом другом варианте "Response Received" печатается только один раз, то есть после возврата getDataFromServer().

 remoteDataSource.getDataFromCache() // suspending function returning Flow<Data>
  .catch { error -> Timber.e(error) }
  .flatMapConcat {
    remoteDataSource.getDataFromServer() // suspending function returning Flow<Data>
  }
  .collect {
    Timber.i("Response Received")
  }

Раньше я использовал Flowable.concat() RxJava, и он работал отлично. Есть ли в Kotlin Flows что-то, что может имитировать такое поведение?


person Abhishek Bansal    schedule 12.02.2020    source источник


Ответы (3)


Недавно в версию 1.3.3 сопрограмм Kotlin был добавлен оператор merge. Вот объединенный PR.

Используя оператор слияния, вы сможете получить результат по мере его поступления.

person ragdroid    schedule 13.02.2020

Проблема здесь в том, что collect вызывается только при возврате getDataFromServer.

Первая проблема с вашим дизайном заключается в том, что функция возврата потока также может быть приостановлена. Это два уровня подвешиваемости. Функции должны возвращать потоки без каких-либо задержек, а сами потоки должны выдавать элементы по мере их поступления. Если вы следовали этому руководству, ваш исходный код уже работал бы.

Как вы написали эти функции, они все еще могут работать, если вы напишете это:

flow<String> {
    emitAll(getCached())
    emitAll(getFromServer())
}

Эта инструкция завершается немедленно, возвращая холодный поток. Когда вы вызываете collect, он сначала вызывает getCached() и выдает кэшированное значение, а затем вызывает getFromServer() и выдает ответ сервера.


Вышеупомянутое решение запускает вызов сервера только после того, как вы используете кешированное значение. Если вам нужно, чтобы два потока были активны одновременно, используйте flatMapMerge.

Предполагая, что вы устранили указанную выше основную проблему и сделали свои функции, возвращающие поток, не приостанавливаемыми, все, что вам нужно, это следующее:

flowOf(getCached(), getFromServer()).flattenMerge()

Если по какой-то причине вы не можете этого сделать, вам нужно добавить оболочку emitAll вокруг каждого вызова:

flowOf(
    flow { emitAll(getCached()) }, 
    flow { emitAll(getFromServer()) }
).flattenMerge()
person Marko Topolnik    schedule 13.02.2020

Оказывается, flowOf(someOperation()) someOperation() необходимо завершить, чтобы нисходящий поток начал обработку. Это похоже на Observable.just(someOperation()) в RxJava мире.

Во втором сценарии flatMapConcat на самом деле является оператором transform, поэтому он, очевидно, возвращает окончательный обработанный результат.

Похоже, что в мире Flow отсутствуют родные concat подобные операторы. Вот как я решил эту проблему в итоге

flow {
   remoteDataSource.getDataFromCache()
   .catch { error -> Timber.e(error) }
   .onCompletion {
       remoteDataSource.getDataFromServer()
            .collect {
                 emit(it)
            }
    }.collect { emit(it) }
}
person Abhishek Bansal    schedule 13.02.2020