Я использую RxJava2 с кодом, который сводится к чему-то вроде этого:
val whitespaceRegex = Regex("\\s+")
val queryRegex = Regex("query=([^&]+)", RegexOption.IGNORE_CASE)
val dateTimeFormatter = DateTimeFormatter.ISO_OFFSET_DATE_TIME
@JvmStatic
fun main(args: Array<String>) {
val cnt = AtomicLong()
val templateStr = "|date| /ignored/ query=|query|"
val random = ThreadLocalRandom.current()
var curDate = ZonedDateTime.of(LocalDate.of(2016, Month.JANUARY, 1), LocalTime.MIDNIGHT, ZoneId.of("UTC"))
val generator = Flowable.generate<String> { emitter ->
// normally these are read from a file, this is for the example
val next = cnt.incrementAndGet()
if (next % 3000 == 0L) {
curDate = curDate.plusDays(1)
}
if (next < 100000) {
val curStr = templateStr
.replace("|date|", dateTimeFormatter.format(curDate))
.replace("|query|", random.nextInt(1, 1000).toString())
emitter.onNext(curStr)
} else {
emitter.onComplete()
}
}
val source = generator
.map { line ->
val cols = line.split(whitespaceRegex)
val queryRaw = queryRegex.find(cols[2])?.groupValues?.get(1) ?: ""
val query = URLDecoder.decode(queryRaw, Charsets.UTF_8.name()).toLowerCase().replace(whitespaceRegex, " ").trim()
val date = dateTimeFormatter.parse(cols[0])
Pair(LocalDate.from(date), query)
}
.share()
source
.window(source.map { it.first }.distinctUntilChanged())
.flatMap { window ->
window
.groupBy { pair -> pair }
.flatMap({ grouping ->
grouping
.count()
.map {
Pair(grouping.key, it)
}.toFlowable()
})
}
.subscribe({ println("Result: $it}") }, { it.printStackTrace() }, { println("Done") })
}
Когда я использую Observable.generate
, он работает нормально, но с Flowable.generate
вывода нет. Это подсчет того, сколько запросов произошло в данный день. День увеличивается последовательно, поэтому я формирую окно каждого дня, а затем подсчитываю запросы с помощью groupBy. Нужно ли мне делать это по-другому с Flowable?
Flowable
полностью пуст, это может быть из-за чего-то другого: например, из-за того, что запрос не вызывается в реализации подписчика. Если по-прежнему нет событий, вот гдеdoOnNext(System.out::println)
вступает в силу, несколько мест на самом деле, чтобы увидеть, где данные потеряны. - person akarnokd   schedule 21.08.2017