Как создать глобальную блокировку/семафор с multiprocessing.pool в Python?

Я хочу ограничить доступ к ресурсам в дочерних процессах. Например, ограничьте http-загрузки, disk io и т. д. Как я могу добиться этого, расширив этот базовый код?

Поделитесь, пожалуйста, некоторыми базовыми примерами кода.

pool = multiprocessing.Pool(multiprocessing.cpu_count())
while job_queue.is_jobs_for_processing():
  for job in job_queue.pull_jobs_for_processing:
    pool.apply_async(do_job, callback = callback)
pool.close()
pool.join()

person Chameleon    schedule 22.02.2015    source источник
comment
Вы хотите ограничить доступ к ресурсам с помощью Lock или Semaphore? Есть ли причина не использовать просто multiprocessing.Lock / multiprocessing.Semaphore?   -  person dano    schedule 23.02.2015
comment
@dano Как передать multiprocessing.Lock() или Semaphore() в пул? Какие есть варианты для глобального доступа?   -  person Chameleon    schedule 24.02.2015
comment
Необходимость ограничения доступа к ресурсам подразумевает необходимость синхронизации не пула процессов, а рабочих задач. Почему бы вам не объяснить, чего именно вы хотите достичь?   -  person Michael Foukarakis    schedule 25.02.2015
comment
@MichaelFoukarakis Не важно почему? важно как?. Я могу ответить вам почему? поскольку случайный ввод-вывод медленнее, чем последовательный ввод-вывод. Я ответил на ваш вопрос? Посмотреть статистику — goo.gl/TbC2xp. Memcache работает иначе, чем диск и жесткий диск, а не флэш-память (его часто называют диском, но это не диск) или www-сервером - некоторым нужен семафор, некоторым не нужен - все, что мне нужно, изучить глобальный шаблон семафора для многопроцессорной обработки в Python, как и многие другие люди.   -  person Chameleon    schedule 25.02.2015
comment
@MichaelFoukarakis WWW-серверу нужен семафор, чтобы быть вежливым и не отказывать сайту с параллельным огромным количеством запросов - он ограничен не дизайном, а интернет-этикой.   -  person Chameleon    schedule 25.02.2015


Ответы (2)


Используйте аргументы initializer и initargs при создании пула, чтобы определить глобальный объект во всех дочерних процессах.

Например:

from multiprocessing import Pool, Lock
from time import sleep

def do_job(i):
    "The greater i is, the shorter the function waits before returning."
    with lock:
        sleep(1-(i/10.))
        return i

def init_child(lock_):
    global lock
    lock = lock_

def main():
    lock = Lock()
    poolsize = 4
    with Pool(poolsize, initializer=init_child, initargs=(lock,)) as pool:
        results = pool.imap_unordered(do_job, range(poolsize))
        print(list(results))

if __name__ == "__main__":
    main()

Этот код выведет числа от 0 до 3 в порядке возрастания (в порядке отправки заданий), потому что он использует блокировку. Закомментируйте строку with lock:, чтобы она распечатала числа в порядке убывания.

Это решение работает как в Windows, так и в Unix. Однако, поскольку процессы могут разветвляться в системах unix, unix нужно только объявить глобальные переменные в области модуля. Дочерний процесс получает копию памяти родителя, которая включает объект блокировки, который все еще работает. Таким образом, инициализатор не является строго необходимым, но он может помочь документировать работу кода. Когда многопроцессорность может создавать процессы путем разветвления, то также работает следующее.

from multiprocessing import Pool, Lock
from time import sleep

lock = Lock()

def do_job(i):
    "The greater i is, the shorter the function waits before returning."
    with lock:
        sleep(1-(i/10.))
        return i

def main():
    poolsize = 4
    with Pool(poolsize) as pool:
        results = pool.imap_unordered(do_job, range(poolsize))
        print(list(results))

if __name__ == "__main__":
    main()
person Dunes    schedule 25.02.2015
comment
Я изучал второй пример, но похоже, что lock = Lock() не является глобальным, поскольку не передается мастером - я ошибаюсь? - person Chameleon; 01.03.2015
comment
Если первый пример может быть той же проблемой - я его протестирую - код выглядит красиво, но я думаю, что дочерний процесс ничего не будет знать о родительском под Windows. - person Chameleon; 01.03.2015
comment
Во втором примере, когда пулом создается дочерний процесс (в Unix), вся память родительского процесса копируется в дочерний процесс (включая объект блокировки). Поскольку вы используете окна, не используйте второй пример. - person Dunes; 01.03.2015

Используйте глобальный семафор и получите его, если вы обращаетесь к ресурсу. Например:

import multiprocessing
from time import sleep

semaphore = multiprocessing.Semaphore(2)

def do_job(id):
    with semaphore:
        sleep(1)
    print("Finished job")

def main():
    pool = multiprocessing.Pool(6)
    for job_id in range(6):
        print("Starting job")
        pool.apply_async(do_job, [job_id])
    pool.close()
    pool.join()

if __name__ == "__main__":
    main()

Эта программа завершает только два задания в секунду, потому что другие потоки ожидают семафора.

person MarcelSimon    schedule 04.03.2015