Как исправить взаимоблокировку в join() в Ruby

Я работаю в многопоточности в Ruby. Фрагмент кода:

  threads_array = Array.new(num_of_threads)  
  1.upto(num_of_threads) do |i|  

    Thread.abort_on_exception = true
      threads_array[i-1] =  Thread.new {
        catch(:exit) do
          print "s #{i}"
          user_id = nil
          loop do
            user_id = user_ids.pop()
            if user_id == nil
              print "a #{i}"
              Thread.stop()
            end
            dosomething(user_id)
          end
        end
      }
    end
    #puts "after thread"
    threads_array.each {|thread| thread.join}

Я не использую блокировки мьютексов, но получаю тупик. Это вывод приведенного выше фрагмента кода:

s 2s 6s 8s 1s 11s 7s 10s 14s 16s 21s 24s 5s 26s 3s 19s 20s 23s 4s 28s 9s 12s 18s 22s 29s 30s 27s 13s 17s 15s 25a 4a 10a 3a 6a 21a 24a 16a 9a 18a 5a 28a 20a 2a 22a 11a 29a 8a 14a 23a 26a 1a 19a 7a 12fatal: deadlock detected

Приведенный выше вывод говорит мне, что тупик возникает после того, как массив user_ids имеет значение null и происходит с потоками join и stop.

Что на самом деле происходит и как решить эту ошибку?


person sravan_kumar    schedule 19.01.2012    source источник
comment
был ли мой ответ полезен? Вы решили проблему?   -  person Aliaksei Kliuchnikau    schedule 20.01.2012


Ответы (3)


Самый простой код для воспроизведения этой проблемы:

t = Thread.new { Thread.stop }
t.join # => exception in `join': deadlock detected (fatal)

Thread::stop → ноль

Останавливает выполнение текущего потока, переводя его в «спящее» состояние, и планирует выполнение другого потока.

Thread#join → thr
Thread#join(limit) → thr

Вызывающий поток приостановит выполнение и запустит thr. Не возвращается, пока thr не выйдет или пока не истечет лимит секунд. Если срок истекает, возвращается nil, в противном случае возвращается thr.

Насколько я понимаю, вы вызываете Thread.join без параметров в потоке и ждете его выхода, но дочерний поток вызывает Thread.stop и переходит в статус sleep. Это тупиковая ситуация, основной поток ожидает завершения дочернего потока, но дочерний поток спит и не отвечает.

Если вы вызовете join с параметром limit, то дочерний поток будет прерван по истечении тайм-аута, не вызывая взаимоблокировки вашей программы:

t = Thread.new { Thread.stop }
t.join 1 # => Process finished with exit code 0

Я бы рекомендовал выходить из ваших рабочих потоков после того, как они выполнят задание с помощью Thread.exit, или избавиться от бесконечного цикла и нормально достичь конца потока выполнения, например:

if user_id == nil
  raise StopIteration
end

#or 
if user_id == nil
  Thread.exit
end
person Aliaksei Kliuchnikau    schedule 19.01.2012

В дополнение к ответу Алекса Ключникова я добавлю, что #join может вызвать эту ошибку, когда поток ожидает Queue#pop. Простое и продуманное решение — вызов #join с тайм-аутом.

Это из рубина 2.2.2:

[27] pry(main)> q=Queue.new
=> #<Thread::Queue:0x00000003a39848>
[30] pry(main)> q << "asdggg"
=> #<Thread::Queue:0x00000003a39848>
[31] pry(main)> q << "as"
=> #<Thread::Queue:0x00000003a39848>
[32] pry(main)> t = Thread.new {
[32] pry(main)*   while s = q.pop
[32] pry(main)*     puts s
[32] pry(main)*   end  
[32] pry(main)* }  
asdggg
as
=> #<Thread:0x00000003817ce0@(pry):34 sleep>
[33] pry(main)> q << "asg"
asg
=> #<Thread::Queue:0x00000003a39848>
[34] pry(main)> q << "ashg"
ashg
=> #<Thread::Queue:0x00000003a39848>
[35] pry(main)> t.join
fatal: No live threads left. Deadlock?
from (pry):41:in `join'
[36] pry(main)> t.join(5)
=> nil
person akostadinov    schedule 27.08.2015
comment
что, если очередь является постоянным соединением http? например, в прямом эфире, и результат в потоке является случайным, t.join (5) все еще работает? - person crazy_phage; 04.10.2015
comment
@crazy_phage, у меня не было такого варианта использования, но я не понимаю, почему он не должен работать. В случае постоянного HTTP-соединения, я думаю, вы реализуете тайм-аут, после которого соединение должно быть закрыто, верно? Если вы хотите ждать вечно, вы можете установить очень большое значение, например 10 лет. - person akostadinov; 04.10.2015
comment
ну, это похоже на то, что http-соединение - это канал, и у меня есть еще один поток для чтения из канала, если я использую t.join, а не j.join 5, произойдет сбой, но я не понял, почему это происходит, потому что я побежал что в sidekiq журнал ничего не показал. Итак, я только что увидел ваш ответ, и я думаю, именно поэтому это происходит. - person crazy_phage; 05.10.2015

Если я правильно понял ваши намерения, я бы подумал о чем-то более простом (и, возможно, более безопасном, users_ids.pop() из потока мне кажется страшным):

user_ids = (0..19).to_a
number_of_threads = 3

user_ids \
  .each_slice(user_ids.length / number_of_threads + 1) \
  .map { |slice| 
      Thread.new(slice) { |s| 
        puts s.inspect 
      }
  }.map(&:join)
person Victor Moroz    schedule 19.01.2012