目录

  1. 摘要
  2. 源码分析
  3. 值得关注的一些东西
  4. 参考文档

摘要[TOC]


源码分析[TOC]

1,程序入口[TOC]
core/uwsgi.c:

#ifdef UWSGI_AS_SHARED_LIBRARY
int uwsgi_init(int argc, char *argv[], char *envp[]) {
#else
int main(int argc, char *argv[], char *envp[]) {
#endif
    uwsgi_setup(argc, argv, envp);
    return uwsgi_run();
}

2,uwsgi_setup()函数的主要代码[TOC]
core/uwsgi.c:

void uwsgi_setup(int argc, char *argv[], char *envp[]) {
    ....
    // !!! 初始化内嵌插件 !!!
    //initialize embedded plugins
    UWSGI_LOAD_EMBEDDED_PLUGINS
    ....
    uwsgi_start((void *) uwsgi.argv);
}

在执行sudo python setup.py install安装uwsgi的时候,可以看到:

configured CFLAGS: ... -DUWSGI_LOAD_EMBEDDED_PLUGINS="ULEP(python);ULEP(gevent);ULEP(ping);ULEP(cache);ULEP(nagios);ULEP(rrdtool);ULEP(carbon);ULEP(rpc);ULEP(corerouter);ULEP(fastrouter);ULEP(http);ULEP(ugreen);ULEP(signal);ULEP(syslog);ULEP(rsyslog);ULEP(logsocket);ULEP(router_uwsgi);ULEP(router_redirect);ULEP(router_basicauth);ULEP(zergpool);ULEP(redislog);ULEP(mongodblog);ULEP(router_rewrite);ULEP(router_http);ULEP(logfile);ULEP(router_cache);ULEP(rawrouter);ULEP(router_static);ULEP(sslrouter);ULEP(spooler);ULEP(cheaper_busyness);ULEP(symcall);ULEP(transformation_tofile);ULEP(transformation_gzip);ULEP(transformation_chunked);ULEP(transformation_offload);ULEP(router_memcached);ULEP(router_redis);ULEP(router_hash);ULEP(router_expires);ULEP(router_metrics);ULEP(transformation_template);ULEP(stats_pusher_socket);"

接下来,在uwsgi.h头文件中,找到ULEP这个宏:

#define ULEP(pname)\
    if (pname##_plugin.request) {\
    uwsgi.p[pname##_plugin.modifier1] = &pname##_plugin;\
    if (uwsgi.p[pname##_plugin.modifier1]->on_load) {\
        uwsgi.p[pname##_plugin.modifier1]->on_load();}\
    }\
    else {\
    if (uwsgi.gp_cnt >= MAX_GENERIC_PLUGINS) {\
        uwsgi_log("you have embedded too much generic plugins !!!\n");\
        exit(1);\
    }\
    uwsgi.gp[uwsgi.gp_cnt] = &pname##_plugin;\
    if (uwsgi.gp[uwsgi.gp_cnt]->on_load)\
        uwsgi.gp[uwsgi.gp_cnt]->on_load();\
    uwsgi.gp_cnt++;\
    }\

可以看到,所有具有request属性的插件,会被当作请求插件,保存到uwsgi.p[]数组中(注意:每个插件对象在数组中的索引值是其modifier1属性的值);
其它的插件会被当作通用插件,保存到uwsgi.gp[]数组中。
同时,会调用所有插件的on_load钩子。
默认情况下,python是内嵌的请求插件。
uwsgi_setup中最重要的工作就是把插件都装配好了

3,uwsgi_start()函数的主要代码[TOC]
uwsgi_setup()函数最后调用了uwsgi_start()uwsgi_start()函数用于完成各种初始化工作
core/uwsgi.c:

int uwsgi_start(void *v_argv) {
    ...

    // !!!初始化socket协议!!!
    // initialize socket protocols (do it after caching !!!)
    uwsgi_protocols_register();
    ...

    // !!! 绑定所有未绑定的socket !!!
    uwsgi_bind_sockets();
    
    // put listening socket in non-blocking state and set the protocol
    uwsgi_set_sockets_protocols();
    ...
    
    // initialize request plugin only if workers or master are available
    if (uwsgi.sockets || uwsgi.master_process || uwsgi.no_server || uwsgi.command_mode || uwsgi.loop) {
        for (i = 0; i < 256; i++) {
            if (uwsgi.p[i]->init) {
                uwsgi.p[i]->init();
            }    
        }    
    }
    ...

    if (uwsgi.has_threads) {
        ...
        // again check for workers/sockets...
        if (uwsgi.sockets || uwsgi.master_process || uwsgi.no_server || uwsgi.command_mode || uwsgi.loop) {
            for (i = 0; i < 256; i++) {
                if (uwsgi.p[i]->enable_threads)
                    uwsgi.p[i]->enable_threads();
            }
        }
    }
    ...

    //init apps hook (if not lazy)
    if (!uwsgi.lazy && !uwsgi.lazy_apps) {
        uwsgi_init_all_apps();
    }
    ...


    if (!uwsgi.status.is_cheap) {
        if (uwsgi.cheaper && uwsgi.cheaper_count) {
            int nproc = uwsgi.cheaper_initial;
            if (!nproc)
                nproc = uwsgi.cheaper_count;
            for (i = 1; i <= uwsgi.numproc; i++) {
                if (i <= nproc) {
                    if (uwsgi_respawn_worker(i))
                        break;
                    uwsgi.respawn_delta = uwsgi_now();
                }    
                else {
                    uwsgi.workers[i].cheaped = 1; 
                }    
            }    
        } 
    ...
}

uwsgi_start()函数,主要做以下初始化工作:

struct uwsgi_plugin python_plugin = {
    .name = "python",
    .alias = "python",
    .modifier1 = 0,
    .init = uwsgi_python_init,
    .enable_threads = uwsgi_python_enable_threads,
    .init_apps = uwsgi_python_init_apps,
    ...
}

uwsgi_python_init()函数中,最主要的动作就是调用了Python的C API Py_Initialize()创建了Python的虚拟机实例,同时通过PyThreadState_Get()获取了主线程的ThreadState对象,并将其保存到全局变量uwsgi_python upmain_thread域中。
至此,Python虚拟机实例就被创建了

void uwsgi_python_enable_threads() {
    ...

    // !!!开启多线程支持,这条语句也会导致主进程的主线程获取到GIL,并且在该函数中,没有释放。Pyhton要求C代码在执行任何Python代码之前获取到GIL!!!
    PyEval_InitThreads();
    ...

    // !!!将ThreadState对象保存到Thread Local中,
    // 在Python中每个线程对应一个ThreadState对象!!!
    pthread_setspecific(up.upt_gil_key, (void *) PyThreadState_Get());
    ...


    // !!!将 获取和释放GIL的方法 保存到全局变量up的域中!!!
    up.gil_get = gil_real_get;
    up.gil_release = gil_real_release;
    ...
}

上面的流程中主要使用了Python的C API,更多细节可以移步参考文档

void uwsgi_python_init_apps() {
    // lazy ?
    if (uwsgi.mywid > 0) {
        UWSGI_GET_GIL; // 如果是延迟加载(也就是在子进程中加载),则获取GIL
    }
    ...

    // setup app loaders
    ...

    if (up.file_config != NULL) {
        init_uwsgi_app(LOADER_FILE, up.file_config, uwsgi.wsgi_req, up.main_thread, PYTHON_APP_TYPE_WSGI);
    }
    ...

    // lazy ?
    if (uwsgi.mywid > 0) {
        UWSGI_RELEASE_GIL;
    } 
}

下面看init_uwsgi_app()函数:
plugins/python/pyloader.c:

int init_uwsgi_app(int loader, void *arg1, struct wsgi_request *wsgi_req, PyThreadState *interpreter, int app_type) {
    ...

    // !!! 创建uwsgi_app对象 !!!
    struct uwsgi_app *wi;
    ...

    // !!! 将该uwsgi_app对象保存到数组uwsgi.workers[uwsgi.mywid].apps !!!
    wi = &uwsgi_apps[id];
    ...

    // !!! 设置modifier1 !!!
    wi->modifier1 = python_plugin.modifier1;
    ...

    // !!! 加载WSGI Application !!!
    wi->callable = up.loaders[loader](arg1);
    ...

    // !!! 设置request 和 response handler !!!
    if (app_type == PYTHON_APP_TYPE_WSGI) {
        wi->request_subhandler = uwsgi_request_subhandler_wsgi;
        wi->response_subhandler = uwsgi_response_subhandler_wsgi;
        wi->argc = 2;
    }
    ...
}

至此,WSGI application已经被导入了。

至此,整个的初始化过程就完成了。

4,uwsgi_run()函数的主要代码[TOC]
uwsgi_start()完成之后,就会进入到uwsgi_run()函数:
core/uwsgi.c:

int uwsgi_run() {

    // !!! master进程进入master循环,master负责监控worker状态,在worker crash的时候,负责重新fork工作!!!
    // !!! from now on, we could be in the master or in a worker !!!
    int i;

    if (getpid() == masterpid && uwsgi.master_process == 1) { 
#ifdef UWSGI_AS_SHARED_LIBRARY
        int ml_ret = master_loop(uwsgi.argv, uwsgi.environ);
        if (ml_ret == -1) {
            return 0;
        }    
#else
        (void) master_loop(uwsgi.argv, uwsgi.environ);
#endif
        //from now on the process is a real worker
    }    
    ...

    // !!!worker进入worker循环!!!
    uwsgi_worker_run();
    _exit(0);
}

我们不看master循环了,只看worker循环。不理解,为何这一个函数,能进入到多个无限循环的童鞋,可以移步参考文档,了解一下fork系统调用。
接下来进入到uwsgi_worker_run()函数:

void uwsgi_worker_run() {
    ...

    uwsgi_ignition();

    // never here
    exit(0);

}

然后进入到uwsgi_ignition()函数:

void uwsgi_ignition() {
    ...

    else {
        if (uwsgi.async < 2) {
            simple_loop();
        }
        else {
            async_loop();
        }
    }

    // end of the process...
    end_me(0);
}

我们只看simple_loop()函数的源代码。

5,simple_loop()函数的主要代码[TOC]
core/loop.c:

void simple_loop() {
    uwsgi_loop_cores_run(simple_loop_run);
}

void uwsgi_loop_cores_run(void *(*func) (void *)) {
    int i;
    for (i = 1; i < uwsgi.threads; i++) {
        long j = i;
        pthread_create(&uwsgi.workers[uwsgi.mywid].cores[i].thread_id, &uwsgi.threads_attr, func, (void *) j); 
    }
    long y = 0;
    func((void *) y); 
}

这段代码表明,uwsgi开启了uwsgi.threads(包括主线程)个操作系统线程,并发的运行simple_loop_run()函数:

void *simple_loop_run(void *arg1) {
    ...

    // !!!在uwsgi_setup_thread_req()函数中调用了所有请求插件的init_thread钩子。!!!
    // !!!python插件的init_thread钩子 uwsgi_python_init_thread()函数,会为线程创建了一个ThreadState对象,并保存到Thread Local中,后续的GET_GIL 和 RELEASE_GIL都会用到这些ThreadState状态,这是C/C++多线程调用Python的约定!!!
    if (uwsgi.threads > 1) {
        uwsgi_setup_thread_req(core_id, wsgi_req);
    }
    ...

    // 下面的代码就是和操作系统平台相关的了,因为不同的平台实现了不同的IO多路复用机制,笔者主要就epoll进行说明
    // !!!创建一个用于轮询的epoll描述符!!!
    // initialize the main event queue to monitor sockets
    int main_queue = event_queue_init();

    // !!!向epoll描述符中,添加事件!!!
    uwsgi_add_sockets_to_queue(main_queue, core_id);
    ...

    // ok we are ready, let's start managing requests and signals
    while (uwsgi.workers[uwsgi.mywid].manage_next_request) {

        // !!!初始化wsgi_req对象!!!
        wsgi_req_setup(wsgi_req, core_id, NULL);

        // !!!该函数的详细说明在下面,点此跳转!!!
        if (wsgi_req_accept(main_queue, wsgi_req)) {
            continue;
        }   

        // !!!该函数的详细说明在下面,点此跳转!!!
        if (wsgi_req_recv(main_queue, wsgi_req)) {
            uwsgi_destroy_request(wsgi_req);
            continue;
        }   

        // !!!释放wsgi_req中的资源!!!
        uwsgi_close_request(wsgi_req);
    } 
    ...

}

下面是被simple_loop_run()函数所调用的一些(和epoll相关的)代码的细节:
core/event.c:

int event_queue_init() {
    int epfd;


    epfd = epoll_create(256);

    if (epfd < 0) { 
        uwsgi_error("epoll_create()");
        return -1;
    }    

    return epfd;
}

int event_queue_add_fd_read(int eq, int fd) {

    uwsgi_log("== event_queue_add_fd_read ==\n");

    struct epoll_event ee;

    memset(&ee, 0, sizeof(struct epoll_event));
    ee.events = EPOLLIN;
    ee.data.fd = fd;

    if (epoll_ctl(eq, EPOLL_CTL_ADD, fd, &ee)) {
        uwsgi_error("epoll_ctl()");
        return -1;
    }    

    return 0;
}

core/util.c

// !!!accept请求!!!
// accept a request
int wsgi_req_accept(int queue, struct wsgi_request *wsgi_req) {
    ...

    // !!!序列化accept,防止惊群效应!!!
    thunder_lock;
    ...

    // !!!调用epoll_wait收集发生事件的一个文件描述符,并保存到interesting_fd!!!
    ret = event_queue_wait(queue, timeout, &interesting_fd);
    if (ret < 0) {
        thunder_unlock;
        return -1;
    }
    ...

    // 循环检测是哪个server socket发生了事件,然后accept请求,并设置wsgi_req对象
    while (uwsgi_sock) {
        if (interesting_fd == uwsgi_sock->fd || (uwsgi_sock->retry && uwsgi_sock->retry[wsgi_req->async_id]) || (uwsgi_sock->fd_threads && interesting_fd == uwsgi_sock->fd_threads[wsgi_req->async_id])) {
            wsgi_req->socket = uwsgi_sock;
            wsgi_req->fd = wsgi_req->socket->proto_accept(wsgi_req, interesting_fd);
            thunder_unlock;
            if (wsgi_req->fd < 0) {
                if (uwsgi.threads > 1)
                    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &ret);
                return -1;
            }

            if (!uwsgi_sock->edge_trigger) {
                uwsgi_post_accept(wsgi_req);
            }

            // !!! 返回0 表示成功accept了请求!!!
            return 0;
        }

        uwsgi_sock = uwsgi_sock->next;
    }
    ...
}

core/utils.c:

// !!!接收并处理请求!!!
// receive a new request
int wsgi_req_recv(int queue, struct wsgi_request *wsgi_req) {
    ...

    // 读取请求
    // edge triggered sockets get the whole request during accept() phase
    if (!wsgi_req->socket->edge_trigger) {
        for (;;) {
            int ret = wsgi_req->socket->proto(wsgi_req);
            if (ret == UWSGI_OK)
                break;
            if (ret == UWSGI_AGAIN) {
                ret = uwsgi_wait_read_req(wsgi_req);
                if (ret <= 0)
                    return -1;
                continue;
            }
            return -1;
        }
    }
    ...

    // 根据header头中的modifier1选择调用哪一个请求插件的request钩子
    wsgi_req->async_status = uwsgi.p[wsgi_req->uh->modifier1]->request(wsgi_req);

    return 0;
}

因为我们使用的是python请求插件,所以看一下python的request钩子uwsgi_request_wsgi函数:
plugins/python/python_plugin.c:

int uwsgi_request_wsgi(struct wsgi_request *wsgi_req) {
    ...
    struct uwsgi_app *wi;

    // !!! 获取到uwsgi_app实例 !!!
    wi = &uwsgi_apps[wsgi_req->app_id];
    ...

    // !!!获取GIL!!!
    UWSGI_GET_GIL

    // no fear of race conditions for this counter as it is already protected by the GIL
    wi->requests++;

    // !!! 创建wsgi环境变量 !!!
    // create WSGI environ
    wsgi_req->async_environ = up.wsgi_env_create(wsgi_req, wi);

    // !!!构建调用WSGI应用程序所需要的参数,并且调用WSGI应用程序!!!
    wsgi_req->async_result = wi->request_subhandler(wsgi_req, wi);

    // !!!迭代WSGI应用程序的返回,并发送响应!!!
    while (wi->response_subhandler(wsgi_req) != UWSGI_OK) {
            if (uwsgi.async > 1) {
                UWSGI_RELEASE_GIL
                wsgi_req->async_force_again = 1;
                return UWSGI_AGAIN;
            }   
            else {
                wsgi_req->switches++;
            }   
        }  
    }
    ...

    // !!!释放GIL!!!
    UWSGI_RELEASE_GIL
    ....

}

综上,worker循环的大致流程就是:不断的accept连接 -> 读取请求 -> 处理请求 -> 发送响应。

完毕!

值得关注的一些东西[TOC]

1,在C/C++中嵌入Python时,C/C++代码中开启的线程 与 Python代码中开启的线程的关系[TOC]
C/C++通过pthread库创建的线程是操作系统的原生线程,Python中创建的线程也是操作系统的原生线程,都是由操作系统来调度的。
Python中的线程每次被操作系统调度到的时候,需要先获取解释器实例的GIL。严格意义上说,Python中的多线程属于协同式多任务处理,在Python2中,线程不间断的执行100条字节码指令时,就会主动释放GIL,线程在等待I/O的时候,也会释放GIL。这是Python线程,在原生线程基础上的自身的特色。
当在C/C++中使用Python的C API的时候,只要遵循Python关于多线程的相应约定(GET GIL -> 将PyThreadState对象载入解释器 -> 执行Python代码 -> 清除PyThreadState对象 -> 释放GIL),那么C/C++创建的线程 和 Python本身创建的线程 本质都是一样的。

2,插件化开发[TOC]
uwsgi使用的是插件化的开发方式。启动时,加载相关的若干个插件(比如python请求插件),并对这些插件进行初始化;运行时,根据输入,调用相应的插件的钩子函数,来完成处理。uwsgi只需要完成主流程和通用功能,而将具体实现交给插件去做。这种可插拔的方式,是非常值得借鉴的。

3,并发模型[TOC]
uwsgi采用的是经典的 单master/多worker 并发模型。master负责信号处理,初始化,创建并管理worker进程,以及绑定监听sockets。worker的每个线程,处在这样的一个循环中:accept -> recv -> handle request -> send response。


参考文档[TOC]