Бих искал да разширя Queue.PriorityQueue, описан тук: http://docs.python.org/library/queue.html#Queue.PriorityQueue
Опашката ще съдържа работни пакети с приоритет. Работниците ще получат работни пакети и ще ги обработват. Искам да направя следните допълнения:
Работниците също имат приоритет. Когато няколко работници са неактивни, този с най-висок приоритет трябва да обработи входящ работен пакет.
Не всеки работник може да обработва всеки работен пакет, така че е необходим механизъм, който проверява дали типът на работния пакет и възможностите на работника съвпадат.
Търся съвети как това е най-добре приложено (започване от нулата, разширяване на PrioriyQueue или Queue, ...).
редактиране
Ето го първият ми (непроверен) опит. Основната идея е, че всички чакащи нишки ще бъдат уведомени. След това всички се опитват да получат работен елемент чрез _choose_worker(self, worker)
. (Направи го уики на общността)
редактиране
Сега работи за някои прости тестове...
редактиране Добавено е персонализирано BaseManager
и локално копие на работния списък във функцията _choose_worker
.
редактиране корекция на грешка
import Queue
from Queue import Empty, Full
from time import time as _time
import heapq
class AdvancedQueue(Queue.PriorityQueue):
# Initialize the queue representation
def _init(self, _maxsize):
self.queue = []
self.worker = []
def put(self, item, block=True, timeout=None):
'''
Put an item into the queue.
If optional args 'block' is true and 'timeout' is None (the default),
block if necessary until a free slot is available. If 'timeout' is
a positive number, it blocks at most 'timeout' seconds and raises
the Full exception if no free slot was available within that time.
Otherwise ('block' is false), put an item on the queue if a free slot
is immediately available, else raise the Full exception ('timeout'
is ignored in that case).
'''
self.not_full.acquire()
try:
if self.maxsize > 0:
if not block:
if self._qsize() == self.maxsize:
raise Full
elif timeout is None:
while self._qsize() == self.maxsize:
self.not_full.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a positive number")
else:
endtime = _time() + timeout
while self._qsize() == self.maxsize:
remaining = endtime - _time()
if remaining <= 0.0:
raise Full
self.not_full.wait(remaining)
self._put(item)
self.unfinished_tasks += 1
self.not_empty.notifyAll() # only change
finally:
self.not_full.release()
def get(self, worker, block=True, timeout=None):
self.not_empty.acquire()
try:
self._put_worker(worker)
if not block:
if not self._qsize():
raise Empty
else:
return self._choose_worker(worker)
elif timeout is None:
while True:
while not self._qsize():
self.not_empty.wait()
try:
return self._choose_worker(worker)
except Empty:
self.not_empty.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a positive number")
else:
endtime = _time() + timeout
def wait(endtime):
remaining = endtime - _time()
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
while True:
while not self._qsize():
wait(endtime)
try:
return self._choose_worker(worker)
except Empty:
wait(endtime)
finally:
self._remove_worker(worker)
self.not_empty.release()
# Put a new worker in the worker queue
def _put_worker(self, worker, heappush=heapq.heappush):
heappush(self.worker, worker)
# Remove a worker from the worker queue
def _remove_worker(self, worker):
self.worker.remove(worker)
# Choose a matching worker with highest priority
def _choose_worker(self, worker):
worker_copy = self.worker[:] # we need a copy so we can remove assigned worker
for item in self.queue:
for enqueued_worker in worker_copy:
if item[1].type in enqueued_worker[1].capabilities:
if enqueued_worker == worker:
self.queue.remove(item)
self.not_full.notify()
return item
else:
worker_copy.remove(enqueued_worker)
# item will be taken by enqueued_worker (which has higher priority),
# so enqueued_worker is busy and can be removed
continue
raise Empty