Clojure агенти консумират от опашка

Опитвам се да разбера най-добрия начин да използвам агенти за консумиране на елементи от опашка за съобщения (Amazon SQS). В момента имам функция (process-queue-item), която хваща елементи от опашката и ги обработва.

Искам да обработвам тези елементи едновременно, но не мога да си обясня как да контролирам агентите. По принцип искам да държа всички агенти заети колкото е възможно повече, без да дърпам много елементи от опашката и да създавам изоставане (ще накарам това да работи на няколко машини, така че елементите трябва да бъдат оставени в опашката, докато не са наистина необходими).

Може ли някой да ми даде някои насоки за подобряване на внедряването ми?

(def active-agents (ref 0))

(defn process-queue-item [_]
  (dosync (alter active-agents inc))
  ;retrieve item from Message Queue (Amazon SQS) and process
  (dosync (alter active-agents dec)))

(defn -main []
  (def agents (for [x (range 20)] (agent x)))

  (loop [loop-count 0]

    (if (< @active-agents 20)
      (doseq [agent agents]
        (if (agent-errors agent)
          (clear-agent-errors agent))
        ;should skip this agent until later if it is still busy processing (not sure how)
        (send-off agent process-queue-item)))

    ;(apply await-for (* 10 1000) agents)
    (Thread/sleep  10000)
    (logging/info (str "ACTIVE AGENTS " @active-agents))
    (if (> 10 loop-count)
      (do (logging/info (str "done, let's cleanup " count))
       (doseq [agent agents]
         (if (agent-errors agent)
           (clear-agent-errors agent)))
       (apply await agents)
       (shutdown-agents))
      (recur (inc count)))))

person erikcw    schedule 08.04.2010    source източник
comment
Има ли някакъв начин да третирате опашката за съобщения като seq и след това просто да използвате pmap, за да получите паралелизиране?   -  person Alex Stoddard    schedule 09.04.2010
comment
@Alex Stoddard: В моя случай process-queue-item всъщност блокира мрежови IO, така че не мисля, че pmap е правилният избор, тъй като използва само толкова нишки, колкото машината има ядра.   -  person erikcw    schedule 09.04.2010
comment
@erikw: Разбира се, но това е само детайл за внедряване на pmap (нишки = #ядра + 2). Няма причина да не можете да напишете версия на pmap с параметризиран брой нишки. Вижте първия ред на източника на pmap: (нека [n (+ 2 (.. Runtime getRuntime availableProcessors))   -  person Alex Stoddard    schedule 09.04.2010
comment
Здравейте, имам няколко въпроса: 1. агентите имат стойност, интересувате ли се от тяхната стойност или просто я използвате като пул от нишки? 2. има ли краен резултат от потреблението на опашката или процесът-опашка-елемент изпълнява странични ефекти?   -  person cgrand    schedule 12.04.2010
comment
@cgrand: 1) Не се интересувам от стойността на агентите, просто ги използвам като пул от нишки. 2) process-queue-item има seid-ефекти (избутва резултатите обратно към опашка за съобщения).   -  person erikcw    schedule 13.04.2010
comment
@erikcw 1) и сте по-заинтересовани да знаете дали|когато работата е свършена, така че предпочитайте фючърси пред агенти. 2) Това е, което предположих. Видяхте ли моя отговор: stackoverflow.com/questions/2602791/ ?   -  person cgrand    schedule 13.04.2010


Отговори (4)


Това, което искате, е начин да продължите да раздавате задачи, но с някаква горна граница. Един прост подход към това е да се използва семафор за координиране на ограничението. Ето как бих подходил:

(let [limit (.availableProcessors (Runtime/getRuntime))
      ; note: you might choose limit 20 based upon your problem description
      sem (java.util.concurrent.Semaphore. limit)]
  (defn submit-future-call
    "Takes a function of no args and yields a future object that will
    invoke the function in another thread, and will cache the result and
    return it on all subsequent calls to deref/@. If the computation has
    not yet finished, calls to deref/@ will block. 
    If n futures have already been submitted, then submit-future blocks
    until the completion of another future, where n is the number of
    available processors."  
    [#^Callable task]
    ; take a slot (or block until a slot is free)
    (.acquire sem)
    (try
      ; create a future that will free a slot on completion
      (future (try (task) (finally (.release sem))))
      (catch java.util.concurrent.RejectedExecutionException e
        ; no task was actually submitted
        (.release sem)
        (throw e)))))

(defmacro submit-future
  "Takes a body of expressions and yields a future object that will
  invoke the body in another thread, and will cache the result and
  return it on all subsequent calls to deref/@. If the computation has
  not yet finished, calls to deref/@ will block.
  If n futures have already been submitted, then submit-future blocks
  until the completion of another future, where n is the number of
  available processors."  
  [& body] `(submit-future-call (fn [] ~@body)))

#_(example
    user=> (submit-future (reduce + (range 100000000)))
    #<core$future_call$reify__5782@6c69d02b: :pending>
    user=> (submit-future (reduce + (range 100000000)))
    #<core$future_call$reify__5782@38827968: :pending>
    user=> (submit-future (reduce + (range 100000000)))
    ;; blocks at this point for a 2 processor PC until the previous
    ;; two futures complete
    #<core$future_call$reify__5782@214c4ac9: :pending>
    ;; then submits the job

С това сега просто трябва да координирате как се изпълняват самите задачи. Изглежда, че вече разполагате с механизмите за това. Цикъл (изпращане-бъдеще (процес-опашка-елемент))

person Timothy Pratley    schedule 09.04.2010

Може би бихте могли да използвате функцията seque? Цитирам (doc seque):

clojure.core/seque
([s] [n-or-q s])
  Creates a queued seq on another (presumably lazy) seq s. The queued
  seq will produce a concrete seq in the background, and can get up to
  n items ahead of the consumer. n-or-q can be an integer n buffer
  size, or an instance of java.util.concurrent BlockingQueue. Note
  that reading from a seque can block if the reader gets ahead of the
  producer.

Това, което имам предвид, е мързелива последователност, получаваща елементи от опашката по мрежата; бихте опаковали това в seque, поставили това в реф и агентите-работници да консумират елементи от това seque. seque връща нещо, което изглежда точно като обикновен сек от гледна точка на вашия код, като магията на опашката се случва по прозрачен начин. Обърнете внимание, че ако последователността, която поставите вътре, е разделена на парчета, тогава тя все още ще бъде принудена парче по едно. Също така имайте предвид, че самото първоначално извикване на seque изглежда блокира, докато не бъдат получени първоначален елемент или два (или парче, в зависимост от случая; мисля, че това е по-скоро свързано с начина, по който работят мързеливите последователности, отколкото със самия seque).

Скица на кода (наистина схематичен, изобщо не е тестван):

(defn get-queue-items-seq []
  (lazy-seq
   (cons (get-queue-item)
         (get-queue-items-seq))))

(def task-source (ref (seque (get-queue-items-seq))))

(defn do-stuff []
  (let [worker (agent nil)]
    (if-let [result
             (dosync
               (when-let [task (first @task-source)]
                (send worker (fn [_] (do-stuff-with task)))))]
      (do (await worker)
          ;; maybe do something with worker's state
          (do-stuff))))) ;; continue working

(defn do-lots-of-stuff []
  (let [fs (doall (repeatedly 20 #(future (do-stuff))))]
    fs)))

Всъщност вероятно бихте искали по-сложен производител на последователността на елемент от опашката, така че да можете да го помолите да спре да произвежда нови елементи (необходимост, ако цялото нещо трябва да може да бъде затворено елегантно; фючърсите ще умрат, когато задачата източникът работи на сухо, използвайте future-done?, за да видите дали вече са го направили). И това е нещо, което виждам на пръв поглед... Сигурен съм, че има още неща за излъскване тук. Мисля обаче, че общият подход ще работи.

person Michał Marczyk    schedule 09.04.2010
comment
Добавих корекция към предпоследния ред на скицата на кода, чрез която действително ще бъдат създадени фючърси. (От решаващо значение за цялата идея, наистина... :-)) - person Michał Marczyk; 09.04.2010
comment
Опитвам се да разбера този код. Защо източникът на задачата е препратка? Изглежда изобщо не го променяте. - person Siddhartha Reddy; 05.09.2010
comment
@Siddhartha Reddy: На пръв поглед бих казал, че това е причината да нарека кода наистина схематичен. ;-) Предполагам, че ще има нужда от (alter task-source rest) (или next) в when-let вътре в dosync, за да бъде полезно. Всъщност, като мисля за това отново, се чудя дали използването на seque тук е толкова добра идея в крайна сметка; сега ми се струва, че увеличава броя на елементите от опашката, които биха били загубени в случай на срив на локалната машина (тъй като seque изтегля елементите, преди да бъдат поискани от работниците). От друга страна, в някои сценарии може да е добра производителност; това е - person Michał Marczyk; 06.09.2010
comment
само предчувствие, което се нуждае от профилиране. Между другото, самоизвикването на опашката в do-stuff трябва да се промени на (recur), за да се избегне взривяването на стека. Мисля, че свърших много по-добра работа, описвайки обработката на опашка в Clojure около месец по-късно в този отговор; Чудя се дали може да ви бъде по-полезен от този? Във всеки случай, благодаря, че посочихте тази грешка! - person Michał Marczyk; 06.09.2010

Не съм сигурен колко идиоматично е това, тъй като все още съм начинаещ с езика, но следното решение работи за мен:

(let [number-of-messages-per-time 2
      await-timeout 1000]
  (doseq [p-messages (partition number-of-messages-per-time messages)]
    (let [agents (map agent p-messages)]
      (doseq [a agents] (send-off a process))
      (apply await-for await-timeout agents)
      (map deref agents))))
person Marco Lazzeri    schedule 19.06.2012

person    schedule
comment
Това вече не работи с 1.4 (future и future-call не връщат IFn, което repeatedly изисква). Можете обаче лесно да опаковате бъдеще във функция, като поставите пред (future #. - person Alex B; 08.06.2012
comment
@AlexB добър улов, дори не е проблем с 1.4: # трябваше да е там. Поправих кода, благодаря! - person cgrand; 08.06.2012