使用生成器展平异步回调结构

曹至梧 2016-01-10 16:55 更新
  1. 关于异步回调
  2. 什么是生成器
  3. 把生成器应用于异步回调
  4. 后话

1. 关于异步回调

异步机制,从某个角度,可以看成是——使用非阻塞的流程,以注册回调函数的形式进行业务处理。传统机制,是顺序执行代码,如果某个函数很长时间不返回,那么下面的代码就得不到执行,这就比较浪费时间。然后,对于这种问题的解决,一般有两种办法。一是以线程或进程的方式,跑多个实例,这样来把等待的消耗降低。二是就让函数尽快返回,需要等待的资源得到后再使用另外的函数去处理它。

比如下面的代码:

response = fetch('http://zouyesheng.com')
s = 1 + 1
print response
print s

去获取 zouyesheng.com 的内容可能需要花费几秒钟的时间,而这几秒钟,在“同步机制”下,你只能等待,连下面的 1+1 都做不了,这显然是一个悲剧。我们利用这等待的时间,除了做下面的 1+1 ,还可以做好多的事。异步结构就是这样的:

def show(response):
  print response

async_fetch('http://zouyesheng.com', callback=show)
s = 1+1
print s

async_fetch 只是预先注册一个回调函数,它不需要等待获取到实际的内容就马上返回了,然后代码继续执行去做 1+1 。而当 zouyesheng.com 的资源获取到的时间,之前注册的 show 函数就会被执行。这样,等待的时间就用来做其它事了,没有时间上的浪费。

这看起来不错。但是,这种异步结构,写代码的方式和思考问题的方式都和以前不太一样了。这本来问题不大,但是,如果是多重的异步回调,那代码组织起来就麻烦了。一层套一层,维护起来很麻烦。写过 twisted 和 node.js 的人应该知道其中痛苦。

一个好消息是, gevent 的出现解救了广大的 pythoner ,它让我们看到异步可以写得不异步。坏消息是我现在用的是 Tornado ,并且很喜欢用它。所以, Tornado 中如果多套几层回调函数一样的苦不堪言。于是,出现了 tornado.gen 这个东西,它使用 yield ,即生成器来把一个异步结构变得像传统的同步结构一样,引一个官方网站的例子就是,原来你是这样的:

class AsyncHandler(RequestHandler):
    @asynchronous
    def get(self):
        http_client = AsyncHTTPClient()
        http_client.fetch("http://example.com",
                          callback=self.on_fetch)

    def on_fetch(self, response):
        do_something_with_response(response)
        self.render("template.html")

现在你可以写成这样:

class GenAsyncHandler(RequestHandler):
    @asynchronous
    @gen.engine
    def get(self):
        http_client = AsyncHTTPClient()
        response = yield gen.Task(http_client.fetch, "http://example.com")
        do_something_with_response(response)
        self.render("template.html")

好吧,其实早先我想把此文写成《tornado.gen详解》,但是后来发现, gen.py 的代码一时半会我好像比较不容易看懂,但是使用生成器作用于异步回调的原理我算是明白了,就把标题改成现在的了:)

另外, node.js 现在好像没有“生成器”这种机制,我只是觉得它有点太需要了。我对 node.js 并不熟悉。

2. 什么是生成器

在 Python 中,生成器,即 Generator ,是一种 type ,是一种对象。但是,从运行机制上来看,我把它理解成这样的一种机制——在函数体当中,你可以在指定的位置跳出此函数,然后在未来的某个时间,可以返回到函数中的这个位置继续执行——即是一种保留性的中断。比如下面的代码:

def fun():
  print 1+1
  yield None
  print 2+2

fun()这个函数执行之后,可以不执行完,在执行了 print 1+1 之后,我去做其它事,然后等一会再回来接着执行 print 2+2 。如下面的代码:。

gen = fun()
gen.next()
print 'haha'
gen.next()

简单来说,生成器就是这样的。

在 Python 中,如果一个函数当中有 yield 这个关键词,那么当函数被调用时,将不会返回函数的执行结果,而是返回一个 Generaotr 的实例。这个实例有它自己的一些方法。接下来我们还需要了解生成器工作的一些细节。

# -*- coding: utf-8 -*-

import pdb
S = ['EMPTY']

def gen(): #[1]
    for i in ['a', 'b']:
        print i
    for i in range(10):
        S[0] = yield i #[2] [4]
        print str(S[0])

g = gen() #[a]
pdb.set_trace()
print g.next() #[b]
pdb.set_trace()
print g.send('ok') #[c]
pdb.set_trace()
print g.next() #[d]

1 2 3 4 a b c d

我们使用 pdb 加断点,来观察程序运行中一些变量的值变化,以了解生成器工作的具体方式。

  1. [a]执行时, g 得到了一个生成器对象。这时,函数的执行点位于 [1] 的位置。
  2. [b]执行时,跑函数,直到碰到一个 yield ,即执行点跑到了 [2]。注意,这时 S[0] 还没有被赋值。因为 S[0] 应该是 yield i 的返回值,而此时, yield 1 这个表达式只是被执行了,还没有返回。此时,因为 yield ,“弹出i”后(即 next()send() 函数返回的是i),函数执行被中断。
  3. [c]执行时, send('ok') 即是传递 'ok' 给 [2] 处的 yield i ,这个 'ok' 就是 yield i 这个表达式的返回值( next() 实际上就是 send(None) )。此时 S[0] 被赋值 'ok' 。函数继续往下执行,直到碰到 yield 。此时执行点位于 [4]
  4. [d]执行时,因为是 next() 调用,这时 yield i 返回值就是 None 了,于是 S[0] 被赋值为 None 。函数继续执行,直到碰到 yield ,或函数执行完都没有碰到 yield ,而引发一个 StopIteration 异常。

3. 把生成器应用于异步回调

搞清楚生成器的工作方式之后,考虑把它用于异步回调当中:

# -*- coding: utf-8 -*-

import tornado.ioloop
from tornado.httpclient import AsyncHTTPClient

def callback(response):
    print response.body

def fetch():
    response = yield AsyncHTTPClient().fetch('http://zouyesheng.com', callback)
    print response

gen = fetch()
gen.next()
try:
    gen.send('res')
except StopIteration:
    pass

tornado.ioloop.IOLoop.instance().start()

上面的代码很简单:

  1. 第一个 next() 被调用时, fetch() 函数就被调用了,但是 response 没有得到 yield 的返回。这时的 callback 不定时会被调用。
  2. 调用 send('res') 时, response 被赋值成 'res' ,函数向下执行完。因为没有 yield ,触发一个 StopIteration 异常。

    其实看到这样,就很容易想到,要想异步变同步,那只需要在回调函数 callback 中去处理生成器的 send() 调用就可以了。

    但是,这样会碰到另外一个问题。传递 callback 时,生成器都还不存在,那又如何能在 callback 中处理生成器呢。所以,我们需要改变一下我们的流程,一个快速实现如下:

    # -*- coding: utf-8 -*-
    
    import tornado.ioloop
    from tornado.httpclient import AsyncHTTPClient
    import functools
    
    def fetch():
        response = yield functools.partial(AsyncHTTPClient().fetch, 'http://sohu.com')
        print response
    
    gen = fetch()
    f = gen.next()
    
    def callback(response):
        try:
            gen.send(response)
        except StopIteration:
            pass
    
    f(callback)
    print 'here'
    
    tornado.ioloop.IOLoop.instance().start()
    

    在上面的代码中,我们 yield 出来的是一个函数,在生成器和 callback 都准备好了之后,我们再调用这个函数。这样,在 callback 中处理生成器,调用 send() 方法就没有问题了。因为 send() 方法的调用,原函数中的 response 获取到赋值,函数继续执行。这就是一切,在 fetch() 中,代码看起来就像是同步结构一样。

    在正式使用中,我们可以使用一些语法糖把代码装饰得更好看:

    # -*- coding: utf-8 -*-
    
    import tornado.ioloop
    from tornado.httpclient import AsyncHTTPClient
    import functools
    
    def task(func, url):
        return functools.partial(func, url)
    
    def callback(gen, response):
        try:
            gen.send(response)
        except StopIteration:
            pass
    
    def sync(func):
        def wrapper():
            gen = func()
            f = gen.next()
            f(functools.partial(callback, gen))
        return wrapper
    
    @sync
    def fetch():
        response = yield task(AsyncHTTPClient().fetch, 'http://sohu.com')
        print '1'
        print response
        print '2'
    
    fetch()
    print '3'
    
    tornado.ioloop.IOLoop.instance().start()
    

    这里的 fetch() 看起来就像是那么一回事了吧 :)

4. 后话

这里只是简单介绍了把生成器用于异步回调结构,以改变代码组织形式的原理,吧。当然,实际使用中,要处理的问题复杂得多,比如在函数中有多个 yield 的情况,还在考虑运行时的上下文环境,等。所以 tornado.gen 我还不太看得明白。

不过,在这里也让我们看到了,生成器除了可以用于省内存的“惰性求值”外,还有很多可以发挥的地方。

这种生成器的方案,本质上是给异步执行的回调函数开辟了另外一个空间,然后在那个空间中再回到“当前空间”。在离开当前空间的时间段内,使用生成器的机制实际了“阻塞”函数流程的处理。

如果你需要在函数中并发执行一些异步制作,那这套机制就不适用了。但是总归在很多时候,可以写舒服点的代码了吧。

评论
©2010-2016 zouyesheng.com All rights reserved. Powered by GitHub , txt2tags , MathJax