Совместно использовать словарь, хранящий объекты, между несколькими процессами в Python

Я работаю над большим скриптом, основная цель которого - прочитать содержимое многих файлов и сохранить номер каждого элемента в словаре. Если элемент отсутствует в словаре, то мы создаем новый экземпляр какого-то объекта и затем увеличиваем его, иначе только увеличиваем. Поскольку каждый из обрабатываемых файлов сам по себе огромен, и иногда мне нужно обработать более 100 из них, я хотел немного ускорить процесс и воспользоваться преимуществами многопроцессорного модуля Python. Вот сильно упрощённая версия скрипта (путь я скрыл с помощью ..., он не настоящий):

import multiprocessing as mp
from os import listdir
from os.path import join

manager = mp.Manager()
queue = manager.Queue()
dictionary = manager.dict()

class TestClass:
    def __init__(self):
        self._number = 0

    def increment(self):
        self._number += 1

def worker(file):
    f = open(file, 'r')
    for line in f.readlines():
        if line not in dictionary:
            dictionary[line] = TestClass()

        dictionary[line].increment()

def _list_files():
    for f in listdir("..."):
        queue.put(join("...", f))

def pool():
    _list_files()
    _pool = mp.Pool(mp.cpu_count())    

    for i in range(len(queue)):
        _pool.apply(worker, args=(queue.get()))

    _pool.close()
    _pool.join()

pool()
print(dictionary)

Проблема в том, что скрипт вылетает с сообщением:

AttributeError: Can't get attribute 'TestClass' on <module '__main__' from '.../multiprocessing_test.py'>  

Есть ли способ заставить это работать?
Я не тот, кто создал начальную версию скрипта, я просто добавляю в него некоторые функции. При этом структура скрипта должна остаться прежней, т.к. переписывание заняло бы слишком много времени, то есть TestClass, worker и list_files не могут изменить свою структуру (кроме всего, что связано с многопроцессорностью)


person Colonder    schedule 24.11.2017    source источник


Ответы (1)


(Кажется, вы задавали этот вопрос раньше.)

Код вашего примера нефункционален по целому ряду причин, не в последнюю очередь из-за того, что ... просто не делает ничего полезного:

$ python tst.py
Traceback (most recent call last):
  File "tst.py", line 38, in <module>
    pool()
  File "tst.py", line 29, in pool
    _list_files()
  File "tst.py", line 25, in _list_files
    for f in listdir("..."):
OSError: [Errno 2] No such file or directory: '...'

(Публиковать код, который не будет работать, нехорошо, но рекомендуется предоставить MCVE.) Итак, я исправлено, что:

index 39014ff..1ac9f4a 100644
--- a/tst.py
+++ b/tst.py
@@ -2,6 +2,8 @@ import multiprocessing as mp
 from os import listdir
 from os.path import join

+DIRPATH = 'inputs'
+
 manager = mp.Manager()
 queue = manager.Queue()
 dictionary = manager.dict()
@@ -22,8 +24,8 @@ def worker(file):
         dictionary[line].increment()

 def _list_files():
-    for f in listdir("..."):
-        queue.put(join("...", f))
+    for f in listdir(DIRPATH):
+        queue.put(join(DIRPATH, f))

 def pool():
     _list_files()

и создал каталог inputs/ с одним образцом входного файла:

$ ls inputs
one
$ cat inputs/one
1
one
unum

и теперь этот пример производит:

$ python tst.py
Traceback (most recent call last):
  File "tst.py", line 40, in <module>
    pool()
  File "tst.py", line 34, in pool
    for i in range(len(queue)):
TypeError: object of type 'AutoProxy[Queue]' has no len()

Я не буду утверждать, что эта переделка хороша, но я пошла дальше и переписала ее так, чтобы она действительно работала:

import multiprocessing as mp
from os import listdir
from os.path import join

DIRPATH = 'inputs'

class TestClass:
    def __repr__(self):
        return str(self._number)

    def __init__(self):
        self._number = 0

    def increment(self):
        self._number += 1

def worker(dictionary, queue):
    while True:
        path = queue.get()
        if path is None:
            return
        f = open(path, 'r')
        for line in f.readlines():
            if line not in dictionary:
                dictionary[line] = TestClass()
            dictionary[line].increment()

def run_pool():
    manager = mp.Manager()
    queue = manager.Queue()
    dictionary = manager.dict()
    nworkers = mp.cpu_count()
    pool = mp.Pool(nworkers)

    for i in range(nworkers):
        pool.apply_async(worker, args=(dictionary, queue))

    for f in listdir(DIRPATH):
        queue.put(join(DIRPATH, f))
    for i in range(nworkers):
        queue.put(None)

    pool.close()
    pool.join()

    return dictionary

def main():
    dictionary = run_pool()
    print(dictionary)

if __name__ == '__main__':
    main()

Основные отличия:

  • Я удалил все глобальные переменные. Экземпляр диспетчера, управляемая очередь и управляемый словарь являются локальными для run_pool.

  • Я помещаю имена файлов в очередь после создания nworker рабочих процессов. Каждый рабочий выполняет цикл, читая имена файлов, пока не прочитает имя None, а затем возвращает свой результат (Нет).

  • В основном цикле имена файлов помещаются в очередь, чтобы рабочие могли извлекать имена файлов из очереди по мере завершения обработки каждого предыдущего файла. Чтобы сигнализировать всем nworkers рабочим процессам о выходе, основной цикл добавляет в очередь столько же None записей.

  • run_pool возвращает окончательный (все еще управляемый) словарь.

И, конечно же, я добавил __repr__ к вашему объекту TestClass, чтобы мы могли видеть подсчеты. Я также убедился, что код должен работать в Windows, переместив драйвер main в функцию, вызываемую только в том случае, если __name__ == '__main__'.

person torek    schedule 25.11.2017