RxJava отличается выводом между Flowable и Observable с Window и Groupby

Я использую RxJava2 с кодом, который сводится к чему-то вроде этого:

val whitespaceRegex = Regex("\\s+")
val queryRegex = Regex("query=([^&]+)", RegexOption.IGNORE_CASE)
val dateTimeFormatter = DateTimeFormatter.ISO_OFFSET_DATE_TIME

@JvmStatic
fun main(args: Array<String>) {
    val cnt = AtomicLong()
    val templateStr = "|date| /ignored/ query=|query|"
    val random = ThreadLocalRandom.current()
    var curDate = ZonedDateTime.of(LocalDate.of(2016, Month.JANUARY, 1), LocalTime.MIDNIGHT, ZoneId.of("UTC"))

    val generator = Flowable.generate<String> { emitter ->
        // normally these are read from a file, this is for the example 
        val next = cnt.incrementAndGet()
        if (next % 3000 == 0L) {
            curDate = curDate.plusDays(1)
        }
        if (next < 100000) {
            val curStr = templateStr
                    .replace("|date|", dateTimeFormatter.format(curDate))
                    .replace("|query|", random.nextInt(1, 1000).toString())
            emitter.onNext(curStr)
        } else {
            emitter.onComplete()
        }

    }
    val source = generator
            .map { line ->
                val cols = line.split(whitespaceRegex)
                val queryRaw = queryRegex.find(cols[2])?.groupValues?.get(1) ?: ""
                val query = URLDecoder.decode(queryRaw, Charsets.UTF_8.name()).toLowerCase().replace(whitespaceRegex, " ").trim()
                val date = dateTimeFormatter.parse(cols[0])
                Pair(LocalDate.from(date), query)
            }
            .share()

    source
            .window(source.map { it.first }.distinctUntilChanged())
            .flatMap { window ->
                window
                        .groupBy { pair -> pair }
                        .flatMap({ grouping ->
                            grouping
                                    .count()
                                    .map {
                                        Pair(grouping.key, it)
                                    }.toFlowable()
                        })
            }
            .subscribe({ println("Result: $it}") }, { it.printStackTrace() }, { println("Done") })
}

Когда я использую Observable.generate, он работает нормально, но с Flowable.generate вывода нет. Это подсчет того, сколько запросов произошло в данный день. День увеличивается последовательно, поэтому я формирую окно каждого дня, а затем подсчитываю запросы с помощью groupBy. Нужно ли мне делать это по-другому с Flowable?


person ryanad    schedule 21.08.2017    source источник
comment
Если у вас больше групп, чем уровень параллелизма flatMap, поток может остановиться. Если ваш поток на основе Flowable полностью пуст, это может быть из-за чего-то другого: например, из-за того, что запрос не вызывается в реализации подписчика. Если по-прежнему нет событий, вот где doOnNext(System.out::println) вступает в силу, несколько мест на самом деле, чтобы увидеть, где данные потеряны.   -  person akarnokd    schedule 21.08.2017
comment
Когда дело доходит до RxJava, я всегда применяю подсказки @akarnokd;)   -  person Rafal G.    schedule 22.08.2017


Ответы (1)


Как упоминал akarnokd, это произошло из-за того, что для flatMap значение maxConcurrency по умолчанию равно 128. Я обнаружил эту проблему, https://github.com/ReactiveX/RxJava/issues/5126, где более подробно описана причина. Это устраняет проблему:

        val cnt = AtomicLong()
        val templateStr = "|date| /ignored/ query=|query|"
        val random = ThreadLocalRandom.current()
        var curDate = ZonedDateTime.of(LocalDate.of(2016, Month.JANUARY, 1), LocalTime.MIDNIGHT, ZoneId.of("UTC"))

        val generator = Flowable.generate<String> { emitter ->
            val next = cnt.incrementAndGet()
            if (next % 3000 == 0L) {
                curDate = curDate.plusDays(1)
            }
            if (next < 1000000) {
                val curStr = templateStr
                        .replace("|date|", dateTimeFormatter.format(curDate))
                        .replace("|query|", random.nextInt(1, 1000).toString())
                emitter.onNext(curStr)
            } else {
                emitter.onComplete()
            }

        }
        val source = generator
                .map { line ->
                    val cols = line.split(whitespaceRegex)
                    val queryRaw = queryRegex.find(cols[2])?.groupValues?.get(1) ?: ""
                    val query = URLDecoder.decode(queryRaw, Charsets.UTF_8.name()).toLowerCase().replace(whitespaceRegex, " ").trim()
                    val date = dateTimeFormatter.parse(cols[0])
                    Pair(LocalDate.from(date), query)
                }
                .share()

        source
                .window(source.map { it.first }.distinctUntilChanged().doOnEach({println("Win: $it")}))
                .flatMap( { window ->
                    window
                            .groupBy { pair -> pair }
                            .flatMap({ grouping ->
                                grouping
                                        .count()
                                        .map {
                                            Pair(grouping.key, it)
                                        }.toFlowable()
                            // fix is here
                            }, Int.MAX_VALUE)
                // and here
                }, Int.MAX_VALUE)
                .subscribe({ println("Result: $it}") }, { it.printStackTrace() }, { println("Done") })
person ryanad    schedule 22.08.2017