скрученный: как элегантно общаться между кодом реактора и многопоточным кодом?

У меня есть клиент, подключенный к серверу с помощью Twisted. У клиента есть поток, который потенциально может делать что-то в фоновом режиме. Когда реактор останавливается, я должен:

1) check if the thread is doing things
2) stop it if it is

Какой элегантный способ сделать это? Лучшее, что я могу сделать, это что-то запутанное, например:

def cleanup(self):
    isWorkingDF = defer.Deferred()
    doneDF = defer.Deferred()

    def checkIsWorking():
        res = self.stuff.isWorking() #blocking call
        reactor.callFromThread(isWorkingDF.callback, res)

    def shutdownOrNot(isWorking):
        if isWorking:
            #shutdown necessary, shutdown is also a blocking call
            def shutdown():
                self.stuff.shutdown()
                reactor.callFromThread(doneDF, None)
            reactor.callInThread(shutdown)                
        else:
            doneDF.callback(None) #no shutdown needed

    isWorkingDF.addCallback(shutdownOrNot)

    reactor.callInThread(checkIsWorking)

    return doneDF

Сначала мы проверяем, работает ли он вообще. Результат этого обратного вызова попадает в rescallback, который либо выключается, либо нет, а затем запускает doneDF, который и ждет до закрытия.

Довольно запутался, а! Есть ли способ лучше?

Может быть, связанный с этим вопрос: есть ли более элегантный способ связать обратные вызовы друг с другом? Я мог видеть, что мне нужно сделать больше кода очистки после того, как это будет сделано, поэтому тогда мне придется сделать другой done deferred, и текущий doneDF запускает обратный вызов, который делает что-то, а затем вызывает этот done deferred..


person Claudiu    schedule 11.08.2010    source источник


Ответы (3)


Ах, настоящий ответ — использовать декоратор defer.inlineCallbacks. Приведенный выше код теперь становится:

@defer.inlineCallbacks
def procShutdownStuff(self):
    isWorking = yield deferToThread(self.stuff.isWorking)

    if isWorking:
        yield deferToThread(self.stuff.shutdown)

def cleanup(self):
    return self.procShutdownStuff()
person Claudiu    schedule 15.09.2010

Вы можете несколько упростить это, используя deferToThread вместо пар callInThread/callFromThread:

from twisted.internet.threads import deferToThread

def cleanup(self):
    isWorkingDF = deferToThread(self.stuff.isWorking)

    def shutdownOrNot(isWorking):
        if isWorking:
            #shutdown necessary, shutdown is also a blocking call
            return deferToThread(self.stuff.shutdown)

    isWorkingDF.addCallback(shutdownOrNot)

    return isWorkingDF

deferToThread — это, по сути, просто приятная оболочка вокруг той же логики потоковой передачи, которую вы дважды реализовали в своей версии функции.

person Jean-Paul Calderone    schedule 11.08.2010
comment
ах это может быть то, что я хочу, да. Я думаю, вы добавили сюда еще один элемент — если вы вернете deferred из обратного вызова, вызванного из другого deferred, что произойдет? я бы не подумал, что отложенный тоже будет ждать, но, может быть, я ошибаюсь? - person Claudiu; 12.08.2010
comment
ну, мне все еще нужно было больше отложенных запросов, но defertothread сделал код намного приятнее - person Claudiu; 12.08.2010
comment
Если вы вернете Deferred (x) из обратного вызова в другой Deferred (y), то y приостановит обработку своей цепочки обратных вызовов, пока x не получит результат. Затем y продолжит с того места, где остановился, используя результат x в качестве результата для своего следующего обратного вызова. - person Jean-Paul Calderone; 12.08.2010

Если программа завершается после выключения реактора, вы можете сделать поток потоком демона. Это автоматически завершится, когда все потоки, не являющиеся демонами, завершатся. Просто установите daemon = True в объекте потока перед вызовом start().

Если это нежизнеспособно, например. поток должен выполнить очистку ресурсов перед выходом, тогда вы можете общаться между реактором и потоком с помощью очереди. Поместите работу, которую нужно выполнить, в объект Queue, и пусть поток вытащит ее и сделает это. Имейте специальный токен "FINISH" (или просто None), чтобы указать, что поток должен быть завершен.

person Dave Kirby    schedule 11.08.2010
comment
мне нравится вторая идея, но мне все равно придется общаться с потоком, чтобы выяснить, включен ли он, а затем поставить что-то в очередь, если это так. это не решает основную проблему, связанную с блокировкой вызовов. я практически делаю это в любом случае, за исключением того, что вместо очереди используется одна переменная с защитой от блокировки. 1-я идея может действительно сработать, но я не уверен, что это именно та логика, которую я хочу - person Claudiu; 12.08.2010