内容简介:tornado源码解析
目录
- 本文在CentOS Linux release 7.3.1611 (Core)、 Python 2.7.5、tornado 4.5.3下测试通过
- 建议有一定的tornado使用基础,再阅读本文;否则,可以先看。
- 关于IO多路复用机制,可以参考:这篇文档
- 关于阻塞socket 和 非阻塞socket,可以参考:这篇文档
-
阻塞调用 和 非阻塞调用
- 阻塞调用是指 函数在获取到返回结果之前,会将当前线程挂起
- 非阻塞调用是指 无论函数是否能立即获取到返回结果,它都会立即返回
-
同步 和 异步
- 同步是指 在有事件发生时,由程序自己负责读写
- 异步是指 当发生关注的事件时,由异步IO实现负责将数据从内核空间读到用户空间
tornado.util.Configurable类[]
源代码在: http://www.tornadoweb.org/en/stable/_modules/tornado/util.html#Configurable 。
在阅读Configurable的源码前,可以先看下这篇文档: Python中的类、构造方法、元类 。
Configurable的核心是其构造方法: __new__
。 它是 其实现子类的 工厂方法
。
在新式类中,如果子类没有定义 __new__
方法,那么会使用父类的该方法,来创建对象。因此,Configurable的 未定义 __new__
方法的子类(比如IOLoop等) 都会使用Configurable的构造方法。
下面看一下Configurable的主要代码:
class Configurable(object): __impl_class = None # type: type __impl_kwargs = None # type: Dict[str, Any] def __new__(cls, *args, **kwargs): base = cls.configurable_base() init_kwargs = {} # 如果子类的配置基类就是子类本身, # + 那么: # + + 如果配置基类通过__impl_class属性指定了实现类,则使用它; # + + 否则,使用子类的configurable_default()方法返回的实现类。 if cls is base: impl = cls.configured_class() if base.__impl_kwargs: init_kwargs.update(base.__impl_kwargs) # 如果子类的配置基类不是自身,直接使用子类作为实现类。 else: impl = cls init_kwargs.update(kwargs) # 创建实现类的实例 instance = super(Configurable, cls).__new__(impl) # initialize vs __init__ chosen for compatibility with AsyncHTTPClient # singleton magic. If we get rid of that we can switch to __init__ # here too. # 使用传递给构造方法的参数,调用实例的initialize方法,进行初始化。 # 实例的__init__方法也会被自动调用,但是一般不应该使用__init__方法进行初始化,而是建议使用initialize方法 instance.initialize(*args, **init_kwargs) return instance # configurable_base()方法用于返回子类的配置基类。通常,子类的配置基类就是其自身(但是不是必须)。 # issubclass(子类, 配置基类) == True。 # 该方法需要在子类中实现。 @classmethod def configurable_base(cls): # type: () -> Any # TODO: This class needs https://github.com/python/typing/issues/107 # to be fully typeable. """Returns the base class of a configurable hierarchy. This will normally return the class in which it is defined. (which is *not* necessarily the same as the cls classmethod parameter). """ raise NotImplementedError() # configured_class()方法的作用是: # + 如果子类的__impl_class属性是None(也就是,其配置基类没有通过__impl_class属性指定实现类), # + 那么,则使用它的configurable_default()方法返回的类作为实现类,并将其保存到配置基类的__impl_class属性中; # + 否则,直接使用配置基类的__impl_class属性指定的实现类。 # 实现类是子类自身或其子类。 @classmethod def configured_class(cls): # type: () -> type """Returns the currently configured class.""" base = cls.configurable_base() if cls.__impl_class is None: base.__impl_class = cls.configurable_default() return base.__impl_class # configure()方法用于在运行时,为Configurable的子类指定实现类,以及初始化时使用的参数。 # + 它们会被保存到 子类的配置基类的 __impl_class 和 __impl_kwargs属性中 @classmethod def configure(cls, impl, **kwargs): # type: (Any, **Any) -> None """Sets the class to use when the base class is instantiated. Keyword arguments will be saved and added to the arguments passed to the constructor. This can be used to set global defaults for some parameters. """ base = cls.configurable_base() if isinstance(impl, (str, unicode_type)): impl = import_object(impl) if impl is not None and not issubclass(impl, cls): raise ValueError("Invalid subclass of %s" % cls) base.__impl_class = impl base.__impl_kwargs = kwargs
总结:
继承Configurable的子类,需要实现下列的方法:
- configurable_base(cls):通常返回子类自身
- configurable_default(cls):返回默认的实现类,比如根据不同的操作系统平台,返回不同的实现类
子类的实现类,需要实现:
- initialize(self, *a, **kw):用于实例的初始化
tornado.ioloop.IOLoop
就是Configurable子类。下面看看其中关于对象创建的关键代码:
class IOLoop(Configurable): # Global lock for creating global IOLoop instance _instance_lock = threading.Lock() _current = threading.local() # 返回一个全局的IOLoop实例, 大多数应用程序是在主线程上运行一个单例的、全局的IOLoop实例。 大多数情形下,使用该方法获取IOLoop实例 比 使用current()方法获取当前线程的IOLoop实例要好。 @staticmethod def instance(): """Returns a global `IOLoop` instance. Most applications have a single, global `IOLoop` running on the main thread. Use this method to get this instance from another thread. In most other cases, it is better to use `current()` to get the current thread's `IOLoop`. """ if not hasattr(IOLoop, "_instance"): with IOLoop._instance_lock: if not hasattr(IOLoop, "_instance"): # New instance after double check IOLoop._instance = IOLoop() # 因为IOLoop继承了Configurable,并且IOLoop自己没有定义__new__方法,所以会调用Configurable的__new__方法,创建实例。 # IOLoop的configurable_base()方法,返回的是自身,并且没有通过__impl_class指定实现类。 # 所以会使用IOLoop.configurable_default()方法返回的类作为实现类。 # 首先应该知道的是:IOLoop本质上是封装了操作系统的IO多路复用机制,不同的操作系统提供了不同的IO多路复用机制, # 比如 linux 提供了epoll机制,bsd和mac提供了kqueue机制,windows提供了select机制。 # 在tornado中,每种机制都对应一个具体的IOLoop子类,比如linux的epoll对应tornado.platform.epoll.EPollIOLoop类。 # IOLoop.configurable_default()方法完成的任务就是根据操作系统平台选择其对应的实现类。 # 所以,开发人员不必自己根据操作系统选择实现类,这个过程是自动的。 # 在创建了实现类的对象之后,就会调用它的intialize()方法进行初始化。以EPollIOLoop为例进行说明: # EPollIOLoop的initialize方法会调用其父类PollIOLoop的initialize()方法, # 并且将impl参数指定为select.epoll()(也就是使用epoll机制) # PollIOLoop的initalize()方法,又会调用IOLoop的initialize()方法。 # IOLoop的initialize()方法,会先调用IOLoop.current(instance=False)方法, # 如果当前线程上,已经有一个IOLoop实例,那么IOLoop.current(instance=False)方法会返回该实例,否则返回None。 # 当当前线程已经有一个IOLoop实例的时候,IOLoop.initialize()方法,会根据make_current参数,来决定:不做任何处理或抛出异常。 # 如果当前线程没有IOLoop实例,那么则将新建的这个IOLoop实例,保存到IOLoop._current这个线程本地变量中。 # 其实,IOLoop.current()方法也是从IOLoop._current这个线程本地变量,获取当前线程的IOLoop实例的。 # 这里用到了ThreadLocal,关于ThreadLocal可以看这篇文档。 return IOLoop._instance # 如果当前线程有自己的IOLoop实例,则返回它;否则, # 如果instance参数是True,返回全局的IOLoop实例; # 如果instance参数是False,返回None。 @staticmethod def current(instance=True): current = getattr(IOLoop._current, "instance", None) if current is None and instance: return IOLoop.instance() return current def make_current(self): IOLoop._current.instance = self @classmethod def configurable_base(cls): return IOLoop @classmethod def configurable_default(cls): if hasattr(select, "epoll"): from tornado.platform.epoll import EPollIOLoop return EPollIOLoop if hasattr(select, "kqueue"): # Python 2.6+ on BSD or Mac from tornado.platform.kqueue import KQueueIOLoop return KQueueIOLoop from tornado.platform.select import SelectIOLoop return SelectIOLoop def initialize(self, make_current=None): if make_current is None: if IOLoop.current(instance=False) is None: self.make_current() elif make_current: if IOLoop.current(instance=False) is not None: raise RuntimeError("current IOLoop already exists") self.make_current()
使用IOLoop有两种方式:
- 1,通过IOLoop.instance()获取一个 全局的、单例的 IOLoop实例
- 2,在线程中创建IOLoop实例,然后通过IOLoop.current()获取该线程自己的IOLoop实例,每个线程的IOLoop也是单例的:
[root@iZj6chejzrsqpclb7miryaZ ~]# cat t.py # coding: utf8 from threading import Thread, Lock from tornado.ioloop import IOLoop lock = Lock() def func(): ioloop = IOLoop() # 在线程中,创建IOLoop实例 with lock: print ioloop.current() # 在线程中,获取线程自己的IOLoop实例 threads = [] for i in range(10): thread = Thread(target=func) thread.start() threads.append(thread) for thread in threads: thread.join() [root@iZj6chejzrsqpclb7miryaZ ~]# python t.py <tornado.platform.epoll.EPollIOLoop object at 0x7fc5c987ed90> <tornado.platform.epoll.EPollIOLoop object at 0x7fc5c987ef90> <tornado.platform.epoll.EPollIOLoop object at 0x7fc5c9886190> <tornado.platform.epoll.EPollIOLoop object at 0x7fc5c9886350> <tornado.platform.epoll.EPollIOLoop object at 0x7fc5c9886510> <tornado.platform.epoll.EPollIOLoop object at 0x7fc5c98866d0> <tornado.platform.epoll.EPollIOLoop object at 0x7fc5c9886890> <tornado.platform.epoll.EPollIOLoop object at 0x7fc5c9886a50> <tornado.platform.epoll.EPollIOLoop object at 0x7fc5c9886c10> <tornado.platform.epoll.EPollIOLoop object at 0x7fc5c9886c90>
tornado.stack_context模块[]
tornado.stack_context的源代码在: http://www.tornadoweb.org/en/stable/_modules/tornado/stack_context.html 。
stack_context使用到了Python的两个特性:
- 上下文管理器:可以先阅读一下这篇文档
- 生成器:可以先阅读一下这篇文档
首先看一下,stack_context模块开始部分的代码:
class _State(threading.local):
def __init__(self):
# contexts的第一元表示上下文对象栈,第二元表示栈顶
self.contexts = (tuple(), None)
# !!!线程本地变量_state用来为每个线程保存 上下文对象栈 !!!
_state = _State()
接下来看StackContext类的代码:
# 每个StackContext对象内部会维护三个成员: # * context_factory:调用该成员 会返回 一个上下文管理器对象, # 因此它是一个上下文管理器工厂 # * contexts:该StackContext对象内部维护的上下文管理器列表 # (这些上下文管理器都是通过调用context_factory生成的) # * active:该StackContext对象是否处于激活状态 # StackContext还自定义了一个所谓的“StackContext协议”,该协议可以描述为: # * enter()方法:调用enter()方法时, # StackContext对象会调用context_factory生成一个上下文管理器, # 并将该上下文管理器追加到上下文管理器列表的尾部, # 然后调用该上下文管理器的__enter__方法 # * exit()方法:调用exit()方法时, # StackContext对象会从上下文管理器列表尾部弹出一个上下文管理器, # 然后调用该上下文管理器的__exit__方法 # 因为StackContext类实现了上下文管理器协议。所以它的对象也都是上下文管理器。 # * StackContext对象的__enter__()方法的主要行为是: # * 将当前线程的 当前上下文对象栈 保存起来(用于在弹栈时,恢复上下文对象栈) # * 将自己压入 当前线程的上下文对象栈 的顶部 # * 将新的上下文对象栈也保存起来 # (用于在弹栈,恢复上下文对象栈时, # 检查是否有其他协程,在此期间也修改了上下文对象栈) # * 然后调用该对象的enter()方法, # * 最后返回一个可调用对象:StackContext._deactivate, # 调用该对象会将该StackContext对象设置为非激活状态 # * StackContext对象的__exit__()方法的主要行为是: # * 调用该StackContext上下文对象的exit()方法 # * 并将该StackContext上下文对象从上下文对象栈弹出 # 总结:当进入到StackContext对象时,它会将自己压入 # 当前线程的上下文对象栈的顶部,并将自己设置为栈顶; # 当离开StackContext对象时,它又会将自己从栈顶弹出。 # 使用StackContext需要注意一个问题,[点此]跳转到源代码处。 class StackContext(object): def __init__(self, context_factory): self.context_factory = context_factory self.contexts = [] self.active = True def _deactivate(self): self.active = False # StackContext protocol def enter(self): context = self.context_factory() self.contexts.append(context) context.__enter__() def exit(self, type, value, traceback): context = self.contexts.pop() context.__exit__(type, value, traceback) # Note that some of this code is duplicated in ExceptionStackContext # below. ExceptionStackContext is more common and doesn't need # the full generality of this class. def __enter__(self): self.old_contexts = _state.contexts self.new_contexts = (self.old_contexts[0] + (self,), self) _state.contexts = self.new_contexts try: self.enter() except: _state.contexts = self.old_contexts raise return self._deactivate def __exit__(self, type, value, traceback): try: self.exit(type, value, traceback) finally: final_contexts = _state.contexts _state.contexts = self.old_contexts # Generator coroutines and with-statements with non-local # effects interact badly. Check here for signs of # the stack getting out of sync. # Note that this check comes after restoring _state.context # so that if it fails things are left in a (relatively) # consistent state. # 也就是说,如果在with StackContext(context_factory)语句块 # 内部,使用了yield,就可能会导致 上下文对象栈 的状态不一致。 # [点此]看一个具体的实例。 if final_contexts is not self.new_contexts: raise StackContextInconsistentError( 'stack_context inconsistency (may be caused by yield ' 'within a "with StackContext" block)') # Break up a reference to itself to allow for faster GC on CPython. self.new_contexts = None
在with StackContext语句块中,使用yield,会抛出异常:
from contextlib import contextmanager from tornado.stack_context import StackContext @contextmanager def generator(): yield def func(): with StackContext(generator) as deactive: yield g1 = func() g2 = func() g1.next() g2.next() g1.next()
假设启动g1之前,_state.contexts = ((..., s0), s0);
那么启动g1之后,会生成一个新的StackContext上下文对象s1,并且会将s1入栈,
此时,s1.new_contexts = _state.contexts = ((..., s0, s1), s1),s1.old_contexts = ((..., s0), s0)。
启动g2,又会生成一个新的StackContext对象s2,并且会将s2入栈,
此时,s2.new_contexts = _state.contexts = ((..., s0, s1, s2), s2),s2.old_contexts = ((..., s0, s1), s1)。
再重启g1,会离开with语句块。会执行s1.__exit__()方法,
该方法欲将_state.contexts 恢复为 s1.old_contexts(((..., s0), s0)), 也就是想从当前线程的上下文对象栈中弹出s1,
但是发现在自己入栈、弹栈期间,其他的generator coroutine(g2),已经修改了 上下文对象栈,
此时,就应该抛出异常。假设不抛出异常,而是将_state.contexts更改为((..., s0), s0)。那么再次启动g2之后,又会将_state.contexts置为((..., s0, s1), s1),这与s1已经弹栈相矛盾。
在stack_context模块中,最常用的除了StackContext,就是wrap(fn)函数了。
wrap(fn)函数的执行流程,大致如下:
- 如果fn已经被封装过了,那么直接返回
- 捕获当前线程的当前上下文对象栈,因为_state.contexts是元组类型的,也就是不可变类型的,所以在调用wrap函数之后,对上下文对象栈的修改,都会创建一个新的元组对象,并不会影响在wrap函数中捕获的上下文对象栈
-
如果捕获到的上下文对象栈是空的,那么返回null_wrapper:
- 1,备份当前的上下文对象栈
- 2,将捕获的上下文对象栈 设置为 当前的上下文对象栈
- 3,运行被封装函数
- 4,将上下文对象栈还原到被封装函数运行前的状态
-
否则,返回wrapped:
- 1,备份当前的上下文对象栈
- 2,将捕获的上下文对象栈中非激活的上下文对象移除。点此查看:从上下文栈中,移除非激活上下文的流程
- 3,将捕获的上下文对象栈 设置为 当前的上下文对象栈
-
4,从前到后,按顺序的执行捕获的上下文对象栈中的每一个上下文对象的enter()方法
- 使用top保存最后一个出异常的上下文对象的前一个栈顶,exc保存最后一个异常的信息
-
5, 如果没有出现异常,则:
- 调用回调函数
- 如果回调函数出现异常,将top置为捕获的栈顶,exc保存异常信息
-
6,如果有异常出现(可能是第四步产生的,也可能是第五步产生的),则:
- 从top所保存的上下文对象开始,以逆序的方式,调用每个上下文对象的exit()方法,并且 将异常,在这个链上进行传递
- 7,否则,从栈顶开始,以逆序的方式,调用每个上下文对象的exit()方法,并且将异常,在调用链上传递
- 8,如果有未捕获的异常,则抛出
- 9,将上下文对象栈还原到被封装函数运行前的状态
简单来说就是,以正序的路径,执行每个上下文对象的enter()方法,一直到回调函数,执行完回调函数或中间某步出现异常,则原路返回,执行每个上下文对象的exit()方法,并且将异常信息在调用链上传递。
下面,看一个典型的例子:
[root@iZj6chejzrsqpclb7miryaZ ~]# cat t2.py # coding: utf8 from contextlib import contextmanager from functools import partial from tornado.stack_context import StackContext, wrap, _state from tornado.ioloop import IOLoop ioloop = IOLoop.current() @contextmanager def context_factory(): try: print "enter" yield print "exit" except Exception as ex: print "error accurs: %s" % str(ex) print def func(callback, *a, **kw): # wrap函数会捕获当前的上下文对象栈, # 即使之后对上下文对象栈进行改变,也不会影响wrap中捕获的上下文。 # 同时,wrap执行时会移除非激活上下文 ioloop.add_callback(wrap(partial(callback, *a, **kw))) def callback(): try: print "=== start callback ===" print "in callback: _state.contexts = ", _state.contexts raise RuntimeError("in callback") finally: ioloop.stop() print "=== end callback ===" with StackContext(context_factory): with StackContext(context_factory) as deactive: #进入了两个上下文对象,此时上下文对象栈中有两个上下文对象 print "in main thread: _state.contexts = ", _state.contexts # 进入到func函数 func(callback) deactive() # 离开两个上下文对象,此时这两个对象都从栈中弹出 print "in main thread: _state.contexts = ", _state.contexts print ioloop.start() [root@iZj6chejzrsqpclb7miryaZ ~]# python t2.py enter enter in main thread: _state.contexts = ((<tornado.stack_context.StackContext object at 0x7f66c2b4fc90>, <tornado.stack_context.StackContext object at 0x7f66c2b4fd10>), <tornado.stack_context.StackContext object at 0x7f66c2b4fd10>) exit exit in main thread: _state.contexts = ((), None) enter === start callback === in callback: _state.contexts = ((<tornado.stack_context.StackContext object at 0x7f66c2b4fc90>,), <tornado.stack_context.StackContext object at 0x7f66c2b4fc90>) === end callback === error accurs: in callback ERROR:tornado.application:Exception in callbackTraceback (most recent call last): File "/usr/lib64/python2.7/site-packages/tornado/ioloop.py", line 605, in _run_callback ret = callback() File "/usr/lib64/python2.7/site-packages/tornado/stack_context.py", line 251, in wrapped raise_exc_info(exc) File "/usr/lib64/python2.7/site-packages/tornado/stack_context.py", line 222, in wrapped ret = fn(*args, **kwargs) File "t2.py", line 26, in callback raise RuntimeError("in callback") RuntimeError: in callback
总结来说就是:通过with StackContext语句对上下文对象栈进行修改和还原,在with的语句体中通过wrap对当时的上下文对象栈进行捕获。
tornado.concurrent.Future[]
因篇幅有限,已经迁移到:这里。
因篇幅有限,已经迁移到:这里。
因篇幅有限,已经迁移到:这里。
tornado.iostream[]
因篇幅有限,已经迁移到:这里。
因篇幅有限,已经迁移到:这里。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- ReactNative源码解析-初识源码
- Spring源码系列:BeanDefinition源码解析
- Spring源码分析:AOP源码解析(下篇)
- Spring源码分析:AOP源码解析(上篇)
- 注册中心 Eureka 源码解析 —— EndPoint 与 解析器
- 新一代Json解析库Moshi源码解析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
The Hard Thing About Hard Things
Ben Horowitz / HarperBusiness / 2014-3-4 / USD 29.99
Ben Horowitz, cofounder of Andreessen Horowitz and one of Silicon Valley's most respected and experienced entrepreneurs, offers essential advice on building and running a startup—practical wisdom for ......一起来看看 《The Hard Thing About Hard Things》 这本书的介绍吧!
HTML 编码/解码
HTML 编码/解码
RGB CMYK 转换工具
RGB CMYK 互转工具