Как распараллелить вычисление суммы в Python numpy?

У меня есть сумма, которую я пытаюсь вычислить, и мне трудно распараллелить код. Расчет, который я пытаюсь распараллелить, довольно сложен (он использует как массивы numpy, так и scipy разреженные матрицы). Он выдает массивный массив, и я хочу просуммировать выходные массивы из примерно 1000 вычислений. В идеале я бы сохранил текущую сумму по всем итерациям. Однако я не смог понять, как это сделать.

До сих пор я пробовал использовать функцию Parallel joblib и функцию pool.map с пакетом multiprocessing python. Для обоих из них я использую внутреннюю функцию, которая возвращает массив numpy. Эти функции возвращают список, который я конвертирую в массив, а затем суммирую.

Однако после того, как функция joblib Parallel завершит все итерации, основная программа никогда не продолжит выполнение (похоже, что исходное задание находится в приостановленном состоянии, используя 0% ЦП). Когда я использую pool.map, я получаю ошибки памяти после завершения всех итераций.

Есть ли способ просто распараллелить текущую сумму массивов?

Изменить. Цель состоит в том, чтобы сделать что-то вроде следующего, кроме параллельного.

def summers(num_iters):

    sumArr = np.zeros((1,512*512)) #initialize sum
    for index in range(num_iters):
        sumArr = sumArr + computation(index) #computation returns a 1 x 512^2 numpy array

    return sumArr

person Kevin    schedule 30.01.2012    source источник
comment
вам следует попытаться опубликовать минимальный пример кода. Вы пытаетесь выполнить свертку?   -  person Simon Bergot    schedule 30.01.2012
comment
Нет, свертку не делаю. Я поворачиваю изображение примерно 1000 раз, и мне нужно просуммировать результат каждого поворота. Для pool.map я просто использую outputArr = np.array(pool.map(parloop, range(num_views))), где parloop возвращает массив numpy.   -  person Kevin    schedule 30.01.2012
comment
Может уже параллельно? поскольку numpy знает, что вы хотите сделать матричное скалярное произведение, он может использовать оптимизированную реализацию, полученную как часть BLAS (подпрограммы базовой линейной алгебры). ... многие архитектуры теперь имеют BLAS, который также использует преимущества многоядерной машины. Если ваш numpy / scipy скомпилирован с использованием одного из них, то dot () будет вычисляться параллельно (если это быстрее) без каких-либо действий. www.scipy.org/ParallelProgramming   -  person endolith    schedule 12.07.2012
comment
Также загляните в numexpr: code.google.com/p/numexpr Вы расскажете уравнение вы хотите, чтобы он вычислялся (так же, как код Python, но записан в виде строки), и он позаботится об оптимизации и многопоточности за вас   -  person endolith    schedule 12.07.2012


Ответы (2)


Я понял, как распараллелить сумму массивов с помощью multiprocessing, apply_async и обратных вызовов, поэтому я публикую это здесь для других людей. Я использовал пример страницы для Parallel Python для класса обратного вызова Sum, хотя Я фактически не использовал этот пакет для реализации. Однако это натолкнуло меня на идею использования обратных вызовов. Вот упрощенный код того, что я в итоге использовал, и он делает то, что я хотел.

import multiprocessing
import numpy as np
import thread

class Sum: #again, this class is from ParallelPython's example code (I modified for an array and added comments)
    def __init__(self):
        self.value = np.zeros((1,512*512)) #this is the initialization of the sum
        self.lock = thread.allocate_lock()
        self.count = 0

    def add(self,value):
        self.count += 1
        self.lock.acquire() #lock so sum is correct if two processes return at same time
        self.value += value #the actual summation
        self.lock.release()

def computation(index):
    array1 = np.ones((1,512*512))*index #this is where the array-returning computation goes
    return array1

def summers(num_iters):
    pool = multiprocessing.Pool(processes=8)

    sumArr = Sum() #create an instance of callback class and zero the sum
    for index in range(num_iters):
        singlepoolresult = pool.apply_async(computation,(index,),callback=sumArr.add)

    pool.close()
    pool.join() #waits for all the processes to finish

    return sumArr.value

Я также смог получить эту работу с помощью распараллеленной карты, что было предложено в другом ответе. Я пробовал это раньше, но неправильно реализовал. Оба способа работают, и я думаю, что этот ответ довольно хорошо объясняет, какой метод использовать (map или apply.async). Для версии карты вам не нужно определять класс Sum, и функция summers станет

def summers(num_iters):
    pool = multiprocessing.Pool(processes=8)

    outputArr = np.zeros((num_iters,1,512*512)) #you wouldn't have to initialize these
    sumArr = np.zeros((1,512*512))              #but I do to make sure I have the memory

    outputArr = np.array(pool.map(computation, range(num_iters)))
    sumArr = outputArr.sum(0)

    pool.close() #not sure if this is still needed since map waits for all iterations

    return sumArr
person Kevin    schedule 01.02.2012

Не уверен, что понимаю проблему. Вы просто пытаетесь разделить список на пул рабочих, заставить их вести текущую сумму своих вычислений и суммировать результат?

#!/bin/env python
import sys
import random
import time
import multiprocessing
import numpy as np

numpows = 5
numitems = 25
nprocs = 4

def expensiveComputation( i ):
  time.sleep( random.random() * 10 )
  return np.array([i**j for j in range(numpows)])

def listsum( l ):
  sum = np.zeros_like(l[0])
  for item in l:
    sum = sum + item
  return sum

def partition(lst, n):
  division = len(lst) / float(n)
  return [ lst[int(round(division * i)): int(round(division * (i + 1)))] for i in xrange(n) ]

def myRunningSum( l ):
  sum = np.zeros(numpows)
  for item in l:
     sum = sum + expensiveComputation(item)
  return sum

if __name__ == '__main__':

  random.seed(1)
  data = range(numitems)

  pool = multiprocessing.Pool(processes=4,)
  calculations = pool.map(myRunningSum, partition(data,nprocs))

  print 'Answer is:', listsum(calculations)
  print 'Expected answer: ', np.array([25.,300.,4900.,90000.,1763020.])

(функция разделения, поступающая из Python: Разбиение списка на n разделов почти одинаковой длины)

person Jonathan Dursi    schedule 31.01.2012
comment
Спасибо за ваш ответ, но я хотел сохранить текущую сумму за несколько итераций без суммирования по каким-либо спискам в конце. Я поворачиваю изображение примерно 1000 раз, и мне нужно просуммировать результат вычислений для каждого поворота. Думаю, я понял, как это сделать, и опубликовал это в качестве ответа. - person Kevin; 01.02.2012
comment
Неужели один массив, возвращаемый на процессор, действительно так важен? Так вы получите способ меньше накладных расходов. - person Jonathan Dursi; 01.02.2012