Расширение python Queue.PriorityQueue (рабочий приоритет, типы рабочих пакетов)

Я хотел бы расширить Queue.PriorityQueue, описанную здесь: http://docs.python.org/library/queue.html#Queue.PriorityQueue

Очередь будет содержать рабочие пакеты с приоритетом. Рабочие получат рабочие пакеты и обработают их. Хочу сделать следующие дополнения:

  1. Рабочие тоже в приоритете. Когда несколько воркеров простаивают, тот, у кого наивысший приоритет, должен обрабатывать входящий рабочий пакет.

  2. Не каждый рабочий процесс может обработать каждый рабочий пакет, поэтому необходим механизм, который проверяет, совпадают ли тип рабочего пакета и возможности рабочего процесса.

Я ищу подсказки, как это лучше всего реализовать (начиная с нуля, расширяя PrioriyQueue или Queue,...).

изменить

Вот моя первая (непроверенная) попытка. Основная идея заключается в том, что все ожидающие потоки будут уведомлены. Затем все они пытаются получить рабочий элемент через _choose_worker(self, worker). (Сделал это вики сообщества)

изменить

Теперь работает для некоторых простых тестов...

изменить Добавлен пользовательский BaseManager и локальная копия рабочего списка в функцию _choose_worker.

изменить исправление ошибки

import Queue
from Queue import Empty, Full
from time import time as _time
import heapq

class AdvancedQueue(Queue.PriorityQueue):

    # Initialize the queue representation
    def _init(self, _maxsize):
        self.queue = []
        self.worker = []

    def put(self, item, block=True, timeout=None):
        '''
        Put an item into the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until a free slot is available. If 'timeout' is
        a positive number, it blocks at most 'timeout' seconds and raises
        the Full exception if no free slot was available within that time.
        Otherwise ('block' is false), put an item on the queue if a free slot
        is immediately available, else raise the Full exception ('timeout'
        is ignored in that case).
        '''
        self.not_full.acquire()
        try:
            if self.maxsize > 0:
                if not block:
                    if self._qsize() == self.maxsize:
                        raise Full
                elif timeout is None:
                    while self._qsize() == self.maxsize:
                        self.not_full.wait()
                elif timeout < 0:
                    raise ValueError("'timeout' must be a positive number")
                else:
                    endtime = _time() + timeout
                    while self._qsize() == self.maxsize:
                        remaining = endtime - _time()
                        if remaining <= 0.0:
                            raise Full
                        self.not_full.wait(remaining)
            self._put(item)
            self.unfinished_tasks += 1
            self.not_empty.notifyAll()  # only change
        finally:
            self.not_full.release()

    def get(self, worker, block=True, timeout=None):
        self.not_empty.acquire()
        try:
            self._put_worker(worker)

            if not block:
                if not self._qsize():
                    raise Empty
                else:
                    return self._choose_worker(worker)
            elif timeout is None:
                while True:
                    while not self._qsize():
                        self.not_empty.wait()
                    try:
                        return self._choose_worker(worker)
                    except Empty:
                        self.not_empty.wait()

            elif timeout < 0:
                raise ValueError("'timeout' must be a positive number")
            else:
                endtime = _time() + timeout
                def wait(endtime):
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)

                while True:
                    while not self._qsize():
                        wait(endtime)

                    try:
                        return self._choose_worker(worker)
                    except Empty:
                        wait(endtime)
        finally:
            self._remove_worker(worker)
            self.not_empty.release()

    # Put a new worker in the worker queue
    def _put_worker(self, worker, heappush=heapq.heappush):
        heappush(self.worker, worker)

    # Remove a worker from the worker queue
    def _remove_worker(self, worker):
        self.worker.remove(worker)

    # Choose a matching worker with highest priority
    def _choose_worker(self, worker):
        worker_copy = self.worker[:]    # we need a copy so we can remove assigned worker
        for item in self.queue:
            for enqueued_worker in worker_copy:
                if item[1].type in enqueued_worker[1].capabilities:
                    if enqueued_worker == worker:
                        self.queue.remove(item)
                        self.not_full.notify()
                        return item
                    else:
                        worker_copy.remove(enqueued_worker)
                        # item will be taken by enqueued_worker (which has higher priority),
                        # so enqueued_worker is busy and can be removed
                        continue
        raise Empty

person Community    schedule 03.10.2010    source источник
comment
+1 интересный вопрос. У меня есть идея, но я хотел бы сначала увидеть другие ответы. Я пока дам вам небольшую подсказку: следите за случаем, когда есть свободная работа и два свободных воркера, но воркер с наивысшим приоритетом не может обработать задание в очереди. Будьте осторожны, чтобы не зайти в тупик. Точно так же есть случай, когда у вас есть две задачи и один рабочий, и рабочий не может обрабатывать задание с наивысшим приоритетом, снова следите за взаимоблокировкой. Вероятно, вам следует провести модульное тестирование этих случаев (и многих других тестов для других более распространенных сценариев — пустой очереди и т. д.).   -  person Mark Byers    schedule 03.10.2010
comment
Отлично подходит для начала работы с модульными тестами Python :)   -  person tauran    schedule 03.10.2010


Ответы (2)


Я думаю, вы описываете ситуацию, когда у вас есть две «очереди с приоритетом» — одна для заданий, а другая для рабочих. Наивный подход состоит в том, чтобы взять наиболее приоритетную работу и высокоприоритетного работника и попытаться соединить их. Но, конечно, это терпит неудачу, когда работник не может выполнить задание.

Чтобы исправить это, я бы предложил сначала взять задание с наивысшим приоритетом, а затем перебрать всех рабочих в порядке убывания приоритета, пока не найдете тот, который может обработать это задание. Если ни один из воркеров не может обработать задание, берется второе по приоритету задание и так далее. Таким образом, у вас есть вложенные циклы, что-то вроде этого:

def getNextWorkerAndJobPair():
    for job in sorted(jobs, key=priority, reverse=True):
        for worker in sorted(workers, key=priority, reverse=True):
             if worker.can_process(job):
                 return (worker, job)

Однако в приведенном выше примере данные сортируются без необходимости много раз. Чтобы избежать этого, было бы лучше хранить данные уже в отсортированном порядке. Что касается того, какие структуры данных использовать, я не совсем уверен, что лучше всего. В идеале вы хотели бы вставить и удалить O (log n) и иметь возможность перебирать коллекцию в отсортированном порядке за время O (n). Я думаю, что PriorityQueue соответствует первому из этих требований, но не второму. Я предполагаю, что отсортированный список из пакета blist будет работать, но я сам не пробовал и на веб-странице не указаны гарантии производительности, которые предлагает этот класс.

Способ, который я предложил сначала выполнять итерации по заданиям, а затем по рабочим процессам во внутреннем цикле, — не единственный подход, который вы можете использовать. Вы также можете изменить порядок циклов, чтобы сначала выбрать работника с наивысшим приоритетом, а затем попытаться найти для него работу. Или вы можете найти допустимую пару (работа, работник), которая имеет максимальное значение f (priority_job, priority_worker) для некоторой функции f (например, просто добавьте приоритеты).

person Mark Byers    schedule 03.10.2010
comment
Для моего редактирования я следовал вашему подходу. Сначала я попытаюсь заставить его работать, а потом подумаю о его настройке. Также я хочу повторно использовать как можно больше кода из PriorityQueue. - person tauran; 04.10.2010

Единственный ответ был полезным, но недостаточно подробным, поэтому я пока приму свой собственный ответ. Смотрите код в вопросе.

person Community    schedule 12.10.2010