Имам групова работа в Scala/Spark, която динамично създава правила за Drools в зависимост от някакъв вход и след това оценява правилата. Също така имам като вход RDD[T]
, който съответства на фактите, които трябва да бъдат вмъкнати в системата за правила.
Досега вмъквам фактите един по един и след това задействам всички правила за този факт. Правя това с помощта на rdd.aggregate
.
Операторът seqOp се дефинира по следния начин:
/**
* @param broadcastRules the broadcasted KieBase object containing all rules
* @param aggregator used to accumulate values when rule matches
* @param item the fact to run Drools with
* @tparam T the type of the given item
* @return the updated aggregator
*/
def seqOp[T: ClassTag](broadcastRules: Broadcast[KieBase])(
aggregator: MyAggregator,
item: T) : MyAggregator = {
val session = broadcastRules.value.newStatelessKieSession
session.setGlobal("aggregator", aggregator)
session.execute(CommandFactory.newInsert(item))
aggregator
}
Ето пример за генерирано правило:
dialect "mvel"
global batch.model.MyAggregator aggregator
rule "1"
when condition
then do something on the aggregator
end
За същата RDD на партидата бяха необходими 20 минути за оценка на 3K правила, но 10 часа за оценка на 10K правила!
Чудя се дали вмъкването на факт по факт е най-добрият подход. По-добре ли е да вмъкнете всички елементи от RDD наведнъж, след което да активирате всички правила? Не ми се струва оптимално, тъй като всички факти ще бъдат в работната памет едновременно.
Виждате ли проблем с горния код?