Python, използването на мултипроцес е по-бавно, отколкото неизползването му

След като прекарах много време в опити да обвия главата си около многопроцесорността, измислих този код, който е тест за сравнение:

Пример 1:

from multiprocessing  import Process

class Alter(Process):
    def __init__(self, word):
        Process.__init__(self)
        self.word = word
        self.word2 = ''

    def run(self):
        # Alter string + test processing speed
        for i in range(80000):
            self.word2 = self.word2 + self.word

if __name__=='__main__':
    # Send a string to be altered
    thread1 = Alter('foo')
    thread2 = Alter('bar')
    thread1.start()
    thread2.start()

    # wait for both to finish

    thread1.join()
    thread2.join()

    print(thread1.word2)
    print(thread2.word2)

Това завършва за 2 секунди (половината от времето за многопоточност). От любопитство реших да пусна следното:

Пример 2:

word2 = 'foo'
word3 = 'bar'

word = 'foo'
for i in range(80000):
    word2 = word2 + word

word  = 'bar'
for i in range(80000):
    word3 = word3 + word

print(word2)
print(word3)

За мой ужас това се случи за по-малко от половин секунда!

какво става тук Очаквах мултипроцесирането да работи по-бързо - не трябва ли да завърши наполовина времето на Пример 2, като се има предвид, че Пример 1 е Пример 2, разделен на два процеса?

Актуализация:

След като разгледах отзивите на Крис, включих „действителния“ код, който отнема най-много време за обработка, и ме накара да обмисля многопроцесорна обработка:

self.ListVar = [[13379+ strings],[13379+ strings],
                [13379+ strings],[13379+ strings]]

for b in range(len(self.ListVar)):
    self.list1 = []
    self.temp = []
    for n in range(len(self.ListVar[b])):
        if not self.ListVar[b][n] in self.temp:
            self.list1.insert(n, self.ListVar[b][n] + '(' + 
                              str(self.ListVar[b].count(self.ListVar[b][n])) +
                              ')')
           self.temp.insert(0, self.ListVar[b][n])

   self.ListVar[b] = list(self.list1)

person Rhys    schedule 08.01.2012    source източник


Отговори (4)


ETA: След като публикувахте своя код, мога да ви кажа, че има прост начин да правите това, което правите МНОГО по-бързо (>100 пъти по-бързо).

Виждам, че това, което правите, е да добавите честота в скоби към всеки елемент в списък с низове. Вместо да броите всички елементи всеки път (което, както можете да потвърдите с помощта на cProfile, е най-голямото тясно място във вашия код), можете просто да създадете речник, който съпоставя всеки елемент с неговата честота. По този начин трябва само да преминете през списъка два пъти - веднъж, за да създадете честотния речник, веднъж, за да го използвате, за да добавите честота.

Тук ще покажа новия си метод, ще определя времето и ще го сравня със стария метод, като използвам генериран тестов случай. Тестовият случай дори показва, че новият резултат е напълно идентичен със стария. Забележка: Всичко, на което наистина трябва да обърнете внимание по-долу, е new_method.

import random
import time
import collections
import cProfile

LIST_LEN = 14000

def timefunc(f):
    t = time.time()
    f()
    return time.time() - t


def random_string(length=3):
    """Return a random string of given length"""
    return "".join([chr(random.randint(65, 90)) for i in range(length)])


class Profiler:
    def __init__(self):
        self.original = [[random_string() for i in range(LIST_LEN)]
                            for j in range(4)]

    def old_method(self):
        self.ListVar = self.original[:]
        for b in range(len(self.ListVar)):
            self.list1 = []
            self.temp = []
            for n in range(len(self.ListVar[b])):
                if not self.ListVar[b][n] in self.temp:
                    self.list1.insert(n, self.ListVar[b][n] + '(' +    str(self.ListVar[b].count(self.ListVar[b][n])) + ')')
                    self.temp.insert(0, self.ListVar[b][n])

            self.ListVar[b] = list(self.list1)
        return self.ListVar

    def new_method(self):
        self.ListVar = self.original[:]
        for i, inner_lst in enumerate(self.ListVar):
            freq_dict = collections.defaultdict(int)
            # create frequency dictionary
            for e in inner_lst:
                freq_dict[e] += 1
            temp = set()
            ret = []
            for e in inner_lst:
                if e not in temp:
                    ret.append(e + '(' + str(freq_dict[e]) + ')')
                    temp.add(e)
            self.ListVar[i] = ret
        return self.ListVar

    def time_and_confirm(self):
        """
        Time the old and new methods, and confirm they return the same value
        """
        time_a = time.time()
        l1 = self.old_method()
        time_b = time.time()
        l2 = self.new_method()
        time_c = time.time()

        # confirm that the two are the same
        assert l1 == l2, "The old and new methods don't return the same value"

        return time_b - time_a, time_c - time_b

p = Profiler()
print p.time_and_confirm()

Когато стартирам това, получава времена от (15.963812112808228, 0.05961179733276367), което означава, че е около 250 пъти по-бързо, въпреки че това предимство зависи както от това колко дълги са списъците, така и от разпределението на честотата във всеки списък. Сигурен съм, че ще се съгласите, че с това предимство в скоростта вероятно няма да е необходимо да използвате мултипроцесор :)

(Оригиналния ми отговор е оставен по-долу за бъдещи поколения)

ETA: Между другото, струва си да се отбележи, че този алгоритъм е приблизително линеен по отношение на дължината на списъците, докато кодът, който сте използвали, е квадратичен. Това означава, че се представя с още по-голямо предимство, колкото по-голям е броят на елементите. Например, ако увеличите дължината на всеки списък до 1000000, изпълнението му отнема само 5 секунди. Въз основа на екстраполация, старият код ще отнеме повече от ден :)


Зависи от операцията, която извършвате. Например:

import time
NUM_RANGE = 100000000

from multiprocessing  import Process

def timefunc(f):
    t = time.time()
    f()
    return time.time() - t

def multi():
    class MultiProcess(Process):
        def __init__(self):
            Process.__init__(self)

        def run(self):
            # Alter string + test processing speed
            for i in xrange(NUM_RANGE):
                a = 20 * 20

    thread1 = MultiProcess()
    thread2 = MultiProcess()
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()

def single():
    for i in xrange(NUM_RANGE):
        a = 20 * 20

    for i in xrange(NUM_RANGE):
        a = 20 * 20

print timefunc(multi) / timefunc(single)

На моята машина многопроцесорната операция отнема само ~60% от времето на еднопоточната.

person David Robinson    schedule 08.01.2012
comment
Хей Дейвид, благодаря много за страхотния код. Ще приема този отговор. Едно нещо обаче. може би не бях достатъчно ясен във въпроса. броят на низовете в скоби трябва да брои само тези низове във всеки списък. например. [['betty', 'harry', 'sam', 'sam'], ['gary', 'larry', 'fed', 'sam'] ...] --- трябва да се върне --- [[ 'betty(1)', 'harry(1)', 'sam(2)', 'sam(2)'], ['gary(1)', 'larry(1)', 'fed(1)' , 'sam(1)'] ...]. В момента, когато pdb.set_trace() и извиквам да отпечатам, например ListVar[0] и намирам запис с „(2)“ или „(3)“ и търся съответния низ вътре в ListVar[0] ... това не е друго - person Rhys; 08.01.2012
comment
Както в моя код, така и във вашия, той брои само низовете във всеки списък (не в общия, вложен списък). Забележете, че честотният речник се пресъздава за всеки inner_lst. Освен това показвате sam(2) като се появява два пъти във вашия пример тук, но начинът, по който сте написали кода, където той проверява временния масив за такива, които вече съществуват, ще се появи само веднъж: [['betty(1)' , 'harry(1)', 'sam(2)'], ['gary(1)', 'larry(1)', 'fed(1)', 'sam(1)']]. И моят, и вашият метод връщат точно това. - person David Robinson; 08.01.2012
comment
добре, благодаря, прав си за временното, трябва да има само 1 sam(2), ще проверя отново дали брои правилно, когато се прибера вкъщи - person Rhys; 08.01.2012
comment
да, прав си. всички дубликати се изтриват, така че няма да намеря такъв, който е точно това, което ми трябва. Благодаря много - person Rhys; 10.01.2012

Този пример е твърде малък, за да се възползва от мултипроцесирането.

Има МНОГО режийни разходи при стартиране на нов процес. Ако имаше тежка обработка, това би било пренебрежимо малко. Но вашият пример наистина не е толкова интензивен, така че трябва да забележите излишните разходи.

Вероятно ще забележите по-голяма разлика с реалните нишки, жалко, че Python (е, CPython) има проблеми с обвързаната с процесора нишка.

person Chris Eberle    schedule 08.01.2012
comment
какво смятате за „тежка обработка“. Увеличих диапазона до 100 000 и за двата примера. Пример1 завършва за 17 секунди! Пример2 завършва след 0 секунди. Опитах се да отида по-високо в range(), но Example1 буквално не се върна след 10 минути - person Rhys; 08.01.2012
comment
@Rhys, добре за едно нещо, имаш си пример, който просто изяжда и изяжда паметта, което непременно ще причини проблеми. Истинският обвързан с процесора код за обработка би бил като, не знам, матрично разлагане или нещо подобно. - person Chris Eberle; 08.01.2012
comment
Тествам това за следното приложение. Да поиска списък от низове (списък от 17 000 низа), ако (всеки) има дублиращи се записи. и ако е така, за да добавя този запис в низ с броя на дубликатите в скоби ... трябва ли да използвам мултипроцесор за това? - person Rhys; 08.01.2012
comment
Рис: може би трябва да публикувате фрагмент от действителния си код? Възможно е да има други оптимизации на ефективността, които можем да предложим. - person David Robinson; 08.01.2012
comment
Готово, основният източник на забавяне също е добавен, моят основен въпрос при актуализация - person Rhys; 08.01.2012
comment
това, което не разбирам, е, че казвате, че има „МНОГО режийни разходи при стартиране на нов процес“. Как това обяснява факта, че мултипроцесът се забавя СТЕПЕННО, докато увеличавам диапазона(). Съдейки по това, което казвате, трябва да има еднократна цена на времето за подпроцес, а не динамична - person Rhys; 08.01.2012
comment
@Rhys ми позволи да ви дам най-важния съвет, който някога ми е даван: когато става въпрос за оптимизация, измервайте, измервайте и отново измервайте. Освен ако не започнете да пускате програми за профилиране, за да видите точно къде се случва тясното място, всичко това са спекулации. Процесите имат по-високи разходи от нишките. Това е факт. Въпреки това не мога да кажа със 100% увереност как това ще се отрази на вашия конкретен код. - person Chris Eberle; 08.01.2012
comment
Съгласен съм. Като странична мисъл, ако работата в основната нишка е най-бързият резултат, който мога да получа. Тогава изглежда логично, че ако трябваше да създам два subprocess.popen към два двоични файла и да препратя половината изчисления към всеки. Ще има еднократна цена за отваряне на всеки двоичен файл, но изчислението ще бъде направено в рамките на „главната нишка“ на всеки двоичен файл, като по този начин ще се намали наполовина времето на процеса за извършване в една основна нишка. Ще опитам това и ще докладвам. Може би спекулациите ще доведат до благоприятен резултат - person Rhys; 08.01.2012
comment
Вероятно ще забележите по-голяма разлика с истинските нишки. Защо? Първоначалната цена за завъртане на процесите може да бъде много по-висока от нишките на някои платформи (главно Windows), а разходите за изпращане на данни към процесите ще бъдат по-високи на повечето платформи, но доколкото действително вършат работата, те ще работят поне толкова бързо, със същото количество режийни разходи, колкото нишките. Така че нишките са по-бързи само когато задачите са твърде малки, за да бъдат паралелизирани на първо място (както при въпроса на OP). - person abarnert; 11.08.2014
comment
@abarnert ти удари ноктите на главата. Тук има значение наказанието при стартиране, а не суровото време за обработка. Като се има предвид колко малък е примерът, много повече време ще бъде изразходвано за инициализиране и спиране на процесите, отколкото за действително извършване на изчисленията. Би било по-бързо просто да направите изчислението по сериен начин. В такъв случай. - person Chris Eberle; 14.08.2014
comment
@Крис: Съгласен; мисълта ми беше, че когато режийните разходи са твърде високи за дъщерните процеси, почти винаги са твърде високи и за нишките; вашият отговор, поне както го чета, предполага, че нишките биха били полезни за случаи като този, ако не и за GIL. - person abarnert; 14.08.2014

Мултипроцесирането може да бъде полезно за това, което правите, но не и по начина, по който мислите да го използвате. Тъй като по същество правите някакво изчисление на всеки член на списък, можете да го направите с помощта на метода multiprocessing.Pool.map, за да извършите изчислението на членовете на списъка паралелно.

Ето един пример, който показва производителността на вашия код с помощта на един процес и с помощта на multiprocessing.Pool.map:

from multiprocessing import Pool
from random import choice
from string import printable
from time import time

def build_test_list():
    # Builds a test list consisting of 5 sublists of 10000 strings each.
    # each string is 20 characters long
    testlist = [[], [], [], [], []]
    for sublist in testlist:
        for _ in xrange(10000):
            sublist.append(''.join(choice(printable) for _ in xrange(20)))
    return testlist

def process_list(l):
    # the time-consuming code
    result = []
    tmp = []
    for n in range(len(l)):
        if l[n] not in tmp:
            result.insert(n, l[n]+' ('+str(l.count(l[n]))+')')
            tmp.insert(0, l[n])
    return result

def single(l):
    # process the test list elements using a single process
    results = []
    for sublist in l:
        results.append(process_list(sublist))
    return results

def multi(l):
    # process the test list elements in parallel
    pool = Pool()
    results = pool.map(process_list, l)
    return results

print "Building the test list..."
testlist = build_test_list()

print "Processing the test list using a single process..."
starttime = time()
singleresults = single(testlist)
singletime = time() - starttime

print "Processing the test list using multiple processes..."
starttime = time()
multiresults = multi(testlist)
multitime = time() - starttime

# make sure they both return the same thing
assert singleresults == multiresults

print "Single process: {0:.2f}sec".format(singletime)
print "Multiple processes: {0:.2f}sec".format(multitime)

Изход:

Building the test list...
Processing the test list using a single process...
Processing the test list using multiple processes...
Single process: 34.73sec
Multiple processes: 24.97sec
person mdeous    schedule 08.01.2012
comment
И аз не можах да реша на кого да дам точките :S Отговорите на теб и на Дейвид са много добри. Мислех да му дам точките, защото той има по-малко, но съм сигурен, че ще използвам този код в бъдеще. Благодаря, не научих много - person Rhys; 08.01.2012
comment
@Rhys няма проблем ;) щом това е било полезно, радвам се. (можете да давате точки на няколко отговора, но можете да изберете само един като отговор) - person mdeous; 08.01.2012
comment
Това е страхотна идея като цяло и нещо, на което всеки начинаещ в multiprocessing трябва да се запознае възможно най-скоро… но всъщност не решава проблема му. Той вече беше свършил половината работа по всеки процес; освен ако задачите не са изключително променливи по отношение на необходимото време (което не е нито във вашия, нито в неговия пример), добавянето на пул просто добавя малко допълнителни разходи. Все още може да си струва да се направи за четливост и организация, но не и за производителност. (Отново, това не е вярно за всички проблеми, а само за такива.) - person abarnert; 14.08.2014

Тази тема беше много полезна!

Само едно кратко наблюдение върху добрия втори код, предоставен от Дейвид Робинсън по-горе (отговорено на 8 януари '12 в 5:34), който беше кодът, по-подходящ за настоящите ми нужди .

В моя случай имах предишни записи за времето на изпълнение на целева функция без многопроцесорна обработка. Когато използва своя код за внедряване на мултипроцесорна функция, неговият timefunc(multi) не отразява действителното време на multi, а изглежда по-скоро отразява времето, изразходвано в родителя.

Това, което направих, беше да екстернирам функцията за синхронизация и времето, което получих, изглеждаше по-скоро като очакваното:

 start = timefunc()
 multi()/single()
 elapsed = (timefunc()-start)/(--number of workers--)
 print(elapsed)

В моя случай с двойно ядро, общото време, извършено от 'x' работници, използващи целевата функция, беше два пъти по-бързо от изпълнението на обикновен for-цикъл върху целевата функция с 'x' итерации.

Нов съм в многопроцесорната обработка, така че, моля, бъдете внимателни с това наблюдение.

person user3675901    schedule 27.05.2014
comment
Наистина не трябва да измервате времето по този начин; вижте библиотеката timeit за правилния начин за измерване на времето на стенен часовник, взето от вашия код, но накратко: elapsed = timeit.timeit(multi, number=100, repeat=3) ще се погрижи да използвате правилната часовникова функция, ще се погрижи за неща, за които не сте се сетили, като деактивиране на детектора на GC цикъл, ще изпълни кода ви 100 пъти, ще повтори теста 3 пъти и ще вземе най-ниската стойност така че можете да сте сигурни, че няма външни фактори, които да пречат на времето и т.н. - person abarnert; 14.08.2014