В момента работя с RabbitMQ в Python, използвайки клиента Pika, за да създам сървър, който обработва различни типове съобщения. Основната настройка, която имам, е една опашка, която получава всички входящи съобщения, процес на маршрутизиране, който ги насочва към правилните дестинации, и няколко процеса за обработка на заявки и приемане на входящи данни. Тази настройка работи добре, освен в един конкретен случай. Когато сървърът RabbitMQ работи, преди процесите на сървъра да бъдат стартирани и той получи съобщение, той правилно ги съхранява в опашката за входящи съобщения. Въпреки това, когато след това се опитам да стартирам тези процеси и да настроя потребител на тази непразна входяща опашка с функцията pika.basic_consume, програмата увисва. Така че в момента, ако искам да стартирам сървърните си процеси, трябва да изчистя всички съобщения от опашките, преди да работи правилно. Как да поправя това, за да работи с непразни опашки?
Ето извадка от един от процесите, всички те са настроени по същество еднакви като този.
class Router(Process):
def __init__(self,routing_table):
super(Router,self).__init__()
self.routing_table = routing_table
self.routeQueues = {
'r' : 'registration',
't' : 'util',
'p' : 'util',
's' : 'data'
}
# Create a connection to the RabbitMQ server.
self.rabbitConn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
self.channel = self.rabbitConn.channel()
# Load all of the existing registered node queues
with open('registrations/nodes.txt','r') as nodes:
for line in nodes:
info = line.strip().split(":")
self.channel.queue_declare(info[1])
# Declare the default queues
queue_list = ["incoming","registration","util"]
for queueName in queue_list:
self.channel.queue_declare(queueName)
# Start consuming things from the incoming queue
self.channel.basic_consume(self.gotPacket,queue='incoming')
def gotPacket(self,ch,method,params,body):
# Does stuff. Not relevant here.
pass
def run(self):
self.channel.start_consuming()