Агенты Clojure, потребляющие из очереди

Я пытаюсь выяснить, как лучше всего использовать агентов для получения элементов из очереди сообщений (Amazon SQS). Прямо сейчас у меня есть функция (process-queue-item), которая захватывает элементы из очереди и обрабатывает их.

Я хочу обрабатывать эти элементы одновременно, но не могу понять, как управлять агентами. По сути, я хочу, чтобы все агенты были заняты как можно больше, не извлекая множество элементов из очереди и не создавая невыполненных работ (у меня это будет работать на нескольких машинах, поэтому элементы нужно оставлять в очереди, пока они не появятся. действительно нужны).

Может ли кто-нибудь дать мне несколько советов по улучшению моей реализации?

(def active-agents (ref 0))

(defn process-queue-item [_]
  (dosync (alter active-agents inc))
  ;retrieve item from Message Queue (Amazon SQS) and process
  (dosync (alter active-agents dec)))

(defn -main []
  (def agents (for [x (range 20)] (agent x)))

  (loop [loop-count 0]

    (if (< @active-agents 20)
      (doseq [agent agents]
        (if (agent-errors agent)
          (clear-agent-errors agent))
        ;should skip this agent until later if it is still busy processing (not sure how)
        (send-off agent process-queue-item)))

    ;(apply await-for (* 10 1000) agents)
    (Thread/sleep  10000)
    (logging/info (str "ACTIVE AGENTS " @active-agents))
    (if (> 10 loop-count)
      (do (logging/info (str "done, let's cleanup " count))
       (doseq [agent agents]
         (if (agent-errors agent)
           (clear-agent-errors agent)))
       (apply await agents)
       (shutdown-agents))
      (recur (inc count)))))

person erikcw    schedule 08.04.2010    source источник
comment
Есть ли способ обработать очередь сообщений как seq, а затем просто использовать pmap для распараллеливания?   -  person Alex Stoddard    schedule 09.04.2010
comment
@Alex Stoddard: В моем случае process-queue-item фактически блокируется при вводе-выводе в сети, поэтому я не думаю, что pmap - правильный выбор, поскольку он использует столько потоков, сколько ядер имеет машина.   -  person erikcw    schedule 09.04.2010
comment
@erikw: Конечно, но это всего лишь деталь реализации pmap (потоки = # баллов + 2). Нет причин, по которым вы не могли бы написать версию pmap с параметризованным количеством потоков. См. Первую строку источника pmap: (let [n (+ 2 (.. Runtime getRuntime availableProcessors))   -  person Alex Stoddard    schedule 09.04.2010
comment
Привет, у меня есть несколько вопросов: 1. Агенты имеют ценность, вас интересует их ценность или вы просто используете их как пул потоков? 2. Есть ли окончательный результат использования очереди или процесс-элемент-очереди вызывает побочные эффекты?   -  person cgrand    schedule 12.04.2010
comment
@cgrand: 1) Меня не интересует ценность агентов, я просто использую их как пул потоков. 2) элемент-очереди-процесса имеет seid-эффекты (выталкивает результаты обратно в очередь сообщений).   -  person erikcw    schedule 13.04.2010
comment
@erikcw 1), и вы больше заинтересованы в том, чтобы узнать, когда работа выполнена, поэтому предпочитайте фьючерсы агентам. 2) Вот что я догадался. Вы видели мой ответ: stackoverflow.com/questions/2602791/?   -  person cgrand    schedule 13.04.2010


Ответы (4)


То, что вы просите, - это способ продолжать раздавать задания, но с некоторым верхним пределом. Один из простых подходов к этому - использовать семафор для согласования ограничения. Вот как я подхожу к этому:

(let [limit (.availableProcessors (Runtime/getRuntime))
      ; note: you might choose limit 20 based upon your problem description
      sem (java.util.concurrent.Semaphore. limit)]
  (defn submit-future-call
    "Takes a function of no args and yields a future object that will
    invoke the function in another thread, and will cache the result and
    return it on all subsequent calls to deref/@. If the computation has
    not yet finished, calls to deref/@ will block. 
    If n futures have already been submitted, then submit-future blocks
    until the completion of another future, where n is the number of
    available processors."  
    [#^Callable task]
    ; take a slot (or block until a slot is free)
    (.acquire sem)
    (try
      ; create a future that will free a slot on completion
      (future (try (task) (finally (.release sem))))
      (catch java.util.concurrent.RejectedExecutionException e
        ; no task was actually submitted
        (.release sem)
        (throw e)))))

(defmacro submit-future
  "Takes a body of expressions and yields a future object that will
  invoke the body in another thread, and will cache the result and
  return it on all subsequent calls to deref/@. If the computation has
  not yet finished, calls to deref/@ will block.
  If n futures have already been submitted, then submit-future blocks
  until the completion of another future, where n is the number of
  available processors."  
  [& body] `(submit-future-call (fn [] ~@body)))

#_(example
    user=> (submit-future (reduce + (range 100000000)))
    #<core$future_call$reify__5782@6c69d02b: :pending>
    user=> (submit-future (reduce + (range 100000000)))
    #<core$future_call$reify__5782@38827968: :pending>
    user=> (submit-future (reduce + (range 100000000)))
    ;; blocks at this point for a 2 processor PC until the previous
    ;; two futures complete
    #<core$future_call$reify__5782@214c4ac9: :pending>
    ;; then submits the job

Теперь, когда это сделано, вам просто нужно координировать выполнение самих задач. Похоже, у вас уже есть механизмы для этого. Цикл (будущее-отправка (элемент-очереди-процесса))

person Timothy Pratley    schedule 09.04.2010

Возможно, вы могли бы использовать функцию seque? Цитата (doc seque):

clojure.core/seque
([s] [n-or-q s])
  Creates a queued seq on another (presumably lazy) seq s. The queued
  seq will produce a concrete seq in the background, and can get up to
  n items ahead of the consumer. n-or-q can be an integer n buffer
  size, or an instance of java.util.concurrent BlockingQueue. Note
  that reading from a seque can block if the reader gets ahead of the
  producer.

Я имею в виду ленивую последовательность получения элементов очереди по сети; вы бы обернули это в seque, поместили это в Ref, и рабочие агенты потребляли предметы из этого seque. seque возвращает что-то, что выглядит как обычная последовательность с точки зрения вашего кода, при этом магия очереди происходит прозрачным образом. Обратите внимание, что если последовательность, которую вы помещаете внутри, разбита на фрагменты, она все равно будет принудительно выполняться по частям. Также обратите внимание, что первоначальный вызов самого seque, кажется, блокируется до тех пор, пока не будет получен начальный элемент или два (или кусок, в зависимости от случая; я думаю, что это больше связано с тем, как работают ленивые последовательности, чем сам seque).

Набросок кода (действительно отрывочный, вообще не проверенный):

(defn get-queue-items-seq []
  (lazy-seq
   (cons (get-queue-item)
         (get-queue-items-seq))))

(def task-source (ref (seque (get-queue-items-seq))))

(defn do-stuff []
  (let [worker (agent nil)]
    (if-let [result
             (dosync
               (when-let [task (first @task-source)]
                (send worker (fn [_] (do-stuff-with task)))))]
      (do (await worker)
          ;; maybe do something with worker's state
          (do-stuff))))) ;; continue working

(defn do-lots-of-stuff []
  (let [fs (doall (repeatedly 20 #(future (do-stuff))))]
    fs)))

На самом деле вам, вероятно, понадобится более сложный производитель последовательности элементов очереди, чтобы вы могли попросить его прекратить производство новых элементов (необходимость, если все это должно быть в состоянии изящно завершать работу; фьючерсы умрут, когда задача исходный код иссяк, используйте future-done?, чтобы узнать, сделали ли они это уже). И это то, что я вижу с первого взгляда ... Я уверен, что здесь есть еще кое-что, что нужно отполировать. Я думаю, что общий подход сработает.

person Michał Marczyk    schedule 09.04.2010
comment
Я добавил исправление в предпоследнюю строку скетча кода, в соответствии с которой будут фактически созданы фьючерсы. (На самом деле, это имеет решающее значение для всей идеи ... :-)) - person Michał Marczyk; 09.04.2010
comment
Я пытаюсь понять этот код. Почему источник задачи - ссылка? Вы, кажется, вообще не можете его изменить. - person Siddhartha Reddy; 05.09.2010
comment
@Siddhartha Reddy: На первый взгляд я бы сказал, что именно поэтому я назвал код действительно отрывочным. ;-) Думаю, чтобы было полезно, понадобится (alter task-source rest) (или next) в when-let внутри dosync. На самом деле, подумав об этом еще раз, я задаюсь вопросом, является ли использование seque здесь такой хорошей идеей; теперь мне кажется, что это увеличивает количество элементов в очереди, которые будут потеряны в случае сбоя локальной машины (поскольку seque втягивает элементы до того, как они будут запрошены рабочими). С другой стороны, в некоторых сценариях это может быть хорошо с точки зрения производительности; это - person Michał Marczyk; 06.09.2010
comment
просто догадка, нуждающаяся в профилировании. Между прочим, самовызов хвоста в do-stuff следует изменить на (recur), чтобы избежать взрыва стека. Я думаю, что примерно месяц спустя я намного лучше описал обработку очереди в Clojure в этом ответе; Интересно, может ли он быть для вас более полезен, чем этот? В любом случае, спасибо, что указали на эту ошибку! - person Michał Marczyk; 06.09.2010

Не уверен, насколько это идиоматично, поскольку я все еще новичок в этом языке, но для меня работает следующее решение:

(let [number-of-messages-per-time 2
      await-timeout 1000]
  (doseq [p-messages (partition number-of-messages-per-time messages)]
    (let [agents (map agent p-messages)]
      (doseq [a agents] (send-off a process))
      (apply await-for await-timeout agents)
      (map deref agents))))
person Marco Lazzeri    schedule 19.06.2012

person    schedule
comment
Это больше не работает с 1.4 (future и future-call не возвращают IFn, чего требует repeatedly). Тем не менее, вы можете легко заключить будущее в функцию, добавив (future перед #. - person Alex B; 08.06.2012
comment
@AlexB хороший улов, это даже не проблема 1.4: там должен был быть #. Код исправил, спасибо! - person cgrand; 08.06.2012