Преобразовать RXJava Single в Deferred сопрограммы?

У меня есть Single от RxJava, и я хочу продолжить работу с Deferred from Kotlin Coroutines. Как это сделать?

fun convert(data: rx.Single<String>): kotlinx.coroutines.Deferred<String> = ...

Мне была бы интересна какая-нибудь библиотека (если она есть?), а также сделать это самостоятельно... Пока что я сделал эту ручную реализацию самостоятельно:

private fun waitForRxJavaResult(resultSingle: Single<String>): String? {
    var resultReceived = false
    var result: String? = null

    resultSingle.subscribe({
        result = it
        resultReceived = true
    }, {
        resultReceived = true
        if (!(it is NoSuchElementException))
            it.printStackTrace()
    })
    while (!resultReceived)
        Thread.sleep(20)

    return result
}

person micgn    schedule 16.05.2020    source источник
comment
Возможно, вам будет проще преобразовать функцию, возвращающую Single, в suspend fun, используя расширение Single.await().   -  person Roman Elizarov    schedule 17.05.2020
comment
У меня нет Single.await(), доступного с RxJava версии 1...   -  person micgn    schedule 18.05.2020
comment
Вы можете использовать awaitSingle(), который доступен для любой реализации Publisher (включая Rx1).   -  person Roman Elizarov    schedule 18.05.2020
comment
Не очень получается с моим синглом. Код находится здесь github .com/micgn/imagecatalog/blob/master/src/main/java/de/mg/   -  person micgn    schedule 28.05.2020


Ответы (2)


Существует эта библиотека, которая интегрирует RxJava с Coroutines: https://github.com/Kotlin/kotlinx.coroutines/tree/master/reactive/kotlinx-coroutines-rx2

Однако в этой библиотеке нет функции для прямого преобразования одиночного числа в Deferred. Причина этого, вероятно, в том, что RxJava Single не привязан к области действия сопрограммы. Если вы хотите преобразовать его в Deferred, вам необходимо предоставить ему CoroutineScope.

Вероятно, вы могли бы реализовать это следующим образом:

fun <T> Single<T>.toDeferred(scope: CoroutineScope) = scope.async { await() }

Функция Single.await (используемая в блоке async) взята из библиотеки kotlinx-coroutines-rx2.

Вы можете вызвать функцию следующим образом:

coroutineScope {
    val mySingle = getSingle()
    val deferred = mySingle.toDeferred(this)
}
person marstran    schedule 16.05.2020
comment
Хорошо, спасибо. К сожалению, это только интеграция с RxJava2, и у меня есть версия 1... - person micgn; 18.05.2020

Просто превратите свой сингл в функцию приостановки:

suspend fun waitForRxJavaResult(): String?{
    return suspendCoroutine { cont ->
        try {
            val result = resultSingle.blockingGet()
            cont.resume(result)
        }catch (e: Exception){
            cont.resume(null)
        }
    }
}
person Haider Malik    schedule 24.01.2021