Как да паралелизирам изчисление на сума в python numpy?

Имам сума, която се опитвам да изчисля, и имам затруднения при паралелизиране на кода. Изчислението, което се опитвам да паралелизирам, е доста сложно (използва както numpy масиви, така и scipy разредени матрици). Извежда масив numpy и искам да сумирам изходните масиви от около 1000 изчисления. В идеалния случай бих запазил текуща сума за всички повторения. Не успях обаче да разбера как да направя това.

Досега се опитах да използвам паралелната функция на joblib и функцията pool.map с многопроцесорния пакет на python. И за двете използвам вътрешна функция, която връща масив numpy. Тези функции връщат списък, който преобразувам в масив numpy и след това сумирам.

Въпреки това, след като паралелната функция на joblib завърши всички итерации, основната програма никога не продължава да работи (изглежда, че оригиналната задача е в суспендирано състояние, използвайки 0% CPU). Когато използвам pool.map, получавам грешки в паметта, след като всички итерации са завършени.

Има ли начин просто да паралелизирате текуща сума от масиви?

Редактиране: Целта е да направите нещо като следното, освен паралелно.

def summers(num_iters):

    sumArr = np.zeros((1,512*512)) #initialize sum
    for index in range(num_iters):
        sumArr = sumArr + computation(index) #computation returns a 1 x 512^2 numpy array

    return sumArr

person Kevin    schedule 30.01.2012    source източник
comment
трябва да опитате да публикувате минимален примерен код. Опитвате ли се да извършите намотка?   -  person Simon Bergot    schedule 30.01.2012
comment
Не, не правя намотка. Завъртам изображение около 1000 пъти и трябва да сумирам резултата от всяко завъртане. За pool.map просто използвам outputArr = np.array(pool.map(parloop, range(num_views))), където parloop връща масив numpy.   -  person Kevin    schedule 30.01.2012
comment
Може би вече е паралелно? тъй като numpy знае, че искате да направите матричен точков продукт, той може да използва оптимизирана реализация, получена като част от BLAS (основните подпрограми за линейна алгебра). ... много архитектури вече имат BLAS, който също се възползва от многоядрена машина. Ако вашият numpy/scipy е компилиран с помощта на един от тях, тогава dot() ще бъде изчислен паралелно (ако това е по-бързо) без да правите нищо. www.scipy.org/ParallelProgramming   -  person endolith    schedule 12.07.2012
comment
Вижте също numexpr: code.google.com/p/numexpr Казвате му уравнението искате да изчислява (същото като кода на python, но написан като низ) и той се грижи за оптимизацията и многонишковостта вместо вас   -  person endolith    schedule 12.07.2012


Отговори (2)


Разбрах как да паралелизирам сума от масиви с мултипроцесиране, apply_async и обратни извиквания, така че публикувам това тук за други хора. Използвах примерната страница за Parallel Python за класа за обратно извикване Sum, въпреки че Всъщност не използвах този пакет за внедряване. Все пак ми даде идеята да използвам обратни извиквания. Ето опростения код за това, което накрая използвах, и той прави това, което исках да прави.

import multiprocessing
import numpy as np
import thread

class Sum: #again, this class is from ParallelPython's example code (I modified for an array and added comments)
    def __init__(self):
        self.value = np.zeros((1,512*512)) #this is the initialization of the sum
        self.lock = thread.allocate_lock()
        self.count = 0

    def add(self,value):
        self.count += 1
        self.lock.acquire() #lock so sum is correct if two processes return at same time
        self.value += value #the actual summation
        self.lock.release()

def computation(index):
    array1 = np.ones((1,512*512))*index #this is where the array-returning computation goes
    return array1

def summers(num_iters):
    pool = multiprocessing.Pool(processes=8)

    sumArr = Sum() #create an instance of callback class and zero the sum
    for index in range(num_iters):
        singlepoolresult = pool.apply_async(computation,(index,),callback=sumArr.add)

    pool.close()
    pool.join() #waits for all the processes to finish

    return sumArr.value

Също така успях да накарам това да работи с помощта на паралелна карта, която беше предложена в друг отговор. Бях пробвал това по-рано, но не го прилагах правилно. И двата начина работят и мисля, че този отговор обяснява проблема кой метод да се използва (map или apply.async) доста добре. За версията на картата не е необходимо да дефинирате класа Sum и става функцията summers

def summers(num_iters):
    pool = multiprocessing.Pool(processes=8)

    outputArr = np.zeros((num_iters,1,512*512)) #you wouldn't have to initialize these
    sumArr = np.zeros((1,512*512))              #but I do to make sure I have the memory

    outputArr = np.array(pool.map(computation, range(num_iters)))
    sumArr = outputArr.sum(0)

    pool.close() #not sure if this is still needed since map waits for all iterations

    return sumArr
person Kevin    schedule 01.02.2012

Не съм сигурен, че разбирам проблема. Опитвате ли се просто да разделите списък на набор от работници, да ги накарате да поддържат текуща сума от своите изчисления и да сумират резултата?

#!/bin/env python
import sys
import random
import time
import multiprocessing
import numpy as np

numpows = 5
numitems = 25
nprocs = 4

def expensiveComputation( i ):
  time.sleep( random.random() * 10 )
  return np.array([i**j for j in range(numpows)])

def listsum( l ):
  sum = np.zeros_like(l[0])
  for item in l:
    sum = sum + item
  return sum

def partition(lst, n):
  division = len(lst) / float(n)
  return [ lst[int(round(division * i)): int(round(division * (i + 1)))] for i in xrange(n) ]

def myRunningSum( l ):
  sum = np.zeros(numpows)
  for item in l:
     sum = sum + expensiveComputation(item)
  return sum

if __name__ == '__main__':

  random.seed(1)
  data = range(numitems)

  pool = multiprocessing.Pool(processes=4,)
  calculations = pool.map(myRunningSum, partition(data,nprocs))

  print 'Answer is:', listsum(calculations)
  print 'Expected answer: ', np.array([25.,300.,4900.,90000.,1763020.])

(функцията за разделяне идва от Python: Нарязване на списък на n дяла с почти еднаква дължина )

person Jonathan Dursi    schedule 31.01.2012
comment
Благодаря за отговора ви, но исках да запазя текуща сума за множество итерации, без да сумирам списъци в края. Завъртам изображение около 1000 пъти и трябва да сумирам резултата от изчисление от всяко завъртане. Мисля, че разбрах как да го направя и го публикувах като отговор. - person Kevin; 01.02.2012
comment
Наистина ли един масив, върнат на процесор, е толкова голяма работа? По този начин има много по-малко режийни разходи. - person Jonathan Dursi; 01.02.2012