Как реализовать естественную (иначе умную) пакетную обработку с каналами Kotlin?

Натуральный ака. интеллектуальная пакетная обработка — это метод обработки потоков, который оптимизирует пропускную способность, не влияя на задержку. В примере с параллельной очередью потребитель имеет возможность атомарно удалить все элементы, наблюдаемые в какой-то момент, а затем обработать их как пакет. В идеале очередь должна быть ограничена, устанавливая верхний предел размера пакета и одновременно оказывая обратное давление на отправителя.

Это называется «естественной» пакетной обработкой, потому что нет навязанного размера партии: когда трафик низкий, он будет обрабатывать каждый элемент, как только он поступит. В этом случае вам не нужна оптимизация пропускной способности путем группирования элементов. Когда трафик становится выше, потребитель автоматически начинает обрабатывать большие пакеты, амортизируя фиксированную задержку одной операции, такой как база данных INSERT.

Я написал некоторый код, который достигает основной цели:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

const val batchLimit = 20

@ObsoleteCoroutinesApi
suspend inline fun <T: Any> ReceiveChannel<T>.consumeBatched(
        handleItems: (List<T>) -> Unit
) {
    val buf = mutableListOf<T>()
    while (true) {
        receiveOrNull()?.also { buf.add(it) } ?: break
        for (x in 2..batchLimit) {
            poll()?.also { buf.add(it) } ?: break
        }
        handleItems(buf)
        buf.clear()
    }
}

Мы можем проверить это с помощью этого:

@ObsoleteCoroutinesApi
@ExperimentalCoroutinesApi
fun main() {
    val chan = generateMockTraffic()
    runBlocking {
        chan.consumeBatched { println("Received items: $it") }
    }
}

@ExperimentalCoroutinesApi
private fun generateMockTraffic(): ReceiveChannel<Int> {
    return GlobalScope.produce(capacity = batchLimit) {
        (1..100).forEach {
            send(it)
            if (it % 10 == 0) {
                delay(1)
            }
        }
    }
}

consumeBatched() опрашивает очередь по одному элементу за раз и, следовательно, должен дополнительно налагать пакетное ограничение. Было бы более оптимально, если бы он был написан против параллельной очереди, такой как OneToOneConcurrentArrayQueue, который поддерживает операцию drain.

Есть ли лучший подход к каналам Kotlin с большей поддержкой библиотеки?

Если нет, будет ли это рассматриваться как добавление функции?


person Marko Topolnik    schedule 14.12.2018    source источник


Ответы (1)


Есть ли лучший подход к каналам Kotlin с большей поддержкой библиотеки?

Библиотека не поддерживает эту функцию.

Если нет, будет ли это рассматриваться как добавление функции?

Это зависит от желаемой поверхности API. Член drain вряд ли подходит для семантики канала: он ограничивает реализацию, он должен каким-то образом выставлять предел стока и дает каналу более «коллекционно-подобный» API. Например. как должен вести себя drain с безлимитным каналом? Можно ли эффективно реализовать drain (с предварительно заданным размером буфера, но избегая OOM и неограниченных коллекций) один раз и использовать его с любой реализацией канала?

Что можно было бы улучшить, так это дополнительные подсказки от канала, такие как ожидаемая емкость и количество элементов в очереди. Они могут иметь нестрогую семантику с реализацией по умолчанию и действовать как подсказки к расширению drain с некоторыми разумными настраиваемыми верхними границами. Такой API может быть добавлен в будущем, не стесняйтесь создать задачу

person qwwdfsad    schedule 18.12.2018