Как определить multiprocessing.Pipe заполнен?

Описание проблемы: я выполняю многопроцессорную обработку в Python и использую multiprocessing.Pipe() для связи между процессами. Я много искал, но все еще не мог найти способ определить, заполнена ли труба или нет. Например, ниже процесс writePipe продолжает помещать число в 2 разных канала (нечетный и четный), а процесс readPipe постоянно считывает данные из этих 2 каналов. Однако скорость чтения из нечетной трубы намного выше, поэтому четная труба будет заполнена. В этот момент процесс writePipe будет заблокирован, в то время как процесс readPipe все еще ожидает чтения из Odd Pipe, что вызывает взаимоблокировку.

Мой вопрос: есть ли способ определить, что канал заполнен, чтобы мы могли прекратить вводить число в заполненный канал, пока он еще работает, и поместить число в канал, в котором все еще есть пробелы?

from multiprocessing import Process, Pipe


def writePipe(sendNumberOdd, sendNumberEven):
    i = 0
    while True:
        if i % 2 == 0:
            sendNumberEven.send(i)
        else:
            sendNumberOdd.send(i)
        i += 1

def readPipe(recvNumberOdd, recvNumberEven):
    countEven = 0
    while True:
        countEven += 1
        print(countEven, recvNumberEven.recv())

        countOdd = 0
        while countOdd < 50:
            countOdd += 1
            print (countOdd, recvNumberOdd.recv())



if __name__ == '__main__':
    recvNumberOdd, sendNumberOdd = Pipe(duplex=False)
    recvNumberEven, sendNumberEven = Pipe(duplex=False)

    write = Process(target=writePipe, args=(sendNumberOdd, sendNumberEven))
    read = Process(target=readPipe, args=(recvNumberOdd, recvNumberEven))
    write.start()
    read.start()

    sendNumberOdd.close()
    sendNumberEven.close()

person Le Quoc Khanh    schedule 26.07.2017    source источник


Ответы (2)


Предложение НЕПРОВЕРЕНО

class Connection(multiprocessing.Connection):
    def __init__(self, maxsize=0):
        self.__maxsize = maxsize
        self.size = 0
        self.__lock = multiprocessing.Lock

    def send(self, obj):
        with self.__lock:
            self.size += sizeof(obj)
        super().send(obj)

    def recv(self):
        _recv = super().recv()
        with self.__lock:
            self.size -= sizeof(_recv)
        return _recv

    def full(self):
        if self.__maxsize > 0:
            return self.size >= self.__maxsize
        return None

def Pipe(maxsize=0, duplex=True):
    return Connection(maxsize), Connection(maxsize)

Реализуйте poll(), чтобы проверить, готовы ли какие-либо данные.

Python » Документация: poll([timeout])

Return whether there is any data available to be read.  

Например:

if recvNumberEven.poll():
    countEven += 1
    print(countEven, recvNumberEven.recv())

Альтернативное использование wait(...) для обоих

multiprocessing.connection.wait(object_list, timeout=None)

Wait till an object in object_list is ready.  
Returns the list of those objects in object_list which are ready.
person stovfl    schedule 26.07.2017
comment
Спасибо за ваш ответ. Однако я думаю, что вы неправильно понимаете мою проблему. Что мне нужно, так это способ определить, заполнен ли канал, чтобы остановить отправку дополнительных данных в канал из процесса writePipe, чтобы предотвратить взаимоблокировку. Между тем, poll() можно использовать только для того, чтобы узнать, есть ли какие-либо данные в Pipe для чтения, поэтому в этой ситуации он не может помочь. Мне нужно что-то похожее на Queue.put(block=False) или Queue.put_nowait, которое вызывает исключение queue.Full, чтобы я мог обрабатывать, когда канал заполнен. - person Le Quoc Khanh; 27.07.2017
comment
@Le Quoc Khanh: Нет условия Труба заполнена, ваше единственное решение — предотвратить процесс Consumer waif навсегда< /я> . - person stovfl; 27.07.2017
comment
Еще раз спасибо, но, насколько я понимаю, multiprocessing.Queue построена поверх Pipe и имеет некоторый механизм для обнаружения и создания исключения queue.Full, когда очередь заполнена. Я попытался прочитать исходный код queues.py в многопроцессорной библиотеке и заметил, что он использовал какой-то Boundedsemaphore для обнаружения, но я до сих пор не понимаю логики. У вас есть какие-либо сведения об этом? - person Le Quoc Khanh; 27.07.2017
comment
Queueобработайте это с помощью счетчика размера, условие FULL равно True, если счетчик размера равен данным в буфере. Но зачем изобретать велосипед, почему Queue не подходит под ваши нужды? - person stovfl; 27.07.2017
comment
Потому что производительность Queue очень низкая (~ в 3 раза медленнее, чем у Pipe) из-за того, что Queue реализована с некоторым механизмом для защиты от нескольких производителей и нескольких потребителей, который мне не нужен (я использую только одного производителя, IPC с одним потребителем в полудуплексном режиме). Поскольку мне нужно обрабатывать много данных в режиме реального времени, я должен выбрать использование Pipe, а не Queue. Не могли бы вы подробнее объяснить, как работает Queue, чтобы определить, заполнена ли она, и как я могу применить аналогичный метод для Pipe. Я правда ценю это. Большое спасибо. - person Le Quoc Khanh; 27.07.2017
comment
@LeQuocKhanh: обновил мой ответ - person stovfl; 27.07.2017

Вы можете использовать функцию select из модуля select для реализации проверка того, заполнена ли выходная труба.

import select
import multiprocessing.connection as mpc


def pipe_full(conn):
    r, w, x = select.select([], [conn], [], 0.0)
    return 0 == len(w)


i, o = mpc.Pipe(duplex=False)

n = 0
while not pipe_full(o):
    o.send(n)
    n += 1

print('{} items fit.'.format(n))
person xxa    schedule 15.06.2019