Усукан http сървър с асинхронен отговор, при който заявките трябва да изчакат данните да станат налични или изчакване

Опитвам се да напиша прост http сървър, който обработва асинхронни заявки, които търсят в структура от данни за отговор или изчакване:

  1. Заявката пристига
  2. Докато време ‹ изчакване проверка responseCollector за отговор (използвайки requestId като ключ)
  3. Ако отговорът, върнете го
  4. Ако изтече, връща съобщение за изчакване

Нов съм в twisted и се чудя кой е най-добрият начин да направя асинхронен отговор. Разгледах някои усукани отложени документи и callLater, но не ми беше ясно какво точно трябва да правя. В момента стартирам блокиращ метод с помощта на deferToThread и изчаквам да изтече време за изчакване. Получавам грешка, че низът не може да бъде извикан за моя отложен метод:

Unhandled error in Deferred:
Traceback (most recent call last):
  File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/threading.py", line 497, in __bootstrap
    self.__bootstrap_inner()
  File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/threading.py", line 522, in __bootstrap_inner
    self.run()
  File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/threading.py", line 477, in run
    self.__target(*self.__args, **self.__kwargs)
--- <exception caught here> ---
  File "/System/Library/Frameworks/Python.framework/Versions/2.6/Extras/lib/python/twisted/python/threadpool.py", line 210, in _worker
    result = context.call(ctx, function, *args, **kwargs)
  File "/System/Library/Frameworks/Python.framework/Versions/2.6/Extras/lib/python/twisted/python/context.py", line 59, in callWithContext
    return self.currentContext().callWithContext(ctx, func, *args, **kw)
  File "/System/Library/Frameworks/Python.framework/Versions/2.6/Extras/lib/python/twisted/python/context.py", line 37, in callWithContext
    return func(*args,**kw)
exceptions.TypeError: 'str' object is not callable

Ето моят код:

from twisted.web import server, resource
from twisted.internet import reactor, threads
import json
import time

connectedClients = {}
responseCollector = {}

# add fake data to the collector
class FakeData(resource.Resource):
    isLeaf = True

    def render_GET(self, request):
        request.setHeader("content-type", "application/json")
        if 'rid' in request.args and 'data' in request.args:
            rid = request.args['rid'][0]
            data = request.args['data'][0]
            responseCollector[str(rid)] = data
            return json.dumps(responseCollector)
        return "{}"

class RequestHandler(resource.Resource):
    isLeaf = True

    def render_GET(self, request):
        #request.setHeader("content-type", "application/json")

        if not ('rid' in request.args and and 'json' in request.args):
            return '{"success":"false","response":"invalid request"}'

        rid = request.args['rid'][0]
        json = request.args['id'][0]

        # TODO: Wait for data to show up in the responseCollector with same rid
        # as our request without blocking other requests OR timeout
        d = threads.deferToThread(self.blockingMethod(rid))
        d.addCallback(self.ret)
        d.addErrback(self.err)

    def blockingMethod(self,rid):
        timeout  = 5.0
        timeElapsed = 0.0
        while timeElapsed<timeout:
            if rid in responseCollector:
                return responseCollector[rid]
            else:
                timeElapsed+=0.01
                time.sleep(0.01)
        return "timeout"

    def ret(self, hdata):
        return hdata

    def err(self, failure):
        return failure

reactor.listenTCP(8080, server.Site(RequestHandler()))
reactor.listenTCP(9080, server.Site(FakeData()))
reactor.run()

Направете заявка (в момента не връща нищо полезно):

http://localhost:8080/?rid=1234&json={%22foo%22:%22bar%22}

Добавете някои фалшиви данни, които да използвате със заявка:

http://localhost:9080/?rid=1234&data=foo

Обновен с работеща версия

from twisted.web import server, resource
from twisted.internet import reactor, threads
import json
import time

connectedClients = {}
responseCollector = {}

# add fake data to the collector
class FakeData(resource.Resource):
    isLeaf = True

    def render_GET(self, request):
        request.setHeader("content-type", "application/json")
        if 'rid' in request.args and 'data' in request.args:
            rid = request.args['rid'][0]
            data = request.args['data'][0]
            responseCollector[str(rid)] = data
            return json.dumps(responseCollector)
        return "{}"

class RequestHandler(resource.Resource):
    isLeaf = True

    def render_GET(self, request):

        if not ('rid' in request.args and 'data' in request.args):
            return '{"success":"false","response":"invalid request"}'

        rid = request.args['rid'][0]
        json = request.args['data'][0]

        # TODO: Wait for data to show up in the responseCollector with same rid
        # as our request without blocking other requests OR timeout
        d = threads.deferToThread(self.blockingMethod,rid)
        d.addCallback(self.ret, request)
        d.addErrback(self.err)

        return server.NOT_DONE_YET

    def blockingMethod(self,rid):
        timeout  = 5.0
        timeElapsed = 0.0
        while timeElapsed<timeout:
            if rid in responseCollector:
                return responseCollector[rid]
            else:
                timeElapsed+=0.01
                time.sleep(0.01)
        return "timeout"

    def ret(self, result, request):
        request.write(result)
        request.finish()

    def err(self, failure):
        return failure

reactor.listenTCP(8080, server.Site(RequestHandler()))
reactor.listenTCP(9080, server.Site(FakeData()))
reactor.run()

person nflacco    schedule 13.05.2012    source източник


Отговори (2)


В render_GET() трябва да върнете twisted.web.server.NOT_DONE_YET. Трябва да предадете обекта на заявката към метода ret: d.addCallback(self.ret, request)

След това в ret(request) трябва да напишете асинхронните данни с помощта на request.write(hdata) и да затворите връзката с request.finish().

def ret(self, result, request):
    request.write(result)
    request.finish()

Малко информация от: http://twistedmatrix.com/documents/current/web/howto/using-twistedweb.html

Рендирането на ресурс се случва, когато Twisted Web локализира листов ресурсен обект, за да обработи уеб заявка. Методът за изобразяване на даден ресурс може да прави различни неща, за да произведе изход, който ще бъде изпратен обратно към браузъра:

  • Връщане на низ
  • Поискайте Deferred, върнете server.NOT_DONE_YET и извикайте request.write("stuff") и request.finish() по-късно, в обратно извикване на Deferred.
person Adi Roiban    schedule 14.05.2012
comment
Благодаря за съвета. Направих тези промени - връщане на NOT_DONE_YET след err.callback и промяна на функцията ret към това, което сте написали. Трябва да правя нещо нередно с отложената нишка, защото заявката към 8080 виси за неопределено време. - person nflacco; 14.05.2012
comment
Можете ли да актуализирате кода от първоначалния отговор. Предлагам да опростите кода само за да се съсредоточите върху асинхронните извиквания deferToThread и render_GET. - person Adi Roiban; 14.05.2012

Помислете за разликата между тези две версии на ред код от вашия пример:

d = threads.deferToThread(self.blockingMethod(rid))

vs

d = threads.deferToThread(self.blockingMethod, rid)

Прочетете документацията за API за deferToThread и може би прочетете някои Документация на Python за функционални обекти (документацията на python.org не покрива това много добре, но много документация на трети страни го прави).

person Jean-Paul Calderone    schedule 14.05.2012
comment
да, това е първият път, когато се забърквам с twisted и всяка функция, преминаваща в python - person nflacco; 15.05.2012