1,程序入口[TOC]
core/uwsgi.c:
#ifdef UWSGI_AS_SHARED_LIBRARY
int uwsgi_init(int argc, char *argv[], char *envp[]) {
#else
int main(int argc, char *argv[], char *envp[]) {
#endif
uwsgi_setup(argc, argv, envp);
return uwsgi_run();
}
2,uwsgi_setup()函数的主要代码[TOC]
core/uwsgi.c:
void uwsgi_setup(int argc, char *argv[], char *envp[]) {
....
// !!! 初始化内嵌插件 !!!
//initialize embedded plugins
UWSGI_LOAD_EMBEDDED_PLUGINS
....
uwsgi_start((void *) uwsgi.argv);
}
在执行sudo python setup.py install安装uwsgi的时候,可以看到:
configured CFLAGS: ... -DUWSGI_LOAD_EMBEDDED_PLUGINS="ULEP(python);ULEP(gevent);ULEP(ping);ULEP(cache);ULEP(nagios);ULEP(rrdtool);ULEP(carbon);ULEP(rpc);ULEP(corerouter);ULEP(fastrouter);ULEP(http);ULEP(ugreen);ULEP(signal);ULEP(syslog);ULEP(rsyslog);ULEP(logsocket);ULEP(router_uwsgi);ULEP(router_redirect);ULEP(router_basicauth);ULEP(zergpool);ULEP(redislog);ULEP(mongodblog);ULEP(router_rewrite);ULEP(router_http);ULEP(logfile);ULEP(router_cache);ULEP(rawrouter);ULEP(router_static);ULEP(sslrouter);ULEP(spooler);ULEP(cheaper_busyness);ULEP(symcall);ULEP(transformation_tofile);ULEP(transformation_gzip);ULEP(transformation_chunked);ULEP(transformation_offload);ULEP(router_memcached);ULEP(router_redis);ULEP(router_hash);ULEP(router_expires);ULEP(router_metrics);ULEP(transformation_template);ULEP(stats_pusher_socket);"
接下来,在uwsgi.h头文件中,找到ULEP这个宏:
#define ULEP(pname)\
if (pname##_plugin.request) {\
uwsgi.p[pname##_plugin.modifier1] = &pname##_plugin;\
if (uwsgi.p[pname##_plugin.modifier1]->on_load) {\
uwsgi.p[pname##_plugin.modifier1]->on_load();}\
}\
else {\
if (uwsgi.gp_cnt >= MAX_GENERIC_PLUGINS) {\
uwsgi_log("you have embedded too much generic plugins !!!\n");\
exit(1);\
}\
uwsgi.gp[uwsgi.gp_cnt] = &pname##_plugin;\
if (uwsgi.gp[uwsgi.gp_cnt]->on_load)\
uwsgi.gp[uwsgi.gp_cnt]->on_load();\
uwsgi.gp_cnt++;\
}\
可以看到,所有具有request属性的插件,会被当作请求插件,保存到uwsgi.p[]数组中(注意:每个插件对象在数组中的索引值是其modifier1属性的值);
其它的插件会被当作通用插件,保存到uwsgi.gp[]数组中。
同时,会调用所有插件的on_load钩子。
默认情况下,python是内嵌的请求插件。
uwsgi_setup中最重要的工作就是把插件都装配好了。
3,uwsgi_start()函数的主要代码[TOC]
uwsgi_setup()函数最后调用了uwsgi_start(),uwsgi_start()函数用于完成各种初始化工作:
core/uwsgi.c:
int uwsgi_start(void *v_argv) {
...
// !!!初始化socket协议!!!
// initialize socket protocols (do it after caching !!!)
uwsgi_protocols_register();
...
// !!! 绑定所有未绑定的socket !!!
uwsgi_bind_sockets();
// put listening socket in non-blocking state and set the protocol
uwsgi_set_sockets_protocols();
...
// initialize request plugin only if workers or master are available
if (uwsgi.sockets || uwsgi.master_process || uwsgi.no_server || uwsgi.command_mode || uwsgi.loop) {
for (i = 0; i < 256; i++) {
if (uwsgi.p[i]->init) {
uwsgi.p[i]->init();
}
}
}
...
if (uwsgi.has_threads) {
...
// again check for workers/sockets...
if (uwsgi.sockets || uwsgi.master_process || uwsgi.no_server || uwsgi.command_mode || uwsgi.loop) {
for (i = 0; i < 256; i++) {
if (uwsgi.p[i]->enable_threads)
uwsgi.p[i]->enable_threads();
}
}
}
...
//init apps hook (if not lazy)
if (!uwsgi.lazy && !uwsgi.lazy_apps) {
uwsgi_init_all_apps();
}
...
if (!uwsgi.status.is_cheap) {
if (uwsgi.cheaper && uwsgi.cheaper_count) {
int nproc = uwsgi.cheaper_initial;
if (!nproc)
nproc = uwsgi.cheaper_count;
for (i = 1; i <= uwsgi.numproc; i++) {
if (i <= nproc) {
if (uwsgi_respawn_worker(i))
break;
uwsgi.respawn_delta = uwsgi_now();
}
else {
uwsgi.workers[i].cheaped = 1;
}
}
}
...
}
uwsgi_start()函数,主要做以下初始化工作:
调用uwsgi_protocols_register(),初始化socket协议。最终会把uwsgi支持的所有协议,保存到uwsgi.protocols链表中。在绑定完所有的socket之后,会将相应的回调设置给每个socket
在uwsgi_bind_sockets()函数中,对每个socket,调用了bind_to_tcp(uwsgi_sock->name, uwsgi.listen_queue, tcp_port)(core/socket.c),bind_to_tcp()函数在按需执行各种setsockopt之后,执行bind,listen
至此,master进程监听了所有的socket。
调用所有请求插件的init钩子。默认情况下,python请求插件已经被装配了,接下来看,python插件的init方法都做了什么:
plugins/python/python_plugin.c:
struct uwsgi_plugin python_plugin = {
.name = "python",
.alias = "python",
.modifier1 = 0,
.init = uwsgi_python_init,
.enable_threads = uwsgi_python_enable_threads,
.init_apps = uwsgi_python_init_apps,
...
}
uwsgi_python_init()函数中,最主要的动作就是调用了Python的C API Py_Initialize()创建了Python的虚拟机实例,同时通过PyThreadState_Get()获取了主线程的ThreadState对象,并将其保存到全局变量uwsgi_python up的main_thread域中。
至此,Python虚拟机实例就被创建了
uwsgi_python_enable_threads()的主要代码:
void uwsgi_python_enable_threads() {
...
// !!!开启多线程支持,这条语句也会导致主进程的主线程获取到GIL,并且在该函数中,没有释放。Pyhton要求C代码在执行任何Python代码之前获取到GIL!!!
PyEval_InitThreads();
...
// !!!将ThreadState对象保存到Thread Local中,
// 在Python中每个线程对应一个ThreadState对象!!!
pthread_setspecific(up.upt_gil_key, (void *) PyThreadState_Get());
...
// !!!将 获取和释放GIL的方法 保存到全局变量up的域中!!!
up.gil_get = gil_real_get;
up.gil_release = gil_real_release;
...
}
上面的流程中主要使用了Python的C API,更多细节可以移步参考文档。
--lazy-apps选项开启lazy模式,lazy模式是在worker进程中加载WSGI Application,而非lazy模式是在master进程中加载WSGI Application),uwsgi_start()函数还会调用请求插件的init_apps钩子。下面看python插件的init_apps钩子 uwsgi_python_init_apps 都做了什么:
void uwsgi_python_init_apps() {
// lazy ?
if (uwsgi.mywid > 0) {
UWSGI_GET_GIL; // 如果是延迟加载(也就是在子进程中加载),则获取GIL
}
...
// setup app loaders
...
if (up.file_config != NULL) {
init_uwsgi_app(LOADER_FILE, up.file_config, uwsgi.wsgi_req, up.main_thread, PYTHON_APP_TYPE_WSGI);
}
...
// lazy ?
if (uwsgi.mywid > 0) {
UWSGI_RELEASE_GIL;
}
}
下面看init_uwsgi_app()函数:
plugins/python/pyloader.c:
int init_uwsgi_app(int loader, void *arg1, struct wsgi_request *wsgi_req, PyThreadState *interpreter, int app_type) {
...
// !!! 创建uwsgi_app对象 !!!
struct uwsgi_app *wi;
...
// !!! 将该uwsgi_app对象保存到数组uwsgi.workers[uwsgi.mywid].apps !!!
wi = &uwsgi_apps[id];
...
// !!! 设置modifier1 !!!
wi->modifier1 = python_plugin.modifier1;
...
// !!! 加载WSGI Application !!!
wi->callable = up.loaders[loader](arg1);
...
// !!! 设置request 和 response handler !!!
if (app_type == PYTHON_APP_TYPE_WSGI) {
wi->request_subhandler = uwsgi_request_subhandler_wsgi;
wi->response_subhandler = uwsgi_response_subhandler_wsgi;
wi->argc = 2;
}
...
}
至此,WSGI application已经被导入了。
uwsgi_start()函数最后会fork出指定数量的worker进程,并对它们进行一些列的设置,其中包括为每一个线程(也被称为:core)设置一个wsgi_request对象
至此,整个的初始化过程就完成了。
4,uwsgi_run()函数的主要代码[TOC]
uwsgi_start()完成之后,就会进入到uwsgi_run()函数:
core/uwsgi.c:
int uwsgi_run() {
// !!! master进程进入master循环,master负责监控worker状态,在worker crash的时候,负责重新fork等工作!!!
// !!! from now on, we could be in the master or in a worker !!!
int i;
if (getpid() == masterpid && uwsgi.master_process == 1) {
#ifdef UWSGI_AS_SHARED_LIBRARY
int ml_ret = master_loop(uwsgi.argv, uwsgi.environ);
if (ml_ret == -1) {
return 0;
}
#else
(void) master_loop(uwsgi.argv, uwsgi.environ);
#endif
//from now on the process is a real worker
}
...
// !!!worker进入worker循环!!!
uwsgi_worker_run();
_exit(0);
}
我们不看master循环了,只看worker循环。不理解,为何这一个函数,能进入到多个无限循环的童鞋,可以移步参考文档,了解一下fork系统调用。
接下来进入到uwsgi_worker_run()函数:
void uwsgi_worker_run() {
...
uwsgi_ignition();
// never here
exit(0);
}
然后进入到uwsgi_ignition()函数:
void uwsgi_ignition() {
...
else {
if (uwsgi.async < 2) {
simple_loop();
}
else {
async_loop();
}
}
// end of the process...
end_me(0);
}
我们只看simple_loop()函数的源代码。
5,simple_loop()函数的主要代码[TOC]
core/loop.c:
void simple_loop() {
uwsgi_loop_cores_run(simple_loop_run);
}
void uwsgi_loop_cores_run(void *(*func) (void *)) {
int i;
for (i = 1; i < uwsgi.threads; i++) {
long j = i;
pthread_create(&uwsgi.workers[uwsgi.mywid].cores[i].thread_id, &uwsgi.threads_attr, func, (void *) j);
}
long y = 0;
func((void *) y);
}
这段代码表明,uwsgi开启了uwsgi.threads(包括主线程)个操作系统线程,并发的运行simple_loop_run()函数:
void *simple_loop_run(void *arg1) {
...
// !!!在uwsgi_setup_thread_req()函数中调用了所有请求插件的init_thread钩子。!!!
// !!!python插件的init_thread钩子 uwsgi_python_init_thread()函数,会为线程创建了一个ThreadState对象,并保存到Thread Local中,后续的GET_GIL 和 RELEASE_GIL都会用到这些ThreadState状态,这是C/C++多线程调用Python的约定!!!
if (uwsgi.threads > 1) {
uwsgi_setup_thread_req(core_id, wsgi_req);
}
...
// 下面的代码就是和操作系统平台相关的了,因为不同的平台实现了不同的IO多路复用机制,笔者主要就epoll进行说明
// !!!创建一个用于轮询的epoll描述符!!!
// initialize the main event queue to monitor sockets
int main_queue = event_queue_init();
// !!!向epoll描述符中,添加事件!!!
uwsgi_add_sockets_to_queue(main_queue, core_id);
...
// ok we are ready, let's start managing requests and signals
while (uwsgi.workers[uwsgi.mywid].manage_next_request) {
// !!!初始化wsgi_req对象!!!
wsgi_req_setup(wsgi_req, core_id, NULL);
// !!!该函数的详细说明在下面,点此跳转!!!
if (wsgi_req_accept(main_queue, wsgi_req)) {
continue;
}
// !!!该函数的详细说明在下面,点此跳转!!!
if (wsgi_req_recv(main_queue, wsgi_req)) {
uwsgi_destroy_request(wsgi_req);
continue;
}
// !!!释放wsgi_req中的资源!!!
uwsgi_close_request(wsgi_req);
}
...
}
下面是被simple_loop_run()函数所调用的一些(和epoll相关的)代码的细节:
core/event.c:
int event_queue_init() {
int epfd;
epfd = epoll_create(256);
if (epfd < 0) {
uwsgi_error("epoll_create()");
return -1;
}
return epfd;
}
int event_queue_add_fd_read(int eq, int fd) {
uwsgi_log("== event_queue_add_fd_read ==\n");
struct epoll_event ee;
memset(&ee, 0, sizeof(struct epoll_event));
ee.events = EPOLLIN;
ee.data.fd = fd;
if (epoll_ctl(eq, EPOLL_CTL_ADD, fd, &ee)) {
uwsgi_error("epoll_ctl()");
return -1;
}
return 0;
}
core/util.c:
// !!!accept请求!!!
// accept a request
int wsgi_req_accept(int queue, struct wsgi_request *wsgi_req) {
...
// !!!序列化accept,防止惊群效应!!!
thunder_lock;
...
// !!!调用epoll_wait收集发生事件的一个文件描述符,并保存到interesting_fd!!!
ret = event_queue_wait(queue, timeout, &interesting_fd);
if (ret < 0) {
thunder_unlock;
return -1;
}
...
// 循环检测是哪个server socket发生了事件,然后accept请求,并设置wsgi_req对象
while (uwsgi_sock) {
if (interesting_fd == uwsgi_sock->fd || (uwsgi_sock->retry && uwsgi_sock->retry[wsgi_req->async_id]) || (uwsgi_sock->fd_threads && interesting_fd == uwsgi_sock->fd_threads[wsgi_req->async_id])) {
wsgi_req->socket = uwsgi_sock;
wsgi_req->fd = wsgi_req->socket->proto_accept(wsgi_req, interesting_fd);
thunder_unlock;
if (wsgi_req->fd < 0) {
if (uwsgi.threads > 1)
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &ret);
return -1;
}
if (!uwsgi_sock->edge_trigger) {
uwsgi_post_accept(wsgi_req);
}
// !!! 返回0 表示成功accept了请求!!!
return 0;
}
uwsgi_sock = uwsgi_sock->next;
}
...
}
core/utils.c:
// !!!接收并处理请求!!!
// receive a new request
int wsgi_req_recv(int queue, struct wsgi_request *wsgi_req) {
...
// 读取请求
// edge triggered sockets get the whole request during accept() phase
if (!wsgi_req->socket->edge_trigger) {
for (;;) {
int ret = wsgi_req->socket->proto(wsgi_req);
if (ret == UWSGI_OK)
break;
if (ret == UWSGI_AGAIN) {
ret = uwsgi_wait_read_req(wsgi_req);
if (ret <= 0)
return -1;
continue;
}
return -1;
}
}
...
// 根据header头中的modifier1选择调用哪一个请求插件的request钩子
wsgi_req->async_status = uwsgi.p[wsgi_req->uh->modifier1]->request(wsgi_req);
return 0;
}
因为我们使用的是python请求插件,所以看一下python的request钩子uwsgi_request_wsgi函数:
plugins/python/python_plugin.c:
int uwsgi_request_wsgi(struct wsgi_request *wsgi_req) {
...
struct uwsgi_app *wi;
// !!! 获取到uwsgi_app实例 !!!
wi = &uwsgi_apps[wsgi_req->app_id];
...
// !!!获取GIL!!!
UWSGI_GET_GIL
// no fear of race conditions for this counter as it is already protected by the GIL
wi->requests++;
// !!! 创建wsgi环境变量 !!!
// create WSGI environ
wsgi_req->async_environ = up.wsgi_env_create(wsgi_req, wi);
// !!!构建调用WSGI应用程序所需要的参数,并且调用WSGI应用程序!!!
wsgi_req->async_result = wi->request_subhandler(wsgi_req, wi);
// !!!迭代WSGI应用程序的返回,并发送响应!!!
while (wi->response_subhandler(wsgi_req) != UWSGI_OK) {
if (uwsgi.async > 1) {
UWSGI_RELEASE_GIL
wsgi_req->async_force_again = 1;
return UWSGI_AGAIN;
}
else {
wsgi_req->switches++;
}
}
}
...
// !!!释放GIL!!!
UWSGI_RELEASE_GIL
....
}
综上,worker循环的大致流程就是:不断的accept连接 -> 读取请求 -> 处理请求 -> 发送响应。
1,在C/C++中嵌入Python时,C/C++代码中开启的线程 与 Python代码中开启的线程的关系[TOC]
C/C++通过pthread库创建的线程是操作系统的原生线程,Python中创建的线程也是操作系统的原生线程,都是由操作系统来调度的。
Python中的线程每次被操作系统调度到的时候,需要先获取解释器实例的GIL。严格意义上说,Python中的多线程属于协同式多任务处理,在Python2中,线程不间断的执行100条字节码指令时,就会主动释放GIL,线程在等待I/O的时候,也会释放GIL。这是Python线程,在原生线程基础上的自身的特色。
当在C/C++中使用Python的C API的时候,只要遵循Python关于多线程的相应约定(GET GIL -> 将PyThreadState对象载入解释器 -> 执行Python代码 -> 清除PyThreadState对象 -> 释放GIL),那么C/C++创建的线程 和 Python本身创建的线程 本质都是一样的。
2,插件化开发[TOC]
uwsgi使用的是插件化的开发方式。启动时,加载相关的若干个插件(比如python请求插件),并对这些插件进行初始化;运行时,根据输入,调用相应的插件的钩子函数,来完成处理。uwsgi只需要完成主流程和通用功能,而将具体实现交给插件去做。这种可插拔的方式,是非常值得借鉴的。
3,并发模型[TOC]
uwsgi采用的是经典的 单master/多worker 并发模型。master负责信号处理,初始化,创建并管理worker进程,以及绑定监听sockets。worker的每个线程,处在这样的一个循环中:accept -> recv -> handle request -> send response。