Python: совместное использование блокировки между порожденными процессами

Конечная цель - выполнить метод в фоновом режиме, но не параллельно: когда несколько объектов вызывают этот метод, каждый должен дождаться своей очереди для продолжения. Чтобы добиться работы в фоновом режиме, мне нужно запустить метод в подпроцессе (а не в потоке), и мне нужно запустить его с помощью spawn (а не fork). Чтобы предотвратить параллельное выполнение, очевидным решением является общая глобальная блокировка между процессами.
Когда процессы разветвляются, что является значением по умолчанию в Unix, этого легко добиться, как показано в обоих следующих кодах.
Мы можем поделиться им как переменной класса:

import multiprocessing as mp
from time import sleep

class OneAtATime:

    l = mp.Lock()

    def f(self):
        with self.l:
            sleep(1)
        print("Hello")

if __name__ == "__main__":
    a = OneAtATime()
    b = OneAtATime()
    p1 = mp.Process(target = a.f)
    p2 = mp.Process(target = b.f)
    p1.start()
    p2.start()

Или мы можем передать его методу:

import multiprocessing as mp
from time import sleep

class OneAtATime:
    def f(self, l):
        with l:
            sleep(1)
        print("Hello")

if __name__ == "__main__":
    a = OneAtATime()
    b = OneAtATime()
    m = mp.Manager()
    l = mp.Lock()
    p1 = mp.Process(target = a.f, args = (l,))
    p2 = mp.Process(target = b.f, args = (l,))
    p1.start()
    p2.start()

Оба этих кода имеют соответствующее поведение печати «привет» с интервалом в одну секунду. Однако при изменении метода запуска на 'spawn', они становятся сломанными.
Первый (1) выводит оба "hello" одновременно. Это связано с тем, что внутреннее состояние класса не обрабатывается, поэтому у них разные блокировки.
Вторая (2) не работает с FileNotFoundError во время выполнения. Я думаю, это связано с тем, что блокировки не могут быть обработаны: см. Python, разделяющий блокировку между процессами.
В этом ответе предлагаются два исправления (примечание: я не могу использовать пул, потому что хочу случайным образом создать произвольное количество процессов).
Я не нашел способа чтобы адаптировать второе исправление, но я попытался реализовать первое:

import multiprocessing as mp
from time import sleep

if __name__ == "__main__":
    mp.set_start_method('spawn')

class OneAtATime:
    def f(self, l):
        with l:
            sleep(1)
        print("Hello")

if __name__ == "__main__":
    a = OneAtATime()
    b = OneAtATime()
    m = mp.Manager()
    l = m.Lock()
    p1 = mp.Process(target = a.f, args = (l,))
    p2 = mp.Process(target = b.f, args = (l,))
    p1.start()
    p2.start()

Это не удается с AttributeError и FileNotFoundError (3). Фактически, он также не работает (BrokenPipe) при использовании метода fork (4).
Каков правильный способ разделения блокировки между порожденными процессами?
Краткое объяснение четырех провалы, которые я пронумеровали, тоже было бы неплохо. Я запускаю Python 3.6 под Archlinux.


person Zil0    schedule 22.06.2017    source источник


Ответы (2)


Поздравляю, вы прошли 90% пути. Последний шаг на самом деле сделать не так уж и сложно.

Да, ваш последний блок кода завершается с ошибкой AttributeError, но в чем конкретно заключается ошибка? «Невозможно включить атрибут OneAtATime». Это очень похоже на проблему, с которой вы уже столкнулись - это не проблема с классом OneAtATime.

Я внес следующее изменение, и оно сработало так, как вам хотелось бы:

файл ooat.py:

from time import sleep

class OneAtATime:
    def f(self, l):
        with l:
            sleep(1)
        print("Hello")

интерактивная оболочка:

import multiprocessing as mp
from oaat import OneAtATime
if __name__ == "__main__":
    mp.set_start_method('spawn')
    a = OneAtATime()
    b = OneAtATime()
    m = mp.Manager()
    l = m.Lock()
    p1 = mp.Process(target = a.f, args = (l,))
    p2 = mp.Process(target = b.f, args = (l,))
    p1.start()
    p2.start()

Как вы могли заметить, я на самом деле ничего не делал - просто разделил ваш код на два отдельных файла. Попробуйте, вы увидите, что все работает нормально. (По крайней мере, для меня, используя python 3.5 на ubuntu.)

person Scott Mermelstein    schedule 22.06.2017
comment
Я временно удалил этот ответ, когда увидел, что забыл протестировать его с помощью метода spawn. Затем, когда я проводил тестирование, он сказал, что OneAtATime не определен, но только потому, что я забыл строку from ooat import OneAtATime. Итак, теперь я, по-видимому, протестировал и fork, и spawn, и он отлично работает. - person Scott Mermelstein; 22.06.2017
comment
Ответ есть в вашем посте, но не в вашем посте. Это интерактивная оболочка. Фактически, я получал следующую ошибку: AttributeError: объект «ForkAwareLocal» не имеет атрибута «соединение». Там я прекратил попытки и написал вопрос, но я ушел от фактического решения в Google: stackoverflow.com/a/25456494/8194503 < / а>. У меня он отлично работает в том же файле, я не уверен, в чем была ваша ошибка. Отредактирую свой вопрос с исправлением. Не знаю, каков этикет, стоит ли мне принимать ваш ответ, я все равно полагаю, что принять его. Спасибо за ваше время ! - person Zil0; 22.06.2017
comment
Если вы решили свою проблему, а я этого не сделал, вам следует опубликовать свой ответ, подождать, в какой период он вам скажет, и принять свой ответ. Это вполне уместно, и со временем вы получите положительные голоса за свой ответ. - person Scott Mermelstein; 22.06.2017

Последний фрагмент кода работает при условии, что сценарий не завершается преждевременно. Достаточно присоединиться к процессам:

import multiprocessing as mp
from time import sleep

class OneAtATime:
    def f(self, l):
        with l:
            sleep(1)
        print("Hello")

if __name__ == "__main__":
    mp.set_start_method('spawn')
    a = OneAtATime()
    b = OneAtATime()
    m = mp.Manager()
    l = m.Lock()
    p1 = mp.Process(target = a.f, args = (l,))
    p2 = mp.Process(target = b.f, args = (l,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

Подробнее об ошибке, которую она вызвала, см. Здесь https://stackoverflow.com/a/25456494/8194503.

person Zil0    schedule 22.06.2017
comment
Теперь, когда вы упомянули об этом, мне следовало сразу подумать об этом. Хорошее исследование, как в вопросе, так и в ответе. - person Scott Mermelstein; 22.06.2017