Kotlin: Как дождаться выхода сопрограммы из режима ожидания без блокировки запуска?

Изменить 2: Мне кажется, я неправильно понял документацию. Я читаю:

runBlocking

Эту функцию нельзя использовать из сопрограммы. Он предназначен для соединения обычного блокирующего кода с библиотеками, написанными в стиле приостановки, для использования в main функциях и в тестах.

Это означает, что я не должен использовать runBlocking() вообще, кроме main или тестов. Но теперь я понимаю, что прочитал неправильно, особенно эту часть:

Он разработан, чтобы связать обычный блокирующий код с библиотеками, написанными в режиме приостановки

Похоже, что в этом сценарии необходимо использовать runBlocking .

Однако я думаю, что мне следует полностью изучить тему контекстов, чтобы увидеть, какие контексты лучше всего подходят для перехода к runBlocking в этом случае:

return runBlocking(???????) {
    job1.await() + job2.await()
}

Изменить: Ясно, что я сформулировал вопрос плохо, так как все попытки ответить на него упускают из виду фактический вопрос и ограничение, которое я задаю. Итак, давайте попробуем другой подход ...

Это работает:

fun doSomething(): Int {
    val job1 = GlobalScope.async { calculateSomething() }
    val job2 = GlobalScope.async { calculateSomething() }
    return runBlocking {
        job1.await() + job2.await()
    }
}

suspend fun calculateSomething(): Int {
    delay(1000L)
    return 13
}

suspend fun calculateSomethingElse(): Int {
    delay(2000L)
    return 19
}

У меня вопрос: смогу ли я добиться такого же результата?

  1. Без использования runBlocking().
  2. Без превращения doSomething() в suspend функцию.

?

Другими словами: есть ли что-нибудь, что я могу поставить вместо ??????, чтобы выполнить следующую работу?

fun doSomething(): Int {
    val job1 = GlobalScope.async { calculateSomething() }
    val job2 = GlobalScope.async { calculateSomething() }
    return ????????? {
        job1.?????() + job2.?????()
    }
}

suspend fun calculateSomething(): Int {
    delay(1000L)
    return 13
}

suspend fun calculateSomethingElse(): Int {
    delay(2000L)
    return 19
}

У меня есть небольшой служебный метод, который запускает любую заданную внешнюю команду и возвращает ее результат (то есть небольшую оболочку вокруг Java Process API):

class RunningCommand(private val proc: Process) {

    fun waitFor(): String {
        proc.outputStream.close()

        var output = ""
        val outputRdr = thread { output = proc.inputStream.bufferedReader().use { it.readText() } }
        var error = ""
        val errorRdr = thread { error = proc.errorStream.bufferedReader().use { it.readText() } }

        proc.waitFor()

        outputRdr.join()
        errorRdr.join()

        if (proc.exitValue() != 0) {
            throw RuntimeException("Command returned non-zero status: $output$error")
        }

        return output
    }
}

Этот код работает правильно. Однако он создает два дополнительных потока для каждого выполнения команды. Я хотел избежать этого, переключившись на сопрограммы. Я смог это сделать, но мне пришлось использовать runBlocking:

class RunningCommand(private val proc: Process) {

    fun waitFor(): String {
        proc.outputStream.close()

        var output = ""
        val outputRdr = GlobalScope.async { output = proc.inputStream.bufferedReader().use { it.readText() } }
        var error = ""
        val errorRdr = GlobalScope.async { error = proc.errorStream.bufferedReader().use { it.readText() } }

        proc.waitFor()

        runBlocking {
            outputRdr.await()
            errorRdr.await()
        }

        if (proc.exitValue() != 0) {
            throw RuntimeException("Command returned non-zero status: $output${error}")
        }

        return output
    }
}

Этот код также работает, но я читал, что runBlocking следует использовать только в main() методах и тестах, то есть не предназначено для использования таким образом. Если взглянуть на его реализацию, это выглядит ужасно и действительно похоже на то, что не хотелось бы вызывать повторно из какого-то служебного метода.

Итак, мой вопрос: как еще я должен преодолеть разрыв между блокирующим кодом и сопрограммами? Или, другими словами, как правильно ждать suspend функции из кода, отличного от suspend?

Или просто мой дизайн неправильный, и чтобы использовать сопрограммы где-нибудь в дальнейшем, мне нужно создать main() метод runBlocking и, по сути, всегда находиться внутри некоторой области сопрограмм?


person Amir Abiri    schedule 30.12.2019    source источник


Ответы (4)


Для будущих путешественников, которые совершают ту же ошибку, что и я - runBlocking можно использовать не только в main / тестах, но также:

Он разработан, чтобы связать обычный блокирующий код с библиотеками, написанными в стиле приостановки.

Почему-то у меня создалось впечатление, что использовать только для какой-то библиотечной функции - зло, но это не так.

person Amir Abiri    schedule 02.01.2020
comment
Также обратите внимание, что ваш исходный пример с двумя async() в глобальной области видимости страдает от проблемы, заключающейся в том, что сопрограммы не связаны между собой - если одна из них выходит из строя, другая может все еще работать, поскольку она полностью независима от всего и привязана только к глобальной области. - person denis.zhdanov; 21.02.2021

Вы можете создать свою собственную область действия с диспетчерами, которые выполняют операции в фоновом режиме. Вы можете использовать withContext, если хотите дождаться полного завершения выполнения чего-либо.

private val myScope = CoroutineScope(Dispatchers.Main) 

myScope.launch {

   withContext(Dispatchers.IO) {
        //to stuff in the background
    }

}

Запустив приведенный ниже код, вы увидите, что он печатает 20, а не null.

fun main() {
    callSuspendFun()
}

suspend fun doWorkAndReturnDouble(num: Int): Int {
    delay(1000)
    return num * 2
}

fun callSuspendFun() {
    val coroutineScope = CoroutineScope(Dispatchers.Main)
    coroutineScope.launch {
        var num: Int? = null
        withContext(Dispatchers.IO) {
            val newNum = 10
            num = doWorkAndReturnDouble(newNum)
        }
        println(num)
    }
}

Итак, чтобы вызвать функцию приостановки из функции, не являющейся приостановкой, без использования runBlocking, вам необходимо создать область сопрограммы. А withContext вы ждете выполнения кода.

person PhillauSofia    schedule 30.12.2019
comment
Я не уверен, что следую - можете ли вы изменить свой пример, чтобы показать, как вы ждете сопрограмму или приостанавливаете функцию из не приостановленной функции без использования runBlocking? - person Amir Abiri; 30.12.2019
comment
Я добавил пример, который вы можете запустить на себе. Надеюсь, это поможет больше. - person PhillauSofia; 30.12.2019
comment
Не думаю, что вы полностью поняли мой вопрос. Мне нужен код блокировки / приостановки, чтобы заблокировать и дождаться завершения сопрограммы и получить ее результат. Если я правильно понимаю, вы здесь продемонстрировали, как запустить сопрограмму из непостоянного кода в огне и забыть о моде ... - person Amir Abiri; 30.12.2019

Вы должны использовать coroutineScope

suspend fun waitFor(): String = coroutineScope {
    proc.outputStream.close()

    var output = ""
    val outputRdr = async { output = proc.inputStream.bufferedReader().use { it.readText() } }
    var error = ""
    val errorRdr = async { error = proc.errorStream.bufferedReader().use { it.readText() } }

    proc.waitFor()

        outputRdr.await()
        errorRdr.await()

    if (proc.exitValue() != 0) {
        throw RuntimeException("Command returned non-zero status: $output${error}")
    }

    return output
}
person Mohammad Sianaki    schedule 01.01.2020
comment
Опять же - это означает, что waitFor() теперь suspend метод. waitFor() вызывается из метода, отличного от suspend. Как заставить вызывающего абонента ждать результата waitFor()? - person Amir Abiri; 01.01.2020

Вы можете использовать CoroutineScope() с Dispathers.IO, который запустит сопрограмму в фоновом потоке и разгрузит ваше выполнение в этом потоке.

class RunningCommand(private val proc: Process) {

fun waitFor(): String {
    // Starting a new coroutine on Background thread (IO)
    proc.outputStream.close()
    var output = ""
    CoroutineScope(Dispatchers.Unconfined).async {

        val outputRdr = async { output = proc.inputStream.bufferedReader().use { it.readText() } }
        var error = ""
        val errorRdr = async { error = proc.errorStream.bufferedReader().use { it.readText() } }

        proc.waitFor()

        // No need of runBlocking 
        // await() call would wait for completion of its async block
        outputRdr.await() // Execution would block until outputRdr block completion on IO thread
        // At this stage, outputRdr block is completed
        errorRdr.await() // Execution would block until errorRdr block completion on IO thread
        // At this stage, errorRdr block is completed


        if (proc.exitValue() != 0) {
            throw RuntimeException("Command returned non-zero status: $output${error}")
        }
    return@async output
    }
    return output
}

}

Примечание. Если вы вызываете метод waitFor() из любого контекста сопрограммы, вы можете продолжить работу в том же контексте сопрограммы, написав coroutineScope { } вместо CoroutineScope(Dispatchers.IO).launch { }, правильно управляя структурированным параллелизмом.

person Kushal    schedule 01.01.2020
comment
Этот код не компилируется. waitFor() теперь ничего не возвращает. Кроме того, строки proc.outputStream.close() и proc.waitFor() теперь генерируют предупреждение о неприемлемом вызове метода блокировки. Также оператор return не принимается, так как блок запуска имеет тип CoroutineScope.() -> Unit. - person Amir Abiri; 01.01.2020
comment
Я исправил ошибки. proc.outputStream.close() предупреждал о блокировке вызова, поэтому я поставил его за пределы области сопрограммы. Я также обработал ошибку возвращаемого значения - person Kushal; 02.01.2020
comment
Если вы хотите получить правильный output, возвращенный из waitFor(), вам нужно приостановить эту функцию. В настоящее время значение output не будет соответствовать правильному значению, так как строка return output не ожидает завершения запущенной сопрограммы - person Kushal; 02.01.2020
comment
@AmirAbiri: Я отредактировал свой код и для этого случая. Теперь будет возвращено output с правильным значением. Кроме того, waitFor() - это функция без приостановки. Пожалуйста, запустите этот код. - person Kushal; 02.01.2020
comment
Это тоже не работает - waitFor() просто немедленно возвращает "". - person Amir Abiri; 02.01.2020
comment
Тогда есть проблема с записью значения в output. Можете ли вы перепроверить, отладить или зарегистрировать? - person Kushal; 02.01.2020