使用生成器展平异步回调结构
1. 关于异步回调
异步机制,从某个角度,可以看成是——使用非阻塞的流程,以注册回调函数的形式进行业务处理。传统机制,是顺序执行代码,如果某个函数很长时间不返回,那么下面的代码就得不到执行,这就比较浪费时间。然后,对于这种问题的解决,一般有两种办法。一是以线程或进程的方式,跑多个实例,这样来把等待的消耗降低。二是就让函数尽快返回,需要等待的资源得到后再使用另外的函数去处理它。
比如下面的代码:
response = fetch('http://zouyesheng.com') s = 1 + 1 print response print s
去获取 zouyesheng.com
的内容可能需要花费几秒钟的时间,而这几秒钟,在“同步机制”下,你只能等待,连下面的 1+1 都做不了,这显然是一个悲剧。我们利用这等待的时间,除了做下面的 1+1 ,还可以做好多的事。异步结构就是这样的:
def show(response): print response async_fetch('http://zouyesheng.com', callback=show) s = 1+1 print s
async_fetch 只是预先注册一个回调函数,它不需要等待获取到实际的内容就马上返回了,然后代码继续执行去做 1+1 。而当 zouyesheng.com
的资源获取到的时间,之前注册的 show 函数就会被执行。这样,等待的时间就用来做其它事了,没有时间上的浪费。
这看起来不错。但是,这种异步结构,写代码的方式和思考问题的方式都和以前不太一样了。这本来问题不大,但是,如果是多重的异步回调,那代码组织起来就麻烦了。一层套一层,维护起来很麻烦。写过 twisted 和 node.js 的人应该知道其中痛苦。
一个好消息是, gevent 的出现解救了广大的 pythoner ,它让我们看到异步可以写得不异步。坏消息是我现在用的是 Tornado ,并且很喜欢用它。所以, Tornado 中如果多套几层回调函数一样的苦不堪言。于是,出现了 tornado.gen 这个东西,它使用 yield ,即生成器来把一个异步结构变得像传统的同步结构一样,引一个官方网站的例子就是,原来你是这样的:
class AsyncHandler(RequestHandler): @asynchronous def get(self): http_client = AsyncHTTPClient() http_client.fetch("http://example.com", callback=self.on_fetch) def on_fetch(self, response): do_something_with_response(response) self.render("template.html")
现在你可以写成这样:
class GenAsyncHandler(RequestHandler): @asynchronous @gen.engine def get(self): http_client = AsyncHTTPClient() response = yield gen.Task(http_client.fetch, "http://example.com") do_something_with_response(response) self.render("template.html")
好吧,其实早先我想把此文写成《tornado.gen详解》,但是后来发现, gen.py 的代码一时半会我好像比较不容易看懂,但是使用生成器作用于异步回调的原理我算是明白了,就把标题改成现在的了:)
另外, node.js 现在好像没有“生成器”这种机制,我只是觉得它有点太需要了。我对 node.js 并不熟悉。
2. 什么是生成器
在 Python 中,生成器,即 Generator ,是一种 type ,是一种对象。但是,从运行机制上来看,我把它理解成这样的一种机制——在函数体当中,你可以在指定的位置跳出此函数,然后在未来的某个时间,可以返回到函数中的这个位置继续执行——即是一种保留性的中断。比如下面的代码:
def fun(): print 1+1 yield None print 2+2
fun()这个函数执行之后,可以不执行完,在执行了 print 1+1 之后,我去做其它事,然后等一会再回来接着执行 print 2+2 。如下面的代码:。
gen = fun() gen.next() print 'haha' gen.next()
简单来说,生成器就是这样的。
在 Python 中,如果一个函数当中有 yield 这个关键词,那么当函数被调用时,将不会返回函数的执行结果,而是返回一个 Generaotr 的实例。这个实例有它自己的一些方法。接下来我们还需要了解生成器工作的一些细节。
# -*- coding: utf-8 -*- import pdb S = ['EMPTY'] def gen(): #[1] for i in ['a', 'b']: print i for i in range(10): S[0] = yield i #[2] [4] print str(S[0]) g = gen() #[a] pdb.set_trace() print g.next() #[b] pdb.set_trace() print g.send('ok') #[c] pdb.set_trace() print g.next() #[d] 1 2 3 4 a b c d
我们使用 pdb 加断点,来观察程序运行中一些变量的值变化,以了解生成器工作的具体方式。
- [a]执行时, g 得到了一个生成器对象。这时,函数的执行点位于 [1] 的位置。
- [b]执行时,跑函数,直到碰到一个 yield ,即执行点跑到了 [2]。注意,这时 S[0] 还没有被赋值。因为 S[0] 应该是 yield i 的返回值,而此时, yield 1 这个表达式只是被执行了,还没有返回。此时,因为 yield ,“弹出i”后(即 next() 或 send() 函数返回的是i),函数执行被中断。
- [c]执行时, send('ok') 即是传递 'ok' 给 [2] 处的 yield i ,这个 'ok' 就是 yield i 这个表达式的返回值( next() 实际上就是 send(None) )。此时 S[0] 被赋值 'ok' 。函数继续往下执行,直到碰到 yield 。此时执行点位于 [4]。
- [d]执行时,因为是 next() 调用,这时 yield i 返回值就是 None 了,于是 S[0] 被赋值为 None 。函数继续执行,直到碰到 yield ,或函数执行完都没有碰到 yield ,而引发一个 StopIteration 异常。
3. 把生成器应用于异步回调
搞清楚生成器的工作方式之后,考虑把它用于异步回调当中:
# -*- coding: utf-8 -*- import tornado.ioloop from tornado.httpclient import AsyncHTTPClient def callback(response): print response.body def fetch(): response = yield AsyncHTTPClient().fetch('http://zouyesheng.com', callback) print response gen = fetch() gen.next() try: gen.send('res') except StopIteration: pass tornado.ioloop.IOLoop.instance().start()
上面的代码很简单:
- 第一个 next() 被调用时, fetch() 函数就被调用了,但是 response 没有得到 yield 的返回。这时的 callback 不定时会被调用。
- 调用 send('res') 时, response 被赋值成
'res'
,函数向下执行完。因为没有 yield ,触发一个 StopIteration 异常。 其实看到这样,就很容易想到,要想异步变同步,那只需要在回调函数 callback 中去处理生成器的 send() 调用就可以了。 但是,这样会碰到另外一个问题。传递 callback 时,生成器都还不存在,那又如何能在 callback 中处理生成器呢。所以,我们需要改变一下我们的流程,一个快速实现如下:# -*- coding: utf-8 -*- import tornado.ioloop from tornado.httpclient import AsyncHTTPClient import functools def fetch(): response = yield functools.partial(AsyncHTTPClient().fetch, 'http://sohu.com') print response gen = fetch() f = gen.next() def callback(response): try: gen.send(response) except StopIteration: pass f(callback) print 'here' tornado.ioloop.IOLoop.instance().start()
# -*- coding: utf-8 -*- import tornado.ioloop from tornado.httpclient import AsyncHTTPClient import functools def task(func, url): return functools.partial(func, url) def callback(gen, response): try: gen.send(response) except StopIteration: pass def sync(func): def wrapper(): gen = func() f = gen.next() f(functools.partial(callback, gen)) return wrapper @sync def fetch(): response = yield task(AsyncHTTPClient().fetch, 'http://sohu.com') print '1' print response print '2' fetch() print '3' tornado.ioloop.IOLoop.instance().start()
4. 后话
这里只是简单介绍了把生成器用于异步回调结构,以改变代码组织形式的原理,吧。当然,实际使用中,要处理的问题复杂得多,比如在函数中有多个 yield 的情况,还在考虑运行时的上下文环境,等。所以 tornado.gen 我还不太看得明白。
不过,在这里也让我们看到了,生成器除了可以用于省内存的“惰性求值”外,还有很多可以发挥的地方。
这种生成器的方案,本质上是给异步执行的回调函数开辟了另外一个空间,然后在那个空间中再回到“当前空间”。在离开当前空间的时间段内,使用生成器的机制实际了“阻塞”函数流程的处理。
如果你需要在函数中并发执行一些异步制作,那这套机制就不适用了。但是总归在很多时候,可以写舒服点的代码了吧。