У меня есть пакетное задание в Scala/Spark, которое динамически создает правила Drools в зависимости от некоторого ввода, а затем оценивает правила. У меня также есть входные данные RDD[T]
, которые соответствуют фактам, которые нужно вставить в механизм правил.
Пока я вставляю факты один за другим, а затем запускаю все правила по этому факту. Я делаю это, используя rdd.aggregate
.
Оператор seqOp определяется следующим образом:
/**
* @param broadcastRules the broadcasted KieBase object containing all rules
* @param aggregator used to accumulate values when rule matches
* @param item the fact to run Drools with
* @tparam T the type of the given item
* @return the updated aggregator
*/
def seqOp[T: ClassTag](broadcastRules: Broadcast[KieBase])(
aggregator: MyAggregator,
item: T) : MyAggregator = {
val session = broadcastRules.value.newStatelessKieSession
session.setGlobal("aggregator", aggregator)
session.execute(CommandFactory.newInsert(item))
aggregator
}
Вот пример сгенерированного правила:
dialect "mvel"
global batch.model.MyAggregator aggregator
rule "1"
when condition
then do something on the aggregator
end
Для того же RDD пакету потребовалось 20 минут для оценки правил 3K, но 10 часов для оценки правил 10K!
Мне интересно, является ли вставка факта фактом лучшим подходом. Не лучше ли сразу вставить все элементы RDD, а затем запустить все правила? Мне это не кажется оптимальным, так как все факты будут в оперативной памяти одновременно.
Вы видите какие-либо проблемы с кодом выше?