синхронизация потоков с использованием очередей сообщений не в порядке

Следующий код взят из основных докладов Рэймонда Pycon о валюте, отличная презентация. Это немного длинно, поэтому я начну с вопроса:

  • Если сообщения добавляются вместе в очередь и оба должны быть напечатаны до того, как цепочка будет считаться завершенной, почему они печатаются в перемешанном виде? Добавление задержки в 100 мс между инициализацией потока решает эту проблему, и результат соответствует ожидаемому.

-

import Queue, time

counter_queue = Queue.Queue()
counter = 0

def counter_manager():
    # I have EXCLUSIVE rights to update the counter variable
    global counter

    while True:
        increment = counter_queue.get()
        counter += 1
        print_queue.put(['The count is {}\n'.format(counter),
                         '----------\n'])
        counter_queue.task_done()

t = threading.Thread(target=counter_manager)
t.daemon = True
t.start()
del t


print_queue = Queue.Queue()

def print_manager():
    # I have EXCLUSIVE rights to call the print keyword
    while True:
        job = print_queue.get()
        for line in job:
            print line
        print_queue.task_done()

t = threading.Thread(target=print_manager)
t.daemon = True
t.start()
del t

def worker_threads():
    counter_queue.put(1)

print_queue.put(['Starting up.. with message queues'])
worker_threads = []
for i in range(10):
    t = threading.Thread(target=worker)
    worker_threads.append(t)
    t.start()
    # time.sleep(0.1)
for t in worker_threads:
    t.join()

counter_queue.join()
print_queue.put(['Finishing up'])
print_queue.join()

Он использует counter_manager и print_queue в качестве демонов для получения сообщений от рабочих потоков и их последовательного выполнения. Насколько я понимаю, это должно сохранить порядок выполнения. Однако я получаю следующий вывод:

Starting up.. with message queues
The couns is 1The couns is 2

--------------------

The couns is 3
 ----------The couns is 4
 The couns is 5
The couns is 6
----------
----------
----------
The couns is 7

----------The couns is 8

The couns is 9----------

----------The couns is 10

----------
Finishing up

Счетчик увеличивается правильно, но сообщения печати перепутаны.

Если я раскомментирую оператор сна, инициализация потоков задерживается на 100 мс, а вывод правильный.

Starting up.. with message queues
 The couns is 1
----------
The couns is 2
----------
The couns is 3
----------
The couns is 4
----------
The couns is 5
----------
The couns is 6
----------
The couns is 7
----------
The couns is 8
----------
The couns is 9
----------
The couns is 10
----------
Finishing up

Используя очереди, отпечатки должны быть в порядке. Разве это не правильно?


Рабочий код

def worker():
    global counter

    counter += 1
    print 'The couns is {}'.format(counter)  # read the var (race cond.)
    print '----------'

person Chen A.    schedule 13.10.2017    source источник


Ответы (1)


Я не смог воспроизвести проблему. Чтобы запустить его, я добавил импорт threading и переименовал функцию worker_threads в worker.

На выходе есть «счетчики», но ваш код указывает «количество». Можете ли вы убедиться, что выходные данные и код, который вы предоставляете, совпадают?

Ваш вывод, кажется, чередуется только на линейном уровне.

Интересно, были ли в предыдущей версии кода два разных .put (не атомарных) в counter_manager, которые позже были объединены в один .put с несколькими строками?

person ryachza    schedule 13.10.2017
comment
Вау, спасибо! Я определил рабочую функцию файла в другой части и использовал ее. Я по ошибке поставил worker вместо worker_threads - person Chen A.; 13.10.2017
comment
@Vinny Можете ли вы тогда включить определение worker? worker_threads не будет работать как Thread target, потому что worker_threads также определяется как список. - person ryachza; 13.10.2017
comment
Я добавил это. Рабочий метод не является атмосферным. Он используется для создания состояния гонки и решения его с помощью очередей сообщений. - person Chen A.; 13.10.2017
comment
И это worker_queue в моем исходном коде. он определен как counter_queue.put(1), и это цель Threading.thread - person Chen A.; 13.10.2017