uwsgi源码分析

栏目: Python · 发布时间: 6年前

内容简介:uwsgi源码分析

目录

本文在CentOS Linux release 7.3.1611 (Core)、uwsgi 2.0.15、 Python 2.7.5下测试通过。

源码分析 []

core/uwsgi.c:

#ifdef UWSGI_AS_SHARED_LIBRARY
int uwsgi_init(int argc, char *argv[], char *envp[]) {
#else
int main(int argc, char *argv[], char *envp[]) {
#endif
    uwsgi_setup(argc, argv, envp);
    return uwsgi_run();
}

2,uwsgi_setup()函数的主要代码[]

core/uwsgi.c:

void uwsgi_setup(int argc, char *argv[], char *envp[]) {
    ....
    // !!! 初始化内嵌插件 !!!
    //initialize embedded plugins
    UWSGI_LOAD_EMBEDDED_PLUGINS
    ....
    uwsgi_start((void *) uwsgi.argv);
}

在执行 sudo python setup.py install 安装uwsgi的时候,可以看到:

configured CFLAGS: ... -D<strong>UWSGI_LOAD_EMBEDDED_PLUGINS</strong>="ULEP(python);ULEP(gevent);ULEP(ping);ULEP(cache);ULEP(nagios);ULEP(rrdtool);ULEP(carbon);ULEP(rpc);ULEP(corerouter);ULEP(fastrouter);ULEP(http);ULEP(ugreen);ULEP(signal);ULEP(syslog);ULEP(rsyslog);ULEP(logsocket);ULEP(router_uwsgi);ULEP(router_redirect);ULEP(router_basicauth);ULEP(zergpool);ULEP(redislog);ULEP(mongodblog);ULEP(router_rewrite);ULEP(router_http);ULEP(logfile);ULEP(router_cache);ULEP(rawrouter);ULEP(router_static);ULEP(sslrouter);ULEP(spooler);ULEP(cheaper_busyness);ULEP(symcall);ULEP(transformation_tofile);ULEP(transformation_gzip);ULEP(transformation_chunked);ULEP(transformation_offload);ULEP(router_memcached);ULEP(router_redis);ULEP(router_hash);ULEP(router_expires);ULEP(router_metrics);ULEP(transformation_template);ULEP(stats_pusher_socket);"

接下来,在uwsgi.h头文件中,找到ULEP这个宏:

#define ULEP(pname)\
    if (pname##_plugin.request) {\
    uwsgi.p[pname##_plugin.modifier1] = &pname##_plugin;\
    if (uwsgi.p[pname##_plugin.modifier1]->on_load) {\
        uwsgi.p[pname##_plugin.modifier1]->on_load();}\
    }\
    else {\
    if (uwsgi.gp_cnt >= MAX_GENERIC_PLUGINS) {\
        uwsgi_log("you have embedded too much generic plugins !!!\n");\
        exit(1);\
    }\
    uwsgi.gp[uwsgi.gp_cnt] = &pname##_plugin;\
    if (uwsgi.gp[uwsgi.gp_cnt]->on_load)\
        uwsgi.gp[uwsgi.gp_cnt]->on_load();\
    uwsgi.gp_cnt++;\
    }\

可以看到,所有具有request属性的插件,会被当作请求插件,保存到 uwsgi.p[] 数组中(注意:每个插件对象在数组中的索引值是其modifier1属性的值);

其它的插件会被当作通用插件,保存到 uwsgi.gp[] 数组中。

同时,会调用所有插件的on_load钩子。

默认情况下,python是内嵌的请求插件。

uwsgi_setup 中最重要的工作就是把插件都装配好了 。

3,uwsgi_start()函数的主要代码[]

uwsgi_setup() 函数最后调用了 uwsgi_start()uwsgi_start() 函数用于完成各种 初始化工作

core/uwsgi.c:

int uwsgi_start(void *v_argv) {
    ...

    // !!!初始化socket协议!!!
    // initialize socket protocols (do it after caching !!!)
    uwsgi_protocols_register();
    ...

    // !!! 绑定所有未绑定的socket !!!
    uwsgi_bind_sockets();
    
    // put listening socket in non-blocking state and set the protocol
    uwsgi_set_sockets_protocols();
    ...
    
    // initialize request plugin only if workers or master are available
    if (uwsgi.sockets || uwsgi.master_process || uwsgi.no_server || uwsgi.command_mode || uwsgi.loop) {
        for (i = 0; i < 256; i++) {
            if (uwsgi.p[i]->init) {
                uwsgi.p[i]->init();
            }    
        }    
    }
    ...

    if (uwsgi.has_threads) {
        ...
        // again check for workers/sockets...
        if (uwsgi.sockets || uwsgi.master_process || uwsgi.no_server || uwsgi.command_mode || uwsgi.loop) {
            for (i = 0; i < 256; i++) {
                if (uwsgi.p[i]->enable_threads)
                    uwsgi.p[i]->enable_threads();
            }
        }
    }
    ...

    //init apps hook (if not lazy)
    if (!uwsgi.lazy && !uwsgi.lazy_apps) {
        uwsgi_init_all_apps();
    }
    ...


    if (!uwsgi.status.is_cheap) {
        if (uwsgi.cheaper && uwsgi.cheaper_count) {
            int nproc = uwsgi.cheaper_initial;
            if (!nproc)
                nproc = uwsgi.cheaper_count;
            for (i = 1; i <= uwsgi.numproc; i++) {
                if (i <= nproc) {
                    if (uwsgi_respawn_worker(i))
                        break;
                    uwsgi.respawn_delta = uwsgi_now();
                }    
                else {
                    uwsgi.workers[i].cheaped = 1; 
                }    
            }    
        } 
    ...
}

uwsgi_start() 函数,主要做以下初始化工作:

  • 调用 uwsgi_protocols_register() ,初始化socket协议。最终会把uwsgi支持的所有协议,保存到 uwsgi.protocols 链表中。在绑定完所有的socket之后,会将相应的回调设置给每个socket

  • uwsgi_bind_sockets() 函数中,对每个socket,调用了 bind_to_tcp(uwsgi_sock->name, uwsgi.listen_queue, tcp_port) (core/socket.c), bind_to_tcp() 函数在按需执行各种 setsockopt 之后,执行bind,listen

    至此,master进程监听了所有的socket。

  • 调用所有请求插件的init钩子。默认情况下,python请求插件已经被装配了,接下来看,python插件的init方法都做了什么:

    plugins/python/python_plugin.c:

struct uwsgi_plugin python_plugin = {
    .name = "python",
    .alias = "python",
    .modifier1 = 0,
    <strong>.init = uwsgi_python_init</strong>,
    .enable_threads = uwsgi_python_enable_threads,
    .init_apps = uwsgi_python_init_apps,
    ...
}

uwsgi_python_init() 函数中,最主要的动作就是调用了Python的C API Py_Initialize() 创建了Python的 虚拟机实例 ,同时通过 PyThreadState_Get() 获取了主线程的ThreadState对象,并将其保存到全局变量 uwsgi_python up

至此,Python虚拟机实例就被创建了

  • 根据需要调用请求插件的enable_threads钩子。下面是 uwsgi_python_enable_threads() 的主要代码:
void uwsgi_python_enable_threads() {
    ...

    PyEval_InitThreads(); // 开启多线程支持,这条语句也会导致主进程的主线程获取到GIL,并且在该函数中,没有释放
    ...

    // 将ThreadState对象保存到ThreadLocal变量中,
    // 在Python中每个线程对应一个ThreadState对象
    pthread_setspecific(up.upt_gil_key, (void *) PyThreadState_Get());
    ...


    // 将设置获取和释放GIL的方法保存到全局变量up中
    up.gil_get = gil_real_get;
    up.gil_release = gil_real_release;
    ...
}

上面的流程其实是C/C++中多线程调用Python的基本流程。

  • 在非lazy模式下, uwsgi_start() 函数还会调用请求插件的init_apps钩子。下面看python插件的init_apps钩子 uwsgi_python_init_apps 都做了什么:
void uwsgi_python_init_apps() {
    // lazy ?
    if (uwsgi.mywid > 0) {
        UWSGI_GET_GIL; // 如果是延迟加载(也就是在子进程中加载),则获取GIL
    }
    ...

    // setup app loaders
    ...

    if (up.file_config != NULL) {
        init_uwsgi_app(LOADER_FILE, up.file_config, uwsgi.wsgi_req, up.main_thread, PYTHON_APP_TYPE_WSGI);
    }
    ...

    // lazy ?
    if (uwsgi.mywid > 0) {
        UWSGI_RELEASE_GIL;
    } 
}

下面看 init_uwsgi_app() 函数:

plugins/python/pyloader.c:

int init_uwsgi_app(int loader, void *arg1, struct wsgi_request *wsgi_req, PyThreadState *interpreter, int app_type) {
    ...

    // !!! 创建uwsgi_app对象 !!!
    struct uwsgi_app *wi;
    ...

    // !!! 将该uwsgi_app对象保存到数组uwsgi.workers[uwsgi.mywid].apps !!!
    wi = &uwsgi_apps[id];
    ...

    // !!! 设置modifier1 !!!
    wi->modifier1 = python_plugin.modifier1;
    ...

    // !!! <strong>加载WSGI Application</strong> !!!
    wi->callable = up.loaders[loader](arg1);
    ...

    // !!! 设置request 和 response handler !!!
    if (app_type == PYTHON_APP_TYPE_WSGI) {
        wi->request_subhandler = uwsgi_request_subhandler_wsgi;
        wi->response_subhandler = uwsgi_response_subhandler_wsgi;
        wi->argc = 2;
    }
    ...
}

至此,WSGI application已经被导入了。

  • uwsgi_start() 最后生成指定数量的worker进程,并进行一些列的设置,其中包括为每一个线程(core)设置一个wsgi_request结构体

至此,整个的初始化过程就完成了。

4,uwsgi_run()函数的主要代码[]

uwsgi_start() 完成之后,就会进入到 uwsgi_run() 函数:

core/uwsgi.c:

int uwsgi_run() {

    // !!! master进程进入master循环,master负责监控worker状态,在worker crash的时候,负责重新spawn<strong>等</strong>工作!!!
    // !!! from now on, we could be in the master or in a worker !!!
    int i;

    if (getpid() == masterpid && uwsgi.master_process == 1) { 
#ifdef UWSGI_AS_SHARED_LIBRARY
        int ml_ret = master_loop(uwsgi.argv, uwsgi.environ);
        if (ml_ret == -1) {
            return 0;
        }    
#else
        uwsgi_log("=== here I am ===\n");
        (void) master_loop(uwsgi.argv, uwsgi.environ);
#endif
        //from now on the process is a real worker
    }    
    ...

    // !!!worker进入worker循环!!!
    uwsgi_worker_run();
    _exit(0);
}

我们不看master循环了,只看worker循环。

接下来进入到 uwsgi_worker_run() 函数:

void uwsgi_worker_run() {
    ...

    uwsgi_ignition();

    // never here
    exit(0);

}

然后进入到 uwsgi_ignition() 函数:

void uwsgi_ignition() {
    ...

    else {
        if (uwsgi.async < 2) {
            <strong>simple_loop()</strong>;
        }
        else {
            async_loop();
        }
    }

    // end of the process...
    end_me(0);
}

我们只看 simple_loop() 函数的源代码。

5,simple_loop()函数的主要代码[]

core/loop.c:

void simple_loop() {
    uwsgi_loop_cores_run(simple_loop_run);
}

void uwsgi_loop_cores_run(void *(*func) (void *)) {
    int i;
    for (i = 1; i < uwsgi.threads; i++) {
        long j = i;
        pthread_create(&uwsgi.workers[uwsgi.mywid].cores[i].thread_id, &uwsgi.threads_attr, func, (void *) j); 
    }
    long y = 0;
    func((void *) y); 
}

这段代码表明,uwsgi开启了 uwsgi.threads (包括主线程)个 操作系统线程 ,并发的运行 simple_loop_run() 函数:

void *simple_loop_run(void *arg1) {
    ...

    // !!!在uwsgi_setup_thread_req()函数中调用了所有请求插件的init_thread钩子。!!!
    // !!!python插件的uwsgi_python_init_thread()方法,为线程创建了一个ThreadState对象,并保存到Thread Local中,后续的GET_GIL 和 RELEASE_GIL都会用到这些ThreadState状态,这是C/C++多线程调用Python的约定!!!
    if (uwsgi.threads > 1) {
        uwsgi_setup_thread_req(core_id, wsgi_req);
    }
    ...

    // 下面的代码就是和操作系统平台相关的了,因为不同的平台实现了不同的IO多路复用机制,笔者主要就epoll进行说明
    // !!!创建一个用于轮询的epoll描述符!!!
    // initialize the main event queue to monitor sockets
    int main_queue = event_queue_init();

    // !!!向epoll描述符中,添加事件!!!
    uwsgi_add_sockets_to_queue(main_queue, core_id);
    ...

    // ok we are ready, let's start managing requests and signals
    while (uwsgi.workers[uwsgi.mywid].manage_next_request) {

        // !!!初始化wsgi_req对象!!!
        wsgi_req_setup(wsgi_req, core_id, NULL);

        // !!!该函数的详细说明在下面!!!
        if (wsgi_req_accept(main_queue, wsgi_req)) {
            continue;
        }   

        // !!!该函数的详细说明在下面!!!
        if (wsgi_req_recv(main_queue, wsgi_req)) {
            uwsgi_destroy_request(wsgi_req);
            continue;
        }   

        // !!!关闭请求,释放请求中的资源!!!
        uwsgi_close_request(wsgi_req);
    } 
    ...

}

下面是被 simple_loop_run() 函数所调用的一些(和epoll相关的)代码的细节:

core/event.c:

int event_queue_init() {
    int epfd;


    epfd = epoll_create(256);

    if (epfd < 0) { 
        uwsgi_error("epoll_create()");
        return -1;
    }    

    return epfd;
}

int event_queue_add_fd_read(int eq, int fd) {

    uwsgi_log("== event_queue_add_fd_read ==\n");

    struct epoll_event ee;

    memset(ⅇ, 0, sizeof(struct epoll_event));
    ee.events = EPOLLIN;
    ee.data.fd = fd;

    if (epoll_ctl(eq, EPOLL_CTL_ADD, fd, ⅇ)) {
        uwsgi_error("epoll_ctl()");
        return -1;
    }    

    return 0;
}

core/util.c

// !!!accept请求!!!
// accept a request
int wsgi_req_accept(int queue, struct wsgi_request *wsgi_req) {
    ...

    // !!!序列化accept,防止惊群效应!!!
    thunder_lock;
    ...

    // !!!调用epoll_wait收集发生事件的一个文件描述符,并保存到interesting_fd!!!
    ret = event_queue_wait(queue, timeout, &interesting_fd);
    if (ret < 0) {
        thunder_unlock;
        return -1;
    }
    ...

    // <strong>循环检测是哪个server socket发生了事件,然后accept请求,并设置wsgi_req对象</strong>
    while (uwsgi_sock) {
        if (interesting_fd == uwsgi_sock->fd || (uwsgi_sock->retry && uwsgi_sock->retry[wsgi_req->async_id]) || (uwsgi_sock->fd_threads && interesting_fd == uwsgi_sock->fd_threads[wsgi_req->async_id])) {
            <strong>wsgi_req->socket = uwsgi_sock;
            wsgi_req->fd = wsgi_req->socket->proto_accept(wsgi_req, interesting_fd);</strong>
            thunder_unlock;
            if (wsgi_req->fd < 0) {
                if (uwsgi.threads > 1)
                    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &ret);
                return -1;
            }

            if (!uwsgi_sock->edge_trigger) {
                uwsgi_post_accept(wsgi_req);
            }

            // !!! 返回0 表示成功accept了请求!!!
            <strong>return 0;</strong>
        }

        uwsgi_sock = uwsgi_sock->next;
    }
    ...
}

core/utils.c:

// !!!接收并处理请求!!!
// receive a new request
int wsgi_req_recv(int queue, struct wsgi_request *wsgi_req) {
    ...

    // <strong>读取请求</strong>
    // edge triggered sockets get the whole request during accept() phase
    if (!wsgi_req->socket->edge_trigger) {
        for (;;) {
            int ret = wsgi_req->socket->proto(wsgi_req);
            if (ret == UWSGI_OK)
                break;
            if (ret == UWSGI_AGAIN) {
                ret = uwsgi_wait_read_req(wsgi_req);
                if (ret <= 0)
                    return -1;
                continue;
            }
            return -1;
        }
    }
    ...

    // <strong>根据header头中的modifier1选择调用哪一个请求插件的request钩子</strong>
    wsgi_req->async_status = uwsgi.p[wsgi_req->uh->modifier1]->request(wsgi_req);

    return 0;
}

因为我们使用的是python请求插件,所以看一下python的request钩子 uwsgi_request_wsgi 函数:

plugins/python/python_plugin.c:

int uwsgi_request_wsgi(struct wsgi_request *wsgi_req) {
    ...
    struct uwsgi_app *wi;

    // !!! 获取到uwsgi_app实例 !!!
    wi = &uwsgi_apps[wsgi_req->app_id];
    ...

    // !!!<strong>获取GIL</strong>!!!
    UWSGI_GET_GIL

    // no fear of race conditions for this counter as it is already protected by the GIL
    wi->requests++;

    // !!! 创建wsgi环境变量 !!!
    // create WSGI environ
    wsgi_req->async_environ = up.wsgi_env_create(wsgi_req, wi);

    // !!!构建调用WSGI应用程序所需要的参数,并且调用WSGI应用程序!!!
    wsgi_req->async_result = wi->request_subhandler(wsgi_req, wi);

    // !!!迭代WSGI应用程序的返回,并发送响应!!!
    while (wi->response_subhandler(wsgi_req) != UWSGI_OK) {
            if (uwsgi.async > 1) {
                UWSGI_RELEASE_GIL
                wsgi_req->async_force_again = 1;
                return UWSGI_AGAIN;
            }   
            else {
                wsgi_req->switches++;
            }   
        }  
    }
    ...

    // !!!<strong>释放GIL</strong>!!!
    UWSGI_RELEASE_GIL
    ....

}

完毕!

值得关注的一些东西[]

1,在C/C++中嵌入Python时,C/C++代码中开启的线程 与 Python代码中开启的线程的关系[]

C/C++通过pthread库创建的线程是操作系统的原生线程,Python中创建的线程也是操作系统的原生线程,都是由操作系统来调度的。

Python中的线程每次被操作系统调度到的时候,需要先获取解释器实例的GIL。严格意义上说,Python中的多线程属于协同式多任务处理,在Python2中,线程不间断的执行100条字节码指令时,就会主动释放GIL,线程在等待I/O的时候,也会释放GIL。这是Python线程,在原生线程基础上的自身的特色。

当在C/C++中使用Python的C API的时候,只要遵循Python关于多线程的相应约定(GET GIL -> 将PyThreadState对象载入解释器 -> 执行Python代码 -> 清除PyThreadState对象 -> 释放GIL),那么C/C++创建的线程 和 Python本身创建的线程 本质都是一样的。

uwsgi使用的是插件化的开发方式。启动时,加载相关的若干个插件,并对这些插件进行初始化;运行时,根据输入,调用相应的插件的钩子函数,来完成处理。uwsgi只需要完成 主流程 和通用功能,而将具体实现交给插件去做。这种可插拔的方式,是非常值得借鉴的。

uwsgi采用的是经典的 单master/多worker 并发模型。master负责信号处理,初始化自身,创建并管理worker进程,以及绑定监听socket。worker在自己的循环中,负责accept -> recv -> handle request -> send response。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

一胜九败

一胜九败

柳井正 / 徐静波 / 中信出版社 / 2011-1-19 / 28.00元

优衣库成长的过程,就是一个历经了无数次失败的过程。他们经历过无法从银行融资的焦灼,经历过“衣服因低价热销,但人们买回去之后立即把商标剪掉”的难堪,经历过为上市冲刺而拼命扩张店铺的疯狂,也经历过被消费者冷落、疏离的苦痛……但正是从这些失败中学到的经验与教训,让柳井正走向了成功。 《一胜九败:优衣库风靡全球的秘密》就像是柳井正的错误集,在这里,他毫不隐晦地将公司业绩低迷的原因、进军海外失败的因素......一起来看看 《一胜九败》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具