Как запомнить функцию, которая использует core.async и чтение неблокирующего канала?

Я хотел бы использовать memoize для функции, которая использует core.async и <!, например

(defn foo [x]
  (go
    (<! (timeout 2000))
    (* 2 x)))

(В реальной жизни это может быть полезно для кеширования результатов обращений к серверу)

Я смог добиться этого, написав версию memoize для core.async (почти такой же код, как у memoize):

(defn memoize-async [f]
  (let [mem (atom {})]
    (fn [& args]
      (go
        (if-let [e (find @mem args)]
          (val e)
         (let [ret (<! (apply f args))]; this line differs from memoize [ret (apply f args)]
            (swap! mem assoc args ret)
            ret))))))

Пример использования:

(def foo-memo (memoize-async foo))
(go (println (<! (foo-memo 3)))); delay because of (<! (timeout 2000))

(go (println (<! (foo-memo 3)))); subsequent calls are memoized => no delay

Мне интересно, есть ли более простые способы добиться того же результата.

**Примечание: мне нужно решение, которое работает с <!. Для <!! см. этот вопрос: Как запомнить функцию который использует core.async и блокирует чтение канала? **


person viebel    schedule 11.07.2014    source источник


Ответы (3)


Для этого вы можете использовать встроенную функцию memoize. Начните с определения метода, который читает из канала и возвращает значение:

 (defn wait-for [ch]
      (<!! ch))

Обратите внимание, что мы будем использовать <!!, а не <!, потому что нам нужен этот функциональный блок, пока в канале нет данных во всех случаях. <! демонстрирует такое поведение только при использовании в форме внутри блока go.

Затем вы можете создать свою мемоизированную функцию, составив эту функцию с foo, например:

(def foo-memo (memoize (comp wait-for foo)))

foo возвращает канал, поэтому wait-for будет блокироваться до тех пор, пока этот канал не получит значение (т. е. пока операция внутри foo не завершится).

foo-memo можно использовать аналогично вашему примеру выше, за исключением того, что вам не нужен вызов <!, потому что wait-for заблокирует вас:

(go (println (foo-memo 3))

Вы также можете вызвать это вне блока go, и он будет вести себя так, как вы ожидаете (т.е. заблокировать вызывающий поток, пока не вернется foo).

person Jesse Rosalia    schedule 15.07.2014
comment
Похоже, что <!! реализовано на стороне clojure с помощью take! с промисом; обратный вызов take! запускает обещание, которое позволяет функции завершиться. Похоже, что есть библиотека с открытым исходным кодом для демонстрации обещаний JS clojurescript. Возможно, вы сможете реализовать <!! с помощью этой библиотеки. Я постараюсь сегодня вечером что-нибудь состряпать, если ты не успеешь. - person Jesse Rosalia; 15.07.2014
comment
Попробовав большую часть двух часов, я не думаю, что смогу добиться большего успеха, чем ваше оригинальное решение. Проблема, с которой я постоянно сталкиваюсь, заключается в том, что <! нужно вызывать непосредственно из блока go (а не в функции, вызываемой из блока go), что ограничивает вашу возможность определить memo-функцию, которая возвращает значение, которое вы хотите сохранить ( т. е. вы не хотите, чтобы ваша заметка удерживала каналы). Библиотеки промисов, о которых я упоминал ранее, работают с обратными вызовами и событиями, что бесполезно в данной ситуации. Мое решение работает в clojure, но я не вижу способа заставить его работать в cljs. - person Jesse Rosalia; 16.07.2014
comment
Спасибо. Я ценю ваши усилия. Как вы думаете, могу ли я предложить команде core.aync добавить memoize-async? - person viebel; 16.07.2014
comment
Готово, спасибо. Вы определенно можете предложить этот метод для core.async. Я не уверен, что это за процесс, и примут ли они его, но попробовать стоит. - person Jesse Rosalia; 16.07.2014
comment
Недостаток использования чего-то вроде memoize-async, как написано, заключается в том, что чем дольше выполняется асинхронная операция, тем выше вероятность избыточных выборок - ничего не сохраняется, чтобы указать, что разрешение находится в процессе обработки для определенного значения. Некоторое время назад обсуждался promise-chan — вы можете запомнить что-то вроде что наивно и есть правильные вещи случаются. Вы также можете сходить с ума по управлению состоянием и мультиплексированию каналов, если не хотите идти по пути обещания-тян. - person moe; 20.06.2015

Это было немного сложнее, чем я ожидал. Ваше решение неверно, потому что, когда вы снова вызываете свою мемоизированную функцию с теми же аргументами, раньше, чем первый запуск завершит выполнение своего блока go, вы снова вызовете ее и получите промах. Это часто бывает при обработке списков с помощью core.async.

В приведенном ниже примере используется pub/sub core.async для решения этой проблемы (проверено только в CLJS):

(def lookup-sentinel  #?(:clj ::not-found :cljs (js-obj))
(def pending-sentinel #?(:clj ::pending   :cljs (js-obj))

(defn memoize-async
  [f]
  (let [>in (chan)
        pending (pub >in :args)
        mem (atom {})]
    (letfn
        [(memoized [& args]
           (go
             (let [v (get @mem args lookup-sentinel)]
               (condp identical? v
                 lookup-sentinel
                 (do
                   (swap! mem assoc args pending-sentinel)
                   (go
                     (let [ret (<! (apply f args))]
                       (swap! mem assoc args ret)
                       (put! >in {:args args :ret ret})))
                   (<! (apply memoized args)))
                 pending-sentinel
                 (let [<out (chan 1)]
                   (sub pending args <out)
                   (:ret (<! <out)))
                 v))))]
        memoized)))

ВНИМАНИЕ: возможно утечка памяти, подписки и <out каналы не закрыты

person skrat    schedule 28.06.2016

Я использовал эту функцию в одном из своих проектов для кэширования HTTP-вызовов. Функция кэширует результаты в течение заданного промежутка времени и использует барьер для предотвращения многократного выполнения функции, когда кэш холодный (из-за переключения контекста внутри блока go).

(defn memoize-af-until
  [af ms clock]
  (let [barrier (async/chan 1)
        last-return (volatile! nil)
        last-return-ms (volatile! nil)]
    (fn [& args]
      (async/go
        (>! barrier :token)
        (let [now-ms (.now clock)]
          (when (or (not @last-return-ms) (< @last-return-ms (- now-ms ms)))
            (vreset! last-return (<! (apply af args)))
            (vreset! last-return-ms now-ms))
          (<! barrier)
          @last-return)))))

Вы можете проверить, правильно ли он работает, установив для времени кэширования значение 0 и наблюдая, что вызовы двух функций занимают примерно 10 секунд. Без барьера два вызова завершились бы одновременно:

(def memo (memoize-af-until #(async/timeout 5000) 0 js/Date))
(async/take! (memo) #(println "[:a] Finished"))
(async/take! (memo) #(println "[:b] Finished"))
person Milan Munzar    schedule 02.12.2020