Опитвам се да разбера най-добрия начин да използвам агенти за консумиране на елементи от опашка за съобщения (Amazon SQS). В момента имам функция (process-queue-item), която хваща елементи от опашката и ги обработва.
Искам да обработвам тези елементи едновременно, но не мога да си обясня как да контролирам агентите. По принцип искам да държа всички агенти заети колкото е възможно повече, без да дърпам много елементи от опашката и да създавам изоставане (ще накарам това да работи на няколко машини, така че елементите трябва да бъдат оставени в опашката, докато не са наистина необходими).
Може ли някой да ми даде някои насоки за подобряване на внедряването ми?
(def active-agents (ref 0))
(defn process-queue-item [_]
(dosync (alter active-agents inc))
;retrieve item from Message Queue (Amazon SQS) and process
(dosync (alter active-agents dec)))
(defn -main []
(def agents (for [x (range 20)] (agent x)))
(loop [loop-count 0]
(if (< @active-agents 20)
(doseq [agent agents]
(if (agent-errors agent)
(clear-agent-errors agent))
;should skip this agent until later if it is still busy processing (not sure how)
(send-off agent process-queue-item)))
;(apply await-for (* 10 1000) agents)
(Thread/sleep 10000)
(logging/info (str "ACTIVE AGENTS " @active-agents))
(if (> 10 loop-count)
(do (logging/info (str "done, let's cleanup " count))
(doseq [agent agents]
(if (agent-errors agent)
(clear-agent-errors agent)))
(apply await agents)
(shutdown-agents))
(recur (inc count)))))