Pyzmq - отправлять несколько сообщений без блокировки

Я хочу отправить несколько сообщений из одного сокета в другой, и я столкнулся с этой проблемой.

client.py

context = Context()

for i in range(10):
    print(i)
    out_socket = context.socket(REQ)
    out_socket.connect("tcp://localhost:%s" % "5000")
    message_content = ("hello", 1)
    pickled_message = dumps(message_content)
    out_socket.send(pickled_message, flags=NOBLOCK)

Server.py

context = Context()
in_socket = context.socket(REP)
in_socket.bind("tcp://*:%s" % "5000")

poller = Poller()
poller.register(in_socket, POLLIN)

while True:
    socks = dict(poller.poll())
    print(socks)

    if socks.get(in_socket) == POLLIN:
        pickled_received_message = in_socket.recv(flags=NOBLOCK)
        received_message, sender_entity_id_e = loads(pickled_received_message)
        print(received_message, sender_entity_id_e)

Вывод client.py

client.py выводит числа от 0 до 9. Это навело меня на мысль, что сообщения были отправлены, и это сработало. Затем я присмотрелся к server.py и понял, что хотя цикл выполнялся 10 раз, было отправлено только одно сообщение.

Я хочу реализовать отправку нескольких сообщений с клиента на сервер без блокировки. Есть идеи, как этого добиться? Я уже пробовал несколько решений без успеха. Я думаю, что это должно работать, но я не уверен, почему это не работает.


person M.Puk    schedule 27.03.2018    source источник


Ответы (1)


Инфраструктура ZeroMQ работает документально

Если в нем говорится, что асинхронная служба не доставляет сообщение в тот момент, когда возвращается неблокирующий вызов метода .send( something, flags = zmq.DONTWAIT ), это действительно означает, что инфраструктура службы ZeroMQ стала владельцем сообщения. полезная нагрузка (в этом примере something), но не меньше и не больше.

Далее тот же принцип работает с методами .bind()/.connect(), которые начинают строить следующую точку доступа инфраструктуры ZeroMQ, будь то локально (для .bind()-случая) или удаленно (в случае, если .connect()-метод был запрошен для начать выполнять некоторые низкоуровневые, специфичные для транспортного класса последовательности шагов и средств защиты), некоторые из которых могут по прошествии значительного времени (просто сравните, сколько времени требуется для достижения RTO-состояния, в [us], так как было предложено начать настройку инфраструктуры, чтобы получить приблизительную оценку, сколько времени может пройти, прежде чем она перейдет в состояние готовности к работе... )

context = Context()
"""        
for        i in range( 10 ):
    print( i ) # vvvvvvvvvvvvvvvvvvvv------------------- NEVER THIS WAY [us].....
    out_socket = context.socket( REQ )
    out_socket.connect( "tcp://localhost:%s" % "5000" )
    message_content = ( "hello", 1 )
    pass;            pickled_message = dumps( message_content )
    out_socket.send( pickled_message, flags = NOBLOCK )
"""
#-------------------------------------------------ZMQ-# <INSTANTIATION>
out_socket = context.socket( zmq.REQ )                # .SET AN INSTANCE
out_socket.setsockopt(       zmq.LINGER, 0 )          # .SET ALWAYS
out_socket.setsockopt(       zmq.SNDHWM, ... )        # .SET AS NEEDED
out_socket.setsockopt(       zmq.AFFINITY, ... )      # .SET AS NEEDED
out_socket.setsockopt(       zmq.<_*-attributes_>,...)# .SET AS NEEDED
#-------------------------------------------------ZMQ-# <INFRASTRUCTURE>
out_socket.connect( "tcp://localhost:%s" % "5000" )   # .CONNECT()-async call

for        i in range( 100 ):                         # py2.7: xrange( 100 ):
    print( i )
    # -------------------------------------------------------
    #                  pass;                   message_content = ( "hello", 1 )
    #                  pickled_message = dumps(message_content)
    # out_socket.send( pickled_message, flags = NOBLOCK )
    #-------------------------------------------------------- PYTHON-SIDE COSTS
    out_socket.send(                     dumps( ( "hello", i ) ), zmq.DONTWAIT )
print( "finished the for()-loops" )
#-------------------------------------------------ZMQ-# <GRACEFUL TERMINATION>
out_socket.close()
context.term()
person user3666197    schedule 10.04.2018