视频1 视频21 视频41 视频61 视频文章1 视频文章21 视频文章41 视频文章61 推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37 推荐39 推荐41 推荐43 推荐45 推荐47 推荐49 关键词1 关键词101 关键词201 关键词301 关键词401 关键词501 关键词601 关键词701 关键词801 关键词901 关键词1001 关键词1101 关键词1201 关键词1301 关键词1401 关键词1501 关键词1601 关键词1701 关键词1801 关键词1901 视频扩展1 视频扩展6 视频扩展11 视频扩展16 文章1 文章201 文章401 文章601 文章801 文章1001 资讯1 资讯501 资讯1001 资讯1501 标签1 标签501 标签1001 关键词1 关键词501 关键词1001 关键词1501 专题2001 知道1 知道21 知道41 知道61 知道81 知道101 知道121 知道141 知道161 知道181 知道201 知道221 知道241 知道261 知道281
问答文章1 问答文章501 问答文章1001 问答文章1501 问答文章2001 问答文章2501 问答文章3001 问答文章3501 问答文章4001 问答文章4501 问答文章5001 问答文章5501 问答文章6001 问答文章6501 问答文章7001 问答文章7501 问答文章8001 问答文章8501 问答文章9001 问答文章9501
Tornado的stackcontext
2020-11-09 13:25:06 责编:小采
文档

这两天在用 tornado 做一个 memcached 的 proxy,作为一个 Python 的高性能异步框架,tornado (实际是 epoll/kqueue… )的思想是——单线程+异步化,线程的运行时间不等待任何东西,这样就要求 memcached 的访问也必须异步化。如果线程在等待中消耗了,就无法达到高并发的目的,这个问题是无法通过简单地交给线程池或什么其他东西来达到的。

于是,这里就不能用常用的 python-memcache 来做了,实际上有几个基于 tornado 的 memcache 客户端,这个是维护得相对好的一个,也是一年前的了,而且,有两个问题:

  • 连接建立是同步的,不是异步的
  • 没有超时机制
  • 这样,在 server 或网络出现问题的时候,就可能遇到*烦,所以,我的目的就是绞尽脑汁加入超时机制,这个初步做出来了,等把 get 之外的方法也都异步化之后就反馈出来。这里主要依赖的机制就是 tornado 的 stack context——再次声明,我是这个方面的菜鸟,有什么不对的地方大家嘘之余给指出来呗。

    Stack context 的意图就是为执行程序保存一个上下文,在需要的时候,可以回到这个上下文执行,包括异常,都可以更好地、统一地处理。这个功能的代码不是很多,也比较清晰,但是文档……嗯,至少我是没看明白,结合 httpclient 的源码作为例子,加上看 stack_context 的代码,大概明白了是怎么用了。

    首先,在希望抓住问题的入口的地方要留住上下文:

     #......
     context = partial(self._cleanup, fail_callback = fail_callback)
     with stack_context.StackContext(context):
     getattr(c, cmd)(*args, **kwargs)

    这里,后面的执行内容,包括回调、触发事件,都可以通过抛出异常退到这里,而管理异常的就是 context,这里,用 functools.partial 包装了一下 _cleanup,_cleanup 的写法大致是这样的:

     @contextlib.contextmanager
     def _cleanup(self, fail_callback = None):
     try:
     yield
     except _Error as e:
     print "gotcha", e
     if fail_callback:
     fail_callback(e.args)

    这里,异常会被捕获,并调用用户指定的出错回调函数进行处理。后面的代码里,遇到故障,抛出异常就可以了,比如,可以用这个异常来返回超时:

     def _on_timeout(self, server):
     self._timeout = None
     server.mark_dead('Time out')
     raise _Error('memcache call timeout')

    这个异常是通过 io_loop 的 timeout 方法来触发的:

     self._timeout = self.io_loop.add_timeout(
     time.time() + self.request_timeout,
     stack_context.wrap(partial(self._on_timeout, server)))

    这样,就可以在异步程序里比较干净地处理掉超时问题了。

    这个代码对我这个水平的初学者还是比较晦涩的,大家可以参考下 HTTPClient 的源码,等我把这个 memcached client 的代码改完之后,也会放出来供参考指正的。

    —-

    update2: 放这里了?https://github.com/gnawux/tornado-memcache?, get 测试过,其他还没有,另外,我不是多个 server sharding 的应用场景,相关的还没测试。

    update :对于 timeout,设上了表忘了清除,如果是其他方式抛异常退出的话,也在抛异常的地方或者是最后处理异常的时候,把超时去掉

     if self._timeout is not None:
     self.io_loop.remove_timeout(self._timeout)
     self._timeout = None
    下载本文
    显示全文
    专题