同时在get或者post方法处理上应用@tornado.web.asynchronous和
@tornado.gen.engine装饰器,可以非常方便的和有callback参数的异步方法配合实现非阻塞请求处理。
这里不想说官方的那个例子,因为官方的例子给的本来就是用异步的http客户端来拉取数据然后自己转到回调函数执行,讨论http客户端的代码不在本文范围内。于是我用一个简单的函数来演示,且这样更能说明问题。
class RegisterHandler(basehandler.BaseHandler):
@tornado.web.asynchronous
@tornado.gen.engine
def get(self, *arg, **args):
response = yield tornado.gen.Task(self.method_while)
print "response",response
self.finish()
def method_while(self, *arg, **args):
callback(1)
这是我写的一个请求处理的Handler,get请求上加了这两个装饰器,第一个装饰器标明这个get函数是不会自动断掉输出流的,需要显式的调用finish方法。这个装饰器需要和@tornado.gen.engine一块用。
def engine(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
runner = None
def handle_exception(typ, value, tb):
if runner is not None:
return runner.handle_exception(typ, value, tb)
return False
with ExceptionStackContext(handle_exception) as deactivate:
gen = func(*args, **kwargs)
if isinstance(gen, types.GeneratorType):
runner = Runner(gen, deactivate)
runner.run()
return
assert gen is None, gen
deactivate()
return wrapper
(我把原来有的注释删掉了)
我们直接看with语句的内部就可以了:
gen = func(*args, **kwargs)
if isinstance(gen, types.GeneratorType):
runner = Runner(gen, deactivate)
gen就是我们的get方法,gen=fun(*args,**kwargs)相当于gen=get(*args,**kwargs),get方法执行结果返回给gen?no~no!get方法里有yield,意味着get(*args,**kwargs)返回的是一个生成器类型GeneratorType的数据,是不会马上执行的,需要调用他的next或者send方法让其执行到下一个yield方法处。关于生成器以及它的send和next的具体用法不在本文的讨论范围内,请查阅官方文档。
可以看到它new了一个Runner,gen传了进去,然后执行run方法,我们跟进Runner类的run方法的代码:
def run(self ):
if self.running or self.finished:#判断是否是在运行中或者已经结束,如果是立即返回
return
try:
self.running = True #让状态为运行中
while True:#进入循环
if self.exc_info is None:#上次循环没有异常
try:
if not self.yield_point.is_ready():#判断key是否可用
return
next = self.yield_point.get_result()#获取yield表达式的结果给next
except Exception:
self.exc_info = sys.exc_info()
try:
if self.exc_info is not None:
self.had_exception = True
exc_info = self.exc_info
self.exc_info = None
yielded = self.gen.throw(*exc_info)
else:
yielded = self.gen.send(next)#如果上次没异常,get函数继续执行,这里把Task对象返回给yielded,相当于要迭代使用这个Task(最开头标红的代码段),每次执行到yield都要把task重新赋给yielded
except StopIteration:#如果没有可以执行的了,返回
self.finished = True
if self.pending_callbacks and not self.had_exception:
raise LeakedCallbackError(
"finished without waiting for callbacks %r" %
self.pending_callbacks)
self.deactivate_stack_context()
self.deactivate_stack_context = None
return
except Exception:
self.finished = True
raise
if isinstance(yielded, list):
yielded = Multi(yielded)
if isinstance(yielded, YieldPoint):#如果yielded为Task类型(Task继承于YieldPoint)
self.yield_point = yielded
try:
self.yield_point.start(self)#执行task的start方法
except Exception:
self.exc_info = sys.exc_info()
else:
self.exc_info = (BadYieldError("yielded unknown object %r" % yielded),)
finally:
self.running = False
具体步骤可以详见上述代码中的注释,这里我们就继续看task的start方法:
class Task(YieldPoint):
(省略其他代码)
def start(self, runner):
self.runner = runner
self.key = object()
runner.register_callback( self.key)
self.kwargs["callback"] = runner.result_callback(self.key)
self.func(*self.args, ** self.kwargs)
(省略其他代码)
这里的func方法就是最上面的method_while,传了一个callback参数进去:runner.result_callback( self.key),然后执行该方法。为了讨论这个callback参数的作用,我们假设这个method_while方法里,没有用到这个参数,即没有执行callback:
start方法在method_while执行完后就返回了,继续执行Runner的run方法里的那个循环的第二遍,它会直接在这一行
if not self.yield_point.is_ready():#判断key是否可用
return
直接返回,原因就是在results 里这个key对应的值为None。
这里涉及到了两个名词,results 和key,result是在哪里的呢?是Runner的一个字段,从下面的Runner的构造函数里可以看到results是个字典 :
class Runner(object):
def __init__(self, gen, deactivate_stack_context):
(省略其他代码)
self.pending_callbacks = set()
self.results = {}
(省略其他代码)
那么key是怎么设置到results这个字典里的呢?是在Task的start方法里,runner.register_callback( self.key),具体代码可以再看上面的Task类。这个register_callback是有点绕的,它将key设置到pending_callbacks里面,这个pending_callbacks是一个set类型,即不可重复的集合,有这个集合的作用是为了每次都要从这里面判断key是否存在,key存在说明有个Task要执行。在上面的run方法中的判断key是否可用的时候,is_ready每次都会先判断key是否在这个pending_callbacks,没有直接报异常,有的话才会去results去取结果。我们可以看到results一直是空啊,什么时候设置的值呢??就是刚刚我们在method_while里没有执行的callback方法,即runner.result_callback( self.key):
def result_callback(self , key):
def inner(*args, **kwargs):
if kwargs or len(args) > 1:
result = Arguments(args, kwargs)
elif args:
result = args[ 0]
else:
result = None
self.set_result(key, result)
return inner
可以看出来,他把callback实际调用时候的参数列表又写进了results里了,返回给yield表达式的值response。所以我们发现,一定要在自定义的方法中执行参数中的callback,把你想返回的数据写进callback的参数里,yield表达式的值就会是这个参数了,然后run方法继续执行到下一个yield直到get方法完成。
比如最上面那个例子,打印结果为:
response 1
最后来说这个异步方案有什么用呢?
一说到异步,新手肯定会以为是get/post方法阻塞了也没关系,一个get请求过来,get处理函数如果阻塞了,是必然导致整个服务器阻塞的!不要以为这个是相对web前端请求的并行异步解决方案,单线程的Tornado毕竟是单线程,方法阻塞,必然导致服务器阻塞,毋庸置疑的。
这个异步是相对与get请求来说的,get方法在没有加装饰器@tornado.web.asynchronous的情况下,get方法结束后,请求会自动断开,但是加上这个装饰器后,get请求可以直接return且保持连接,直到显式的调用finish方法才会关闭输出流。这样就萌生了异步(异步:把事情交给除了自己之外的人来做,自己不管)解决方案,get方法直接return,交给别的函数来做,做完了再回来。加上@tornado.gen.engine只是把上面这个思想实现了下,将代码浓缩到最小而已,这并不表示异步处理的函数或者这个get里有死循环的话也能同时让服务器处理别的请求,你查询数据库花费10000s,那么你的服务器就肯定要死10000s的,这种问题的解决方法有四个:crouchDB(有http接口,异步httpclient调用),优化当前数据库,异步httpclient调webservice,开线程。推荐第二个第三个,其次第四个,最后第一个(原因不解释,貌似crouchDB口碑不好,我也不爱用,mongo党飘过)。
就算是异步httpclient,就像下面这样
def handle_request(response):
if response.error:
print "Error:", response.error
else:
print response.body
http_client = AsyncHTTPClient()
http_client.fetch("http://www.google.com/", handle_request)
也只是fetch过后不管socket如何拉数据而已,回调也是要在本线程执行的!在别处如果有耗时操作或者死循环的,它拉过来的数据是永远进不到handle_request函数中的。