Натуральный ака. интеллектуальная пакетная обработка — это метод обработки потоков, который оптимизирует пропускную способность, не влияя на задержку. В примере с параллельной очередью потребитель имеет возможность атомарно удалить все элементы, наблюдаемые в какой-то момент, а затем обработать их как пакет. В идеале очередь должна быть ограничена, устанавливая верхний предел размера пакета и одновременно оказывая обратное давление на отправителя.
Это называется «естественной» пакетной обработкой, потому что нет навязанного размера партии: когда трафик низкий, он будет обрабатывать каждый элемент, как только он поступит. В этом случае вам не нужна оптимизация пропускной способности путем группирования элементов. Когда трафик становится выше, потребитель автоматически начинает обрабатывать большие пакеты, амортизируя фиксированную задержку одной операции, такой как база данных INSERT
.
Я написал некоторый код, который достигает основной цели:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
const val batchLimit = 20
@ObsoleteCoroutinesApi
suspend inline fun <T: Any> ReceiveChannel<T>.consumeBatched(
handleItems: (List<T>) -> Unit
) {
val buf = mutableListOf<T>()
while (true) {
receiveOrNull()?.also { buf.add(it) } ?: break
for (x in 2..batchLimit) {
poll()?.also { buf.add(it) } ?: break
}
handleItems(buf)
buf.clear()
}
}
Мы можем проверить это с помощью этого:
@ObsoleteCoroutinesApi
@ExperimentalCoroutinesApi
fun main() {
val chan = generateMockTraffic()
runBlocking {
chan.consumeBatched { println("Received items: $it") }
}
}
@ExperimentalCoroutinesApi
private fun generateMockTraffic(): ReceiveChannel<Int> {
return GlobalScope.produce(capacity = batchLimit) {
(1..100).forEach {
send(it)
if (it % 10 == 0) {
delay(1)
}
}
}
}
consumeBatched()
опрашивает очередь по одному элементу за раз и, следовательно, должен дополнительно налагать пакетное ограничение. Было бы более оптимально, если бы он был написан против параллельной очереди, такой как OneToOneConcurrentArrayQueue, который поддерживает операцию drain
.
Есть ли лучший подход к каналам Kotlin с большей поддержкой библиотеки?
Если нет, будет ли это рассматриваться как добавление функции?