Я хотел бы расширить Queue.PriorityQueue, описанную здесь: http://docs.python.org/library/queue.html#Queue.PriorityQueue
Очередь будет содержать рабочие пакеты с приоритетом. Рабочие получат рабочие пакеты и обработают их. Хочу сделать следующие дополнения:
Рабочие тоже в приоритете. Когда несколько воркеров простаивают, тот, у кого наивысший приоритет, должен обрабатывать входящий рабочий пакет.
Не каждый рабочий процесс может обработать каждый рабочий пакет, поэтому необходим механизм, который проверяет, совпадают ли тип рабочего пакета и возможности рабочего процесса.
Я ищу подсказки, как это лучше всего реализовать (начиная с нуля, расширяя PrioriyQueue или Queue,...).
изменить
Вот моя первая (непроверенная) попытка. Основная идея заключается в том, что все ожидающие потоки будут уведомлены. Затем все они пытаются получить рабочий элемент через _choose_worker(self, worker)
. (Сделал это вики сообщества)
изменить
Теперь работает для некоторых простых тестов...
изменить Добавлен пользовательский BaseManager
и локальная копия рабочего списка в функцию _choose_worker
.
изменить исправление ошибки
import Queue
from Queue import Empty, Full
from time import time as _time
import heapq
class AdvancedQueue(Queue.PriorityQueue):
# Initialize the queue representation
def _init(self, _maxsize):
self.queue = []
self.worker = []
def put(self, item, block=True, timeout=None):
'''
Put an item into the queue.
If optional args 'block' is true and 'timeout' is None (the default),
block if necessary until a free slot is available. If 'timeout' is
a positive number, it blocks at most 'timeout' seconds and raises
the Full exception if no free slot was available within that time.
Otherwise ('block' is false), put an item on the queue if a free slot
is immediately available, else raise the Full exception ('timeout'
is ignored in that case).
'''
self.not_full.acquire()
try:
if self.maxsize > 0:
if not block:
if self._qsize() == self.maxsize:
raise Full
elif timeout is None:
while self._qsize() == self.maxsize:
self.not_full.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a positive number")
else:
endtime = _time() + timeout
while self._qsize() == self.maxsize:
remaining = endtime - _time()
if remaining <= 0.0:
raise Full
self.not_full.wait(remaining)
self._put(item)
self.unfinished_tasks += 1
self.not_empty.notifyAll() # only change
finally:
self.not_full.release()
def get(self, worker, block=True, timeout=None):
self.not_empty.acquire()
try:
self._put_worker(worker)
if not block:
if not self._qsize():
raise Empty
else:
return self._choose_worker(worker)
elif timeout is None:
while True:
while not self._qsize():
self.not_empty.wait()
try:
return self._choose_worker(worker)
except Empty:
self.not_empty.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a positive number")
else:
endtime = _time() + timeout
def wait(endtime):
remaining = endtime - _time()
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
while True:
while not self._qsize():
wait(endtime)
try:
return self._choose_worker(worker)
except Empty:
wait(endtime)
finally:
self._remove_worker(worker)
self.not_empty.release()
# Put a new worker in the worker queue
def _put_worker(self, worker, heappush=heapq.heappush):
heappush(self.worker, worker)
# Remove a worker from the worker queue
def _remove_worker(self, worker):
self.worker.remove(worker)
# Choose a matching worker with highest priority
def _choose_worker(self, worker):
worker_copy = self.worker[:] # we need a copy so we can remove assigned worker
for item in self.queue:
for enqueued_worker in worker_copy:
if item[1].type in enqueued_worker[1].capabilities:
if enqueued_worker == worker:
self.queue.remove(item)
self.not_full.notify()
return item
else:
worker_copy.remove(enqueued_worker)
# item will be taken by enqueued_worker (which has higher priority),
# so enqueued_worker is busy and can be removed
continue
raise Empty