Изчакване на асинхронни повиквания с EventMachine и Ruby влакна

Изпълнявам този кодов фрагмент под Ruby 1.9.2:

require "eventmachine"
require "fiber"

EM.run do
  fiber = Fiber.new do
    current_fiber = Fiber.current
    EM.add_timer(2) do
      print "B"
      current_fiber.resume("D")
    end
    Fiber.yield
  end
  print "A"
  val = fiber.resume
  print "C"
  print val
  EM.stop
end

Очаквам изхода да бъде "ABCD", като програмата прави пауза за две секунди след "A". Вместо това обаче той просто отпечатва "AC" веднага, след което изчаква около две секунди, преди да излезе. какво правя грешно

(За справка, опитвам се да възпроизведа поведението в стил em-synchrony, описано в тази статия без използване на em-synchrony.)

Редактиране: Ето някои повече подробности за това, което в крайна сметка се опитвам да постигна. Разработвам API на Grape, работещ на Thin, и всеки манипулатор на маршрути трябва да направи различни извиквания последователно към хранилища за данни, ZooKeeper, други HTTP услуги и т.н., преди да върне отговор.

em-synchrony е наистина страхотен, но продължавам да се сблъсквам с проблеми с отстъпване от коренното влакно или с резултати, показващи несинхронните симптоми на случая по-горе. rack-fiber_pool също изглежда потенциално полезен, но не съм склонен да се ангажирам да го използвам, защото, извън кутията, той разваля всичките ми модулни тестове на Rack::Test.

Намалих проблемите си в простия пример по-горе, защото изглежда, че имам фундаментално неразбиране относно това как трябва да се използват заедно влакна и EventMachine, което ми пречи да използвам по-сложните рамки ефективно.


person breaker    schedule 04.10.2012    source източник
comment
Относно Rack::Test можете да опитате да го use само на вашия config.ru и да обвиете тестовете си във влакна, като използвате rspec around блок или нещо подобно.   -  person Renato Zannon    schedule 04.10.2012
comment
Поради това, че Grape не работи добре с добавения мидълуер (винаги го поставя последен вместо първи, където трябва да бъде в този случай), в крайна сметка трябваше да обвия приложението Grape с помощта на Rack::Builder в моята конфигурация и да добавя Rack::FiberPool към че. Което изцяло заобиколи проблема с тестването на модула. :)   -  person breaker    schedule 05.10.2012
comment
Вие спирате EM веднага след като AC е отпечатано.   -  person phil pirozhkov    schedule 06.10.2012


Отговори (1)


Вероятно сте искали нещо подобно:

require "eventmachine"
require "fiber"

def value
  current_fiber = Fiber.current

  EM.add_timer(2) do
    puts "B"
    current_fiber.resume("D") # Wakes the fiber
  end

  Fiber.yield # Suspends the Fiber, and returns "D" after #resume is called
end

EM.run do
  Fiber.new {
    puts "A"
    val = value
    puts "C"
    puts val

    EM.stop
  }.resume

  puts "(Async stuff happening)"
end

Това трябва да доведе до следния резултат:

A
(Async stuff happening)
B
C
D

По-концептуално обяснение:

Фибрите помагат за разплитането на асинхронния код, защото те трябва да бъдат спрени и реанимирани парчета код, подобно на ръчни нишки. Това позволява хитри трикове относно реда, в който се случват нещата. Малък пример:

fiberA = Fiber.new {
  puts "A"
  Fiber.yield
  puts "C"
}

fiberB = Fiber.new {
  puts "B"
  Fiber.yield
  puts "D"
}

fiberA.resume # prints "A"
fiberB.resume # prints "B"
fiberA.resume # prints "C"
fiberB.resume # prints "D"

Така че, когато #resume се извика на влакно, то възобновява своето изпълнение, било то от началото на блока (за нови влакна), или от предишно Fiber.yield извикване, и след това се изпълнява, докато бъде намерен друг Fiber.yield или блокът свърши.

Важно е да се отбележи, че поставянето на последователност от действия вътре във влакно е начин да се посочи времева зависимост между тях (puts "C" не може да се изпълнява преди puts "A"), докато действията върху "паралелни" влакна не могат да разчитат на (и трябва да) не ме интересува) дали действията на другите влакна са се изпълнили или не: Ще отпечатаме "BACD" само чрез размяна на първите две resume извиквания.

И така, ето как rack-fiber_pool прави своята магия: Той поставя всяка заявка, която вашето приложение получава във влакно (което предполага независимост от реда), и след това очаква от вас да Fiber.yield на IO действия, така че сървърът да може да приема други заявки. След това, вътре в обратните извиквания на EventMachine, предавате блок, който съдържа current_fiber.resume, така че вашето влакно да бъде реанимирано, когато отговорът на заявката/заявката/каквото е готово.

Това вече е дълъг отговор, но мога да дам пример за EventMachine, ако все още не е ясно (разбирам, че това е космата концепция за грок, борих се много).


Актуализация: Създадох пример, който може да помогне на всеки, който все още се бори с концепциите: https://gist.github.com/renato-zannon/4698724. Препоръчвам да бягате и да играете с него.

person Renato Zannon    schedule 04.10.2012
comment
Благодаря -- въпреки че това не помага наистина, например в контекста на обслужване на нещо от уеб приложение, което е моята крайна цел. Бих искал да направя куп асинхронни действия като това последователно, когато обслужвам HTTP заявка, след което да върна някакъв резултат на клиента. - person breaker; 04.10.2012
comment
Е, тогава вероятно трябваше да го посочите във въпроса си :) Можете ли да редактирате въпроса, за да го разширите малко относно това, което точно искате? Но както и да е, основните принципи са същите, въпреки че в среда на webapp вероятно е по-добре да използвате em-synchrony заедно с нещо като rack-fiber_pool вместо да развивате собствената си инфраструктура. - person Renato Zannon; 04.10.2012
comment
Сигурен; Добавих още подробности. Борих се с em-synchrony и rack-fiber_pool половин ден без напредък, така че реших, че просто пропускам някаква фундаментална концепция. - person breaker; 04.10.2012
comment
Благодаря! Мисля, че вече го разбирам много по-добре. Това, че всички последователни действия са обвити в едно и също влакно, за да се установи подреждането, определено помага. - person breaker; 04.10.2012
comment
Не, нищо повече не е необходимо; Днес всичко работи. Благодаря! - person breaker; 05.10.2012