Разширяване на python Queue.PriorityQueue (работен приоритет, типове работни пакети)

Бих искал да разширя Queue.PriorityQueue, описан тук: http://docs.python.org/library/queue.html#Queue.PriorityQueue

Опашката ще съдържа работни пакети с приоритет. Работниците ще получат работни пакети и ще ги обработват. Искам да направя следните допълнения:

  1. Работниците също имат приоритет. Когато няколко работници са неактивни, този с най-висок приоритет трябва да обработи входящ работен пакет.

  2. Не всеки работник може да обработва всеки работен пакет, така че е необходим механизъм, който проверява дали типът на работния пакет и възможностите на работника съвпадат.

Търся съвети как това е най-добре приложено (започване от нулата, разширяване на PrioriyQueue или Queue, ...).

редактиране

Ето го първият ми (непроверен) опит. Основната идея е, че всички чакащи нишки ще бъдат уведомени. След това всички се опитват да получат работен елемент чрез _choose_worker(self, worker). (Направи го уики на общността)

редактиране

Сега работи за някои прости тестове...

редактиране Добавено е персонализирано BaseManager и локално копие на работния списък във функцията _choose_worker.

редактиране корекция на грешка

import Queue
from Queue import Empty, Full
from time import time as _time
import heapq

class AdvancedQueue(Queue.PriorityQueue):

    # Initialize the queue representation
    def _init(self, _maxsize):
        self.queue = []
        self.worker = []

    def put(self, item, block=True, timeout=None):
        '''
        Put an item into the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until a free slot is available. If 'timeout' is
        a positive number, it blocks at most 'timeout' seconds and raises
        the Full exception if no free slot was available within that time.
        Otherwise ('block' is false), put an item on the queue if a free slot
        is immediately available, else raise the Full exception ('timeout'
        is ignored in that case).
        '''
        self.not_full.acquire()
        try:
            if self.maxsize > 0:
                if not block:
                    if self._qsize() == self.maxsize:
                        raise Full
                elif timeout is None:
                    while self._qsize() == self.maxsize:
                        self.not_full.wait()
                elif timeout < 0:
                    raise ValueError("'timeout' must be a positive number")
                else:
                    endtime = _time() + timeout
                    while self._qsize() == self.maxsize:
                        remaining = endtime - _time()
                        if remaining <= 0.0:
                            raise Full
                        self.not_full.wait(remaining)
            self._put(item)
            self.unfinished_tasks += 1
            self.not_empty.notifyAll()  # only change
        finally:
            self.not_full.release()

    def get(self, worker, block=True, timeout=None):
        self.not_empty.acquire()
        try:
            self._put_worker(worker)

            if not block:
                if not self._qsize():
                    raise Empty
                else:
                    return self._choose_worker(worker)
            elif timeout is None:
                while True:
                    while not self._qsize():
                        self.not_empty.wait()
                    try:
                        return self._choose_worker(worker)
                    except Empty:
                        self.not_empty.wait()

            elif timeout < 0:
                raise ValueError("'timeout' must be a positive number")
            else:
                endtime = _time() + timeout
                def wait(endtime):
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)

                while True:
                    while not self._qsize():
                        wait(endtime)

                    try:
                        return self._choose_worker(worker)
                    except Empty:
                        wait(endtime)
        finally:
            self._remove_worker(worker)
            self.not_empty.release()

    # Put a new worker in the worker queue
    def _put_worker(self, worker, heappush=heapq.heappush):
        heappush(self.worker, worker)

    # Remove a worker from the worker queue
    def _remove_worker(self, worker):
        self.worker.remove(worker)

    # Choose a matching worker with highest priority
    def _choose_worker(self, worker):
        worker_copy = self.worker[:]    # we need a copy so we can remove assigned worker
        for item in self.queue:
            for enqueued_worker in worker_copy:
                if item[1].type in enqueued_worker[1].capabilities:
                    if enqueued_worker == worker:
                        self.queue.remove(item)
                        self.not_full.notify()
                        return item
                    else:
                        worker_copy.remove(enqueued_worker)
                        # item will be taken by enqueued_worker (which has higher priority),
                        # so enqueued_worker is busy and can be removed
                        continue
        raise Empty

person Community    schedule 03.10.2010    source източник
comment
+1 интересен въпрос. Имам идея, но бих искал първо да видя други отговори. Засега само ще ви дам малък намек: внимавайте за случая, когато има свободна работа и двама свободни работници, но работникът с най-висок приоритет не може да се справи със задачата в опашката. Внимавайте да не изпаднете в задънена улица. По подобен начин има случай, когато имате две задачи и един работник и работникът не може да се справи с работата с най-висок приоритет, отново внимавайте за блокиране. Вероятно трябва да тествате тези случаи (и много повече тестове за други по-често срещани сценарии - празна опашка и т.н.).   -  person Mark Byers    schedule 03.10.2010
comment
Страхотно за мен, когато започвам с тестове за модули на python :)   -  person tauran    schedule 03.10.2010


Отговори (2)


Мисля, че описвате ситуация, в която имате две "приоритетни опашки" - една за работните места и една за работниците. Наивният подход е да вземете работата с най-висок приоритет и работника с най-висок приоритет и да се опитате да ги сдвоите. Но, разбира се, това се проваля, когато работникът не е в състояние да изпълни работата.

За да поправя това, бих предложил първо да вземете задачата с най-висок приоритет и след това да повторите всички работници в низходящ приоритет, докато намерите такъв, който може да обработи тази задача. Ако никой от работниците не може да обработи заданието, тогава поемете второто по приоритет задание и т.н. Така че ефективно имате вложени цикли, нещо подобно:

def getNextWorkerAndJobPair():
    for job in sorted(jobs, key=priority, reverse=True):
        for worker in sorted(workers, key=priority, reverse=True):
             if worker.can_process(job):
                 return (worker, job)

Горният пример обаче сортира данните ненужно много пъти. За да избегнете това, би било най-добре да съхранявате данните вече в сортиран ред. Що се отнася до това какви структури от данни да използвам, не съм много сигурен кое е най-доброто. В идеалния случай бихте искали O(log n) вмъквания и премахвания и да можете да обхождате колекцията в сортиран ред за O(n) време. Мисля, че PriorityQueue отговаря на първото от тези изисквания, но не и на второто. Предполагам, че сортираният списък от пакета blist ще работи, но не съм го пробвал сам и уеб страницата не е конкретна относно гаранциите за ефективност, които този клас предлага.

Начинът, който предложих да се повторят първо заданията и след това работниците във вътрешния цикъл, не е единственият подход, който можете да предприемете. Можете също така да обърнете реда на циклите, така че първо да изберете работника с най-висок приоритет и след това да опитате да намерите работа за него. Или можете да намерите валидната двойка (работа, работник), която има максималната стойност на f(priority_job, priority_worker) за някаква функция f (например просто добавете приоритетите).

person Mark Byers    schedule 03.10.2010
comment
За моята редакция следвах вашия подход. Първо ще се опитам да го накарам да работи, а след това ще помисля дали да го променя. Също така искам да използвам повторно възможно най-много код от PriorityQueue. - person tauran; 04.10.2010

Единственият отговор беше полезен, но не достатъчно подробен, така че засега ще приема собствения си отговор. Вижте кода във въпроса.

person Community    schedule 12.10.2010