Я пытаюсь выяснить, как лучше всего использовать агентов для получения элементов из очереди сообщений (Amazon SQS). Прямо сейчас у меня есть функция (process-queue-item), которая захватывает элементы из очереди и обрабатывает их.
Я хочу обрабатывать эти элементы одновременно, но не могу понять, как управлять агентами. По сути, я хочу, чтобы все агенты были заняты как можно больше, не извлекая множество элементов из очереди и не создавая невыполненных работ (у меня это будет работать на нескольких машинах, поэтому элементы нужно оставлять в очереди, пока они не появятся. действительно нужны).
Может ли кто-нибудь дать мне несколько советов по улучшению моей реализации?
(def active-agents (ref 0))
(defn process-queue-item [_]
(dosync (alter active-agents inc))
;retrieve item from Message Queue (Amazon SQS) and process
(dosync (alter active-agents dec)))
(defn -main []
(def agents (for [x (range 20)] (agent x)))
(loop [loop-count 0]
(if (< @active-agents 20)
(doseq [agent agents]
(if (agent-errors agent)
(clear-agent-errors agent))
;should skip this agent until later if it is still busy processing (not sure how)
(send-off agent process-queue-item)))
;(apply await-for (* 10 1000) agents)
(Thread/sleep 10000)
(logging/info (str "ACTIVE AGENTS " @active-agents))
(if (> 10 loop-count)
(do (logging/info (str "done, let's cleanup " count))
(doseq [agent agents]
(if (agent-errors agent)
(clear-agent-errors agent)))
(apply await agents)
(shutdown-agents))
(recur (inc count)))))