Как отменить зависшую асинхронную задачу в торнадо с тайм-аутом?

Моя установка — это сервер python tornado, который асинхронно обрабатывает задачи с файлом ThreadPoolExecutor. В некоторых условиях задача может превратиться в бесконечный цикл. С помощью декоратора with_timeout мне удалось поймать исключение тайм-аута и вернуть клиенту результат ошибки. Проблема в том, что задача все еще выполняется в фоновом режиме. Как можно остановить выполнение задачи в ThreadPoolExecutor? Или можно отменить Future? Вот код, который воспроизводит проблему. Запустите код с библиотеками tornado 4 и concurrent.futures и перейдите по адресу http://localhost:8888/test.

from tornado.concurrent import run_on_executor
from tornado.gen import with_timeout
from tornado.ioloop import IOLoop
import tornado.web
from tornado import gen
from concurrent.futures import ThreadPoolExecutor
import datetime
MAX_WAIT_SECONDS = 10

class MainHandler(tornado.web.RequestHandler):
    executor = ThreadPoolExecutor(2)

    @run_on_executor
    def test_func(self):
        ...
        #infinite loop might be here
        ...

    @tornado.gen.coroutine
    def get(self):
        future = self.test_func()
        try:
            result_search_struct = yield with_timeout(datetime.timedelta(seconds=MAX_WAIT_SECONDS), future )
            self.write({'status' : 0})
            self.finish()
        except Exception, e:
            #how to cancel the task here if it was timeout
            future.cancel() # <-- Does not work
            self.write({'status' : 100})
            self.finish()

application = tornado.web.Application([
    (r"/test", MainHandler),
])
application.listen(8888)
IOLoop.instance().start()

person Egor Lakomkin    schedule 10.03.2015    source источник


Ответы (1)


Сами экземпляры Future нельзя отменить после того, как они действительно выполняются, их можно отменить только в том случае, если они находятся в состоянии ожидания. Это отмечено в документах:

отмена()

Попытка отменить вызов. Если вызов в настоящее время выполняется и не может быть отменен, метод вернет False, в противном случае вызов будет отменен и метод вернет True.

Таким образом, единственный способ прервать метод, который вы выполняете в фоновом режиме, - это фактически вставить логику в ваш потенциально бесконечный цикл, чтобы его можно было прервать, когда вы скажете ему об этом. В вашем примере вы можете использовать threading.Event:

class MainHandler(tornado.web.RequestHandler):
    executor = ThreadPoolExecutor(2)

    @run_on_executor
    def test_func(self, event):
        i = 0
        while not event.is_set():
            print i
            i = i + 1

    @tornado.gen.coroutine
    def get(self):
        event = threading.Event()
        future = self.test_func(event)
        try:
            result_search_struct = yield with_timeout(datetime.timedelta(seconds=MAX_WAIT_SECONDS), future )
            self.write({'status' : 0})
            self.finish()
        except Exception, e:
            future.cancel() # Might not work, depending on how busy the Executor is
            event.set()
            self.write({'status' : 100})
            self.finish()

application = tornado.web.Application([
    (r"/test", MainHandler),
])
person dano    schedule 10.03.2015
comment
dano, спасибо за ответ, проблема в том, что нет настоящей петли. Существует незаметная ошибка, которая все еще находится в стадии расследования, которая вызывает бесконечный цикл внутри задачи. В качестве временного решения я подумал об отмене зависшего потока на банкомате. Я не уверен, как это сделать в текущей структуре кода. - person Egor Lakomkin; 10.03.2015
comment
Я изменил свой код, чтобы его не смутило наличие реального цикла. - person Egor Lakomkin; 10.03.2015
comment
@EgorLakomkin См. этот вопрос об убийстве поток в Python. Есть некоторые вещи, которые вы можете сделать, которые будут работать в определенных обстоятельствах, но на самом деле не существует единого подхода, который будет работать для всех случаев использования. Большинство подходов особенно проблематичны, потому что вы используете объект ThreadPoolExecutor, а не объект Thread напрямую. - person dano; 10.03.2015