tornado源码解析

栏目: Python · 发布时间: 6年前

内容简介:tornado源码解析

目录

  • 本文在CentOS Linux release 7.3.1611 (Core)、 Python 2.7.5、tornado 4.5.3下测试通过
  • 建议有一定的tornado使用基础,再阅读本文;否则,可以先看。
  • 关于IO多路复用机制,可以参考:这篇文档
  • 关于阻塞socket 和 非阻塞socket,可以参考:这篇文档
  • 阻塞调用 和 非阻塞调用
    • 阻塞调用是指 函数在获取到返回结果之前,会将当前线程挂起
    • 非阻塞调用是指 无论函数是否能立即获取到返回结果,它都会立即返回
  • 同步 和 异步
    • 同步是指 在有事件发生时,由程序自己负责读写
    • 异步是指 当发生关注的事件时,由异步IO实现负责将数据从内核空间读到用户空间

tornado.util.Configurable类[]

源代码在: http://www.tornadoweb.org/en/stable/_modules/tornado/util.html#Configurable

在阅读Configurable的源码前,可以先看下这篇文档: Python中的类、构造方法、元类

Configurable的核心是其构造方法: __new__它是 其实现子类的 工厂方法

在新式类中,如果子类没有定义 __new__ 方法,那么会使用父类的该方法,来创建对象。因此,Configurable的 未定义 __new__ 方法的子类(比如IOLoop等) 都会使用Configurable的构造方法。

下面看一下Configurable的主要代码:

class Configurable(object):
    __impl_class = None  # type: type
    __impl_kwargs = None  # type: Dict[str, Any]

    def __new__(cls, *args, **kwargs):
        base = cls.configurable_base()
        init_kwargs = {}

        # 如果子类的配置基类就是子类本身,
        # + 那么:
        # + + 如果配置基类通过__impl_class属性指定了实现类,则使用它;
        # + + 否则,使用子类的configurable_default()方法返回的实现类。
        if cls is base:
            impl = cls.configured_class()
            if base.__impl_kwargs:
                init_kwargs.update(base.__impl_kwargs)
        # 如果子类的配置基类不是自身,直接使用子类作为实现类。
        else:
            impl = cls
        init_kwargs.update(kwargs)

        # 创建实现类的实例
        instance = super(Configurable, cls).__new__(impl)
        # initialize vs __init__ chosen for compatibility with AsyncHTTPClient
        # singleton magic.  If we get rid of that we can switch to __init__
        # here too.

        # 使用传递给构造方法的参数,调用实例的initialize方法,进行初始化。
        # 实例的__init__方法也会被自动调用,但是一般不应该使用__init__方法进行初始化,而是建议使用initialize方法
        instance.initialize(*args, **init_kwargs)
        return instance

    # configurable_base()方法用于返回子类的配置基类。通常,子类的配置基类就是其自身(但是不是必须)。
    # issubclass(子类, 配置基类) == True。
    # 该方法需要在子类中实现。
    @classmethod
    def configurable_base(cls):
        # type: () -> Any
        # TODO: This class needs https://github.com/python/typing/issues/107
        # to be fully typeable.
        """Returns the base class of a configurable hierarchy.

        This will normally return the class in which it is defined.
        (which is *not* necessarily the same as the cls classmethod parameter).
        """
        raise NotImplementedError()

    # configured_class()方法的作用是:
    # + 如果子类的__impl_class属性是None(也就是,其配置基类没有通过__impl_class属性指定实现类),
    # + 那么,则使用它的configurable_default()方法返回的类作为实现类,并将其保存到配置基类的__impl_class属性中;
    # + 否则,直接使用配置基类的__impl_class属性指定的实现类。
    # 实现类是子类自身或其子类。
    @classmethod
    def configured_class(cls):
        # type: () -> type
        """Returns the currently configured class."""
        base = cls.configurable_base()
        if cls.__impl_class is None:
            base.__impl_class = cls.configurable_default()
        return base.__impl_class

    # configure()方法用于在运行时,为Configurable的子类指定实现类,以及初始化时使用的参数。
    # + 它们会被保存到 子类的配置基类的 __impl_class 和 __impl_kwargs属性中
    @classmethod
    def configure(cls, impl, **kwargs):
        # type: (Any, **Any) -> None
        """Sets the class to use when the base class is instantiated.

        Keyword arguments will be saved and added to the arguments passed
        to the constructor.  This can be used to set global defaults for
        some parameters.
        """
        base = cls.configurable_base()
        if isinstance(impl, (str, unicode_type)):
            impl = import_object(impl)
        if impl is not None and not issubclass(impl, cls):
            raise ValueError("Invalid subclass of %s" % cls)
        base.__impl_class = impl
        base.__impl_kwargs = kwargs

总结:

继承Configurable的子类,需要实现下列的方法:

  • configurable_base(cls):通常返回子类自身
  • configurable_default(cls):返回默认的实现类,比如根据不同的操作系统平台,返回不同的实现类

子类的实现类,需要实现:

  • initialize(self, *a, **kw):用于实例的初始化

tornado.ioloop.IOLoop 就是Configurable子类。下面看看其中关于对象创建的关键代码:

class IOLoop(Configurable):
    # Global lock for creating global IOLoop instance
    _instance_lock = threading.Lock()

    _current = threading.local()

    # 返回一个全局的IOLoop实例,
    大多数应用程序是在主线程上运行一个单例的、全局的IOLoop实例。
    大多数情形下,使用该方法获取IOLoop实例 比 使用current()方法获取当前线程的IOLoop实例要好。
    @staticmethod
    def instance():
        """Returns a global `IOLoop` instance.

        Most applications have a single, global `IOLoop` running on the
        main thread.  Use this method to get this instance from
        another thread.  In most other cases, it is better to use `current()`
        to get the current thread's `IOLoop`.
        """
        if not hasattr(IOLoop, "_instance"):
            with IOLoop._instance_lock:
                if not hasattr(IOLoop, "_instance"):
                    # New instance after double check
                    
                    IOLoop._instance = IOLoop()
                    # 因为IOLoop继承了Configurable,并且IOLoop自己没有定义__new__方法,所以会调用Configurable的__new__方法,创建实例。
                    # IOLoop的configurable_base()方法,返回的是自身,并且没有通过__impl_class指定实现类。
                    # 所以会使用IOLoop.configurable_default()方法返回的类作为实现类。
                    # 首先应该知道的是:IOLoop本质上是封装了操作系统的IO多路复用机制,不同的操作系统提供了不同的IO多路复用机制,
                    # 比如 linux 提供了epoll机制,bsd和mac提供了kqueue机制,windows提供了select机制。
                    # 在tornado中,每种机制都对应一个具体的IOLoop子类,比如linux的epoll对应tornado.platform.epoll.EPollIOLoop类。
                    # IOLoop.configurable_default()方法完成的任务就是根据操作系统平台选择其对应的实现类。
                    # 所以,开发人员不必自己根据操作系统选择实现类,这个过程是自动的。
                    # 在创建了实现类的对象之后,就会调用它的intialize()方法进行初始化。以EPollIOLoop为例进行说明:
                    # EPollIOLoop的initialize方法会调用其父类PollIOLoop的initialize()方法,
                    # 并且将impl参数指定为select.epoll()(也就是使用epoll机制)
                    # PollIOLoop的initalize()方法,又会调用IOLoop的initialize()方法。
                    # IOLoop的initialize()方法,会先调用IOLoop.current(instance=False)方法,
                    # 如果当前线程上,已经有一个IOLoop实例,那么IOLoop.current(instance=False)方法会返回该实例,否则返回None。
                    # 当当前线程已经有一个IOLoop实例的时候,IOLoop.initialize()方法,会根据make_current参数,来决定:不做任何处理或抛出异常。
                    # 如果当前线程没有IOLoop实例,那么则将新建的这个IOLoop实例,保存到IOLoop._current这个线程本地变量中。
                    # 其实,IOLoop.current()方法也是从IOLoop._current这个线程本地变量,获取当前线程的IOLoop实例的。
                    # 这里用到了ThreadLocal,关于ThreadLocal可以看这篇文档。
        return IOLoop._instance

    # 如果当前线程有自己的IOLoop实例,则返回它;否则,
    # 如果instance参数是True,返回全局的IOLoop实例;
    # 如果instance参数是False,返回None。
    @staticmethod
    def current(instance=True):
        current = getattr(IOLoop._current, "instance", None)
        if current is None and instance:
            return IOLoop.instance()
        return current

    def make_current(self):
        IOLoop._current.instance = self

    @classmethod
    def configurable_base(cls):
        return IOLoop

    @classmethod
    def configurable_default(cls):
        if hasattr(select, "epoll"):
            from tornado.platform.epoll import EPollIOLoop
            return EPollIOLoop
        if hasattr(select, "kqueue"):
            # Python 2.6+ on BSD or Mac
            from tornado.platform.kqueue import KQueueIOLoop
            return KQueueIOLoop
        from tornado.platform.select import SelectIOLoop
        return SelectIOLoop

    def initialize(self, make_current=None):
        if make_current is None:
            if IOLoop.current(instance=False) is None:
                self.make_current()
        elif make_current:
            if IOLoop.current(instance=False) is not None:
                raise RuntimeError("current IOLoop already exists")
            self.make_current()

使用IOLoop有两种方式:

  • 1,通过IOLoop.instance()获取一个 全局的、单例的 IOLoop实例
  • 2,在线程中创建IOLoop实例,然后通过IOLoop.current()获取该线程自己的IOLoop实例,每个线程的IOLoop也是单例的:
[root@iZj6chejzrsqpclb7miryaZ ~]# cat t.py 
# coding: utf8
from threading import Thread, Lock

from tornado.ioloop import IOLoop

lock = Lock()

def func():
    ioloop = IOLoop() # 在线程中,创建IOLoop实例
    with lock:
        print ioloop.current() # 在线程中,获取线程自己的IOLoop实例

threads = []
for i in range(10):
    thread = Thread(target=func)
    thread.start()
    threads.append(thread)

for thread in threads:
    thread.join()

[root@iZj6chejzrsqpclb7miryaZ ~]# python t.py 
<tornado.platform.epoll.EPollIOLoop object at 0x7fc5c987ed90>
<tornado.platform.epoll.EPollIOLoop object at 0x7fc5c987ef90>
<tornado.platform.epoll.EPollIOLoop object at 0x7fc5c9886190>
<tornado.platform.epoll.EPollIOLoop object at 0x7fc5c9886350>
<tornado.platform.epoll.EPollIOLoop object at 0x7fc5c9886510>
<tornado.platform.epoll.EPollIOLoop object at 0x7fc5c98866d0>
<tornado.platform.epoll.EPollIOLoop object at 0x7fc5c9886890>
<tornado.platform.epoll.EPollIOLoop object at 0x7fc5c9886a50>
<tornado.platform.epoll.EPollIOLoop object at 0x7fc5c9886c10>
<tornado.platform.epoll.EPollIOLoop object at 0x7fc5c9886c90>

tornado.stack_context模块[]

tornado.stack_context的源代码在: http://www.tornadoweb.org/en/stable/_modules/tornado/stack_context.html

stack_context使用到了Python的两个特性:

  • 上下文管理器:可以先阅读一下这篇文档
  • 生成器:可以先阅读一下这篇文档

首先看一下,stack_context模块开始部分的代码:

class _State(threading.local):
    def __init__(self):
        # contexts的第一元表示上下文对象栈,第二元表示栈顶
        self.contexts = (tuple(), None)

# !!!线程本地变量_state用来为每个线程保存 上下文对象栈 !!!
_state = _State()

接下来看StackContext类的代码:

# 每个StackContext对象内部会维护三个成员:
# * context_factory:调用该成员 会返回 一个上下文管理器对象,
#    因此它是一个上下文管理器工厂
# * contexts:该StackContext对象内部维护的上下文管理器列表
#    (这些上下文管理器都是通过调用context_factory生成的)
# * active:该StackContext对象是否处于激活状态

# StackContext还自定义了一个所谓的“StackContext协议”,该协议可以描述为:
# * enter()方法:调用enter()方法时,
#        StackContext对象会调用context_factory生成一个上下文管理器,
#        并将该上下文管理器追加到上下文管理器列表的尾部,
#        然后调用该上下文管理器的__enter__方法
# * exit()方法:调用exit()方法时,
#        StackContext对象会从上下文管理器列表尾部弹出一个上下文管理器,
#        然后调用该上下文管理器的__exit__方法

# 因为StackContext类实现了上下文管理器协议。所以它的对象也都是上下文管理器。
# * StackContext对象的__enter__()方法的主要行为是:
#        * 将当前线程的 当前上下文对象栈 保存起来(用于在弹栈时,恢复上下文对象栈)
#        * 将自己压入 当前线程的上下文对象栈 的顶部
#        * 将新的上下文对象栈也保存起来
#            (用于在弹栈,恢复上下文对象栈时,
#                 检查是否有其他协程,在此期间也修改了上下文对象栈)
#        * 然后调用该对象的enter()方法,
#        * 最后返回一个可调用对象:StackContext._deactivate,
#            调用该对象会将该StackContext对象设置为非激活状态
# * StackContext对象的__exit__()方法的主要行为是:
#        * 调用该StackContext上下文对象的exit()方法
#        * 并将该StackContext上下文对象从上下文对象栈弹出

# 总结:当进入到StackContext对象时,它会将自己压入
#    当前线程的上下文对象栈的顶部,并将自己设置为栈顶;
#    当离开StackContext对象时,它又会将自己从栈顶弹出。
# 使用StackContext需要注意一个问题,[点此]跳转到源代码处。

class StackContext(object):
    def __init__(self, context_factory):
        self.context_factory = context_factory
        self.contexts = []
        self.active = True

    def _deactivate(self):
        self.active = False

    # StackContext protocol
    def enter(self):
        context = self.context_factory()
        self.contexts.append(context)
        context.__enter__()

    def exit(self, type, value, traceback):
        context = self.contexts.pop()
        context.__exit__(type, value, traceback)

    # Note that some of this code is duplicated in ExceptionStackContext
    # below.  ExceptionStackContext is more common and doesn't need
    # the full generality of this class.
    def __enter__(self):
        self.old_contexts = _state.contexts
        self.new_contexts = (self.old_contexts[0] + (self,), self)
        _state.contexts = self.new_contexts

        try:
            self.enter()
        except:
            _state.contexts = self.old_contexts
            raise

        return self._deactivate

    def __exit__(self, type, value, traceback):
        try:
            self.exit(type, value, traceback)
        finally:
            final_contexts = _state.contexts
            _state.contexts = self.old_contexts

            # Generator coroutines and with-statements with non-local
            # effects interact badly.  Check here for signs of
            # the stack getting out of sync.
            # Note that this check comes after restoring _state.context
            # so that if it fails things are left in a (relatively)
            # consistent state.

            # 也就是说,如果在with StackContext(context_factory)语句块
            #    内部,使用了yield,就可能会导致 上下文对象栈 的状态不一致。
            #    [点此]看一个具体的实例。

            if final_contexts is not self.new_contexts:
                raise StackContextInconsistentError(
                    'stack_context inconsistency (may be caused by yield '
                    'within a "with StackContext" block)')

            # Break up a reference to itself to allow for faster GC on CPython.
            self.new_contexts = None

在with StackContext语句块中,使用yield,会抛出异常:

from contextlib import contextmanager

from tornado.stack_context import StackContext

@contextmanager
def generator():
    yield

def func():
    with StackContext(generator) as deactive:
        yield

g1 = func()
g2 = func()

g1.next()
g2.next()
g1.next()

假设启动g1之前,_state.contexts = ((..., s0), s0);

那么启动g1之后,会生成一个新的StackContext上下文对象s1,并且会将s1入栈,

此时,s1.new_contexts = _state.contexts = ((..., s0, s1), s1),s1.old_contexts = ((..., s0), s0)。

启动g2,又会生成一个新的StackContext对象s2,并且会将s2入栈,

此时,s2.new_contexts = _state.contexts = ((..., s0, s1, s2), s2),s2.old_contexts = ((..., s0, s1), s1)。

再重启g1,会离开with语句块。会执行s1.__exit__()方法,

该方法欲将_state.contexts 恢复为 s1.old_contexts(((..., s0), s0)), 也就是想从当前线程的上下文对象栈中弹出s1,

但是发现在自己入栈、弹栈期间,其他的generator coroutine(g2),已经修改了 上下文对象栈,

此时,就应该抛出异常。假设不抛出异常,而是将_state.contexts更改为((..., s0), s0)。那么再次启动g2之后,又会将_state.contexts置为((..., s0, s1), s1),这与s1已经弹栈相矛盾。

在stack_context模块中,最常用的除了StackContext,就是wrap(fn)函数了。

wrap(fn)函数的执行流程,大致如下:

  • 如果fn已经被封装过了,那么直接返回
  • 捕获当前线程的当前上下文对象栈,因为_state.contexts是元组类型的,也就是不可变类型的,所以在调用wrap函数之后,对上下文对象栈的修改,都会创建一个新的元组对象,并不会影响在wrap函数中捕获的上下文对象栈
  • 如果捕获到的上下文对象栈是空的,那么返回null_wrapper:
    • 1,备份当前的上下文对象栈
    • 2,将捕获的上下文对象栈 设置为 当前的上下文对象栈
    • 3,运行被封装函数
    • 4,将上下文对象栈还原到被封装函数运行前的状态
  • 否则,返回wrapped:
    • 1,备份当前的上下文对象栈
    • 2,将捕获的上下文对象栈中非激活的上下文对象移除。点此查看:从上下文栈中,移除非激活上下文的流程
    • 3,将捕获的上下文对象栈 设置为 当前的上下文对象栈
    • 4,从前到后,按顺序的执行捕获的上下文对象栈中的每一个上下文对象的enter()方法
      • 使用top保存最后一个出异常的上下文对象的前一个栈顶,exc保存最后一个异常的信息
    • 5, 如果没有出现异常,则:
      • 调用回调函数
      • 如果回调函数出现异常,将top置为捕获的栈顶,exc保存异常信息
    • 6,如果有异常出现(可能是第四步产生的,也可能是第五步产生的),则:
      • 从top所保存的上下文对象开始,以逆序的方式,调用每个上下文对象的exit()方法,并且 将异常,在这个链上进行传递
    • 7,否则,从栈顶开始,以逆序的方式,调用每个上下文对象的exit()方法,并且将异常,在调用链上传递
    • 8,如果有未捕获的异常,则抛出
    • 9,将上下文对象栈还原到被封装函数运行前的状态

简单来说就是,以正序的路径,执行每个上下文对象的enter()方法,一直到回调函数,执行完回调函数或中间某步出现异常,则原路返回,执行每个上下文对象的exit()方法,并且将异常信息在调用链上传递。

下面,看一个典型的例子:

[root@iZj6chejzrsqpclb7miryaZ ~]# cat t2.py 
# coding: utf8

from contextlib import contextmanager
from functools import partial

from tornado.stack_context import StackContext, wrap, _state
from tornado.ioloop import IOLoop

ioloop = IOLoop.current()

@contextmanager
def context_factory():
    try:
        print "enter"
        yield
        print "exit"
    except Exception as ex:
        print "error accurs: %s" % str(ex)
        print

def func(callback, *a, **kw):
    # wrap函数会捕获当前的上下文对象栈,
    # 即使之后对上下文对象栈进行改变,也不会影响wrap中捕获的上下文。
    # 同时,wrap执行时会移除非激活上下文
    ioloop.add_callback(wrap(partial(callback, *a, **kw)))

def callback():
    try:
        print "=== start callback ==="
        print "in callback: _state.contexts = ", _state.contexts
        raise RuntimeError("in callback")
    finally:
        ioloop.stop()
        print "=== end callback ==="

with StackContext(context_factory):
    with StackContext(context_factory) as deactive:
        #进入了两个上下文对象,此时上下文对象栈中有两个上下文对象
        print "in main thread: _state.contexts = ", _state.contexts
        # 进入到func函数
        func(callback)
    deactive()
# 离开两个上下文对象,此时这两个对象都从栈中弹出
print "in main thread: _state.contexts = ", _state.contexts
print

ioloop.start()

[root@iZj6chejzrsqpclb7miryaZ ~]# python t2.py 
enter
enter
in main thread: _state.contexts =  ((<tornado.stack_context.StackContext object at 0x7f66c2b4fc90>, <tornado.stack_context.StackContext object at 0x7f66c2b4fd10>), <tornado.stack_context.StackContext object at 0x7f66c2b4fd10>)
exit
exit
in main thread: _state.contexts =  ((), None)

enter
=== start callback ===
in callback: _state.contexts =  ((<tornado.stack_context.StackContext object at 0x7f66c2b4fc90>,), <tornado.stack_context.StackContext object at 0x7f66c2b4fc90>)
=== end callback ===
error accurs: in callback

ERROR:tornado.application:Exception in callback 

 
Traceback (most recent call last):
  File "/usr/lib64/python2.7/site-packages/tornado/ioloop.py", line 605, in _run_callback
    ret = callback()
  File "/usr/lib64/python2.7/site-packages/tornado/stack_context.py", line 251, in wrapped
    raise_exc_info(exc)
  File "/usr/lib64/python2.7/site-packages/tornado/stack_context.py", line 222, in wrapped
    ret = fn(*args, **kwargs)
  File "t2.py", line 26, in callback
    raise RuntimeError("in callback")
RuntimeError: in callback

总结来说就是:通过with StackContext语句对上下文对象栈进行修改和还原,在with的语句体中通过wrap对当时的上下文对象栈进行捕获。

tornado.concurrent.Future[]

因篇幅有限,已经迁移到:这里。

因篇幅有限,已经迁移到:这里。

因篇幅有限,已经迁移到:这里。

tornado.iostream[]

因篇幅有限,已经迁移到:这里。

因篇幅有限,已经迁移到:这里。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

The Hard Thing About Hard Things

The Hard Thing About Hard Things

Ben Horowitz / HarperBusiness / 2014-3-4 / USD 29.99

Ben Horowitz, cofounder of Andreessen Horowitz and one of Silicon Valley's most respected and experienced entrepreneurs, offers essential advice on building and running a startup—practical wisdom for ......一起来看看 《The Hard Thing About Hard Things》 这本书的介绍吧!

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具