Python Gensim, как заставить сходство WMD работать быстрее с многопроцессорной обработкой

Я пытаюсь запустить подобие gensim WMD быстрее. Как правило, это то, что есть в документации: Пример корпуса:

    my_corpus = ["Human machine interface for lab abc computer applications",
>>>              "A survey of user opinion of computer system response time",
>>>              "The EPS user interface management system",
>>>              "System and human system engineering testing of EPS",
>>>              "Relation of user perceived response time to error measurement",
>>>              "The generation of random binary unordered trees",
>>>              "The intersection graph of paths in trees",
>>>              "Graph minors IV Widths of trees and well quasi ordering",
>>>              "Graph minors A survey"]

my_query = 'Human and artificial intelligence software programs'
my_tokenized_query =['human','artificial','intelligence','software','programs']

model = a trained word2Vec model on about 100,000 documents similar to my_corpus.
model = Word2Vec.load(word2vec_model)

from gensim import Word2Vec
from gensim.similarities import WmdSimilarity

def init_instance(my_corpus,model,num_best):
    instance = WmdSimilarity(my_corpus, model,num_best = 1)
    return instance
instance[my_tokenized_query]

наиболее подходящий документ — "Human machine interface for lab abc computer applications", и это здорово.

Однако функция instance выше занимает очень много времени. Поэтому я подумал о том, чтобы разбить корпус на N частей, а затем выполнить WMD для каждой с num_best = 1, тогда в конце часть с максимальной оценкой будет наиболее похожей.

    from multiprocessing import Process, Queue ,Manager

    def main( my_query,global_jobs,process_tmp):
        process_query = gensim.utils.simple_preprocess(my_query)

        def worker(num,process_query,return_dict):  
            instance=init_instance\
(my_corpus[num*chunk+1:num*chunk+chunk], model,1)
            x = instance[process_query][0][0]
            y = instance[process_query][0][1]
            return_dict[x] = y
        manager = Manager()
        return_dict = manager.dict()

        for num in range(num_workers):
            process_tmp = Process(target=worker, args=(num,process_query,return_dict))
            global_jobs.append(process_tmp)
            process_tmp.start()
        for proc in global_jobs:
            proc.join()

        return_dict = dict(return_dict)
        ind = max(return_dict.iteritems(), key=operator.itemgetter(1))[0]
        print corpus[ind]
        >>> "Graph minors A survey"

Проблема, с которой я столкнулся, заключается в том, что, несмотря на то, что он что-то выводит, он не дает мне хороший похожий запрос из моего корпуса, даже если он получает максимальное сходство всех частей.

Я делаю что-то неправильно?


person jxn    schedule 16.05.2017    source источник
comment
Сделайте MCVE, отсутствуя my_query, my_corpus, model, my_tokenized_query.   -  person stovfl    schedule 19.05.2017
comment
Привет @stovfl, обновлено.   -  person jxn    schedule 19.05.2017
comment
Вы пытались использовать специальный компилятор Python, такой как pypy?   -  person macdrai    schedule 22.05.2017
comment
@macdrai нет, это повлияло бы?   -  person jxn    schedule 25.05.2017


Ответы (2)


Комментарий: chunk — это статическая переменная: например. кусок = 600 ...

Если вы определяете chunk static, вам нужно вычислить num_workers.

10001 / 600 = 16,6683333333 = 17 num_workers

Обычно используйте не более process, чем cores, которые у вас есть.
Если у вас есть 17 cores, это нормально.

cores являются статическими, поэтому вам следует:

num_workers = os.cpu_count()
chunk = chunksize(my_corpus, num_workers)

  1. Не тот же результат, измененный на:

    #process_query = gensim.utils.simple_preprocess(my_query)
    process_query = my_tokenized_query
    
  2. Все результаты worker Индекс 0..n.
    Следовательно, return_dict[x] может быть перезаписан из последнего рабочего процесса с тем же индексом, имеющим более низкое значение. Индекс в return_dict НЕ совпадает с индексом в my_corpus. Изменился на:

    #return_dict[x] = y
    return_dict[ (num * chunk)+x ] = y
    
  3. При использовании +1 в вычислении размера фрагмента этот первый документ будет пропущен.
    Я не знаю, как вы вычисляете chunk, рассмотрим следующий пример:

    def chunksize(iterable, num_workers):
        c_size, extra = divmod(len(iterable), num_workers)
        if extra:
            c_size += 1
        if len(iterable) == 0:
            c_size = 0
        return c_size
    
    #Usage
    chunk = chunksize(my_corpus, num_workers)
    ...
    #my_corpus_chunk = my_corpus[num*chunk+1:num*chunk+chunk]
    my_corpus_chunk = my_corpus[num * chunk:(num+1) * chunk]
    

Результаты: 10 циклов, Tuple=(номер рабочего индекса=0, номер рабочего индекса=1)

С multiprocessing, с chunk=5:
02,09:(3, 8), 01,03:(3, 5):
Инженерно-технические испытания ЭЭС
04,06,07:( 0, 8), 05,08:(0, 5), 10:(0, 7):
Человеко-машинный интерфейс для лабораторных компьютерных приложений abc

Без multiprocessing, с chunk=5:
01:(3, 6), 02:(3, 5), 05,08,10:(3, 7), 07,09:(3, 8):
Системные и человеко-системные инженерные испытания EPS
03,04,06:(0, 5):
Человеко-машинный интерфейс для лабораторных компьютерных приложений abc

Без multiprocessing, без разделения на фрагменты:
01,02,03,04,06,07,08:(3, -1):
Инженерное тестирование систем и систем человека для EPS
05,09,10: (0, -1):
человеко-машинный интерфейс для лабораторных компьютерных приложений.

Проверено с Python: 3.4.2

person stovfl    schedule 20.05.2017
comment
chunk - это статическая переменная: например. chunk = 600 сильно бы это повлияло? - person jxn; 23.05.2017

Использование Python 2.7: я использовал многопоточность вместо многопроцессорной обработки. В ветке создания WMD-Instance я делаю что-то вроде этого:

    wmd_instances = []
    if wmd_instance_count > len(wmd_corpus):
        wmd_instance_count = len(wmd_corpus)
    chunk_size = int(len(wmd_corpus) / wmd_instance_count)
    for i in range(0, wmd_instance_count):
        if i == wmd_instance_count -1:
            wmd_instance = WmdSimilarity(wmd_corpus[i*chunk_size:], wmd_model, num_results)
        else:
            wmd_instance = WmdSimilarity(wmd_corpus[i*chunk_size:chunk_size], wmd_model, num_results)
        wmd_instances.append(wmd_instance)
    wmd_logic.setWMDInstances(wmd_instances, chunk_size)

'wmd_instance_count' — это количество потоков, используемых для поиска. Я также помню размер куска. Затем, когда я хочу что-то поискать, я запускаю "wmd_instance_count"-потоки для поиска и они возвращают найденных симов:

def perform_query_for_job_on_instance(wmd_logic, wmd_instances, query, jobID, instance):
    wmd_instance = wmd_instances[instance]
    sims = wmd_instance[query]
    wmd_logic.set_mt_thread_result(jobID, instance, sims)

wmd_logic — это экземпляр класса, который затем делает следующее:

def set_mt_thread_result(self, jobID, instance, sims):
    res = []
    #
    # We need to scale the found ids back to our complete corpus size...
    #
    for sim in sims:
        aSim = (int(sim[0] + (instance * self.chunk_size)), sim[1])
        res.append(aSim)

Я знаю, код некрасивый, но он работает. Он использует потоки wmd_instance_count для поиска результатов, я их агрегирую, а затем выбираю топ-10 или что-то в этом роде.

Надеюсь это поможет.

person Imdat Solak    schedule 24.06.2017
comment
Строка: wmd_instance = WmdSimilarity(wmd_corpus[i*chunk_size:chunk_size], wmd_model, num_results) на самом деле должна быть: wmd_instance = WmdSimilarity(wmd_corpus[i*chunk_size:(i+1) * chunk_size], wmd_model, num_results) - person Imdat Solak; 26.06.2017