Как мога да отменя висяща асинхронна задача в торнадо с изчакване?

Моята настройка е python tornado сървър, който асинхронно обработва задачи с ThreadPoolExecutor. При някои условия задачата може да се превърне в безкраен цикъл. С декоратора with_timeout успях да хвана изключението за изчакване и да върна резултат за грешка на клиента. Проблемът е, че задачата все още работи във фонов режим. Как е възможно да спрете изпълнението на задачата в ThreadPoolExecutor? Или е възможно да отмените Future? Ето кода, който възпроизвежда проблема. Стартирайте кода с библиотеките tornado 4 и concurrent.futures и отидете на http://localhost:8888/test

from tornado.concurrent import run_on_executor
from tornado.gen import with_timeout
from tornado.ioloop import IOLoop
import tornado.web
from tornado import gen
from concurrent.futures import ThreadPoolExecutor
import datetime
MAX_WAIT_SECONDS = 10

class MainHandler(tornado.web.RequestHandler):
    executor = ThreadPoolExecutor(2)

    @run_on_executor
    def test_func(self):
        ...
        #infinite loop might be here
        ...

    @tornado.gen.coroutine
    def get(self):
        future = self.test_func()
        try:
            result_search_struct = yield with_timeout(datetime.timedelta(seconds=MAX_WAIT_SECONDS), future )
            self.write({'status' : 0})
            self.finish()
        except Exception, e:
            #how to cancel the task here if it was timeout
            future.cancel() # <-- Does not work
            self.write({'status' : 100})
            self.finish()

application = tornado.web.Application([
    (r"/test", MainHandler),
])
application.listen(8888)
IOLoop.instance().start()

person Egor Lakomkin    schedule 10.03.2015    source източник


Отговори (1)


Самите екземпляри Future не могат да бъдат отменени, след като действително се изпълняват, те могат да бъдат отменени само ако са в чакащо състояние. Това е отбелязано в документите:

cancel()

Опит за отмяна на повикването. Ако повикването се изпълнява в момента и не може да бъде отменено, тогава методът ще върне False, в противен случай повикването ще бъде отменено и методът ще върне True.

И така, единственият начин да прекъснете метода, който изпълнявате във фонов режим, е действително да вмъкнете логика във вашия потенциално безкраен цикъл, така че да може да бъде прекъснат, когато му кажете. С вашия пример можете да използвате threading.Event:

class MainHandler(tornado.web.RequestHandler):
    executor = ThreadPoolExecutor(2)

    @run_on_executor
    def test_func(self, event):
        i = 0
        while not event.is_set():
            print i
            i = i + 1

    @tornado.gen.coroutine
    def get(self):
        event = threading.Event()
        future = self.test_func(event)
        try:
            result_search_struct = yield with_timeout(datetime.timedelta(seconds=MAX_WAIT_SECONDS), future )
            self.write({'status' : 0})
            self.finish()
        except Exception, e:
            future.cancel() # Might not work, depending on how busy the Executor is
            event.set()
            self.write({'status' : 100})
            self.finish()

application = tornado.web.Application([
    (r"/test", MainHandler),
])
person dano    schedule 10.03.2015
comment
dano, благодаря за отговора, проблемът е, че няма истински цикъл. Има фин бъг, който все още се разследва, което причинява безкраен цикъл в задачата. Като временно решение мислех да отменя окачената нишка atm. Не съм сигурен как да го направя в текущата структура на кода. - person Egor Lakomkin; 10.03.2015
comment
Промених кода си, така че да не обърка, че има истински цикъл - person Egor Lakomkin; 10.03.2015
comment
@EgorLakomkin Вижте този въпрос относно убиването на нишка в Python. Има някои неща, които можете да направите, които ще работят при определени обстоятелства, но всъщност няма единен подход, който да работи за всички случаи на употреба. Повечето от подходите там са особено проблематични, защото използвате директно ThreadPoolExecutor, а не Thread обект. - person dano; 10.03.2015