Nodejs事件引擎libuv源码剖析之:高效线程池(threadpool)的实
Nodejs编程是全异步的,这就意味着我们不必每次都阻塞等待该次操作的结果,而事件完成(就绪)时会主动回调通知我们。在网络编程中,一般都是基于Reactor线程模型的变种,无论其怎么演化,其核心组件都包含了Reactor实例(提供事件注册、注销、通知功能)、多路复用器(由操作系统提供,比如kqueue、select、epoll等)、事件处理器(负责事件的处理)以及事件源(linux中这就是描述符)这四个组件。一般,会单独启动一个线程运行Reactor实例来实现真正的异步操作。但是,依赖操作系统提供的系统调用来实现异步是有局限的,比如在Reactor模型中我们只能监听到:网络IO事件、signel(信号)、超时事件以及一些管道事件等,但这些事件也只是通知我们资源可读或者可写,真正的读写操作(read和write)还是同步的(也就是你必须等到read或者write返回,虽然linux提供了aio,但是其有诸多槽点),那么Nodejs的全异步是如何做到的呢?你可能会很快想到,就是启用单独的线程来做同步的事情,这也是libuv的设计思路,借用官网的一张图,说明一切: 由上图可以看到,libuv实现了一套自己的线程池来处理所有同步操作(从而模拟出异步的效果),下面就来看一下该线程池的具体实现吧! 一、线程池模型 说道线程池,在java领域中,jdk本身就提供了多种线程池实现,几乎所有的线程池都遵循以下模型(任务队列+线程池): libuv自身定义了一个非常精炼、高效的队列(双向循环链表),只用了几个简单的宏定义将其实现,具体实现方式可以参见我的另一篇博文:libuv高效队列的实现。现在队列有了,来看一下task的定义: 1 struct uv__work { 2 void (*work)(struct uv__work *w); 3 void (*done)(struct uv__work *w, int status); 4 struct uv_loop_s* loop; 5 void* wq[2]; 6 }; 只听到从山间传来架构君的声音: 相识若知咱就里,和相识也一般憔悴。有谁来对上联或下联? uv__work就代表一个task,可以看到里面有两个函数指针(work代表任务实际操作,done用于对任务进行状态确认)。wq成员就是一个QUEUE的节点, uv__work就是通过wq与其他 uv__work连接成一个队列。 下面来看一下threadpool的初始化,代码如下: 此代码由Java架构师必看网-架构君整理 上面的代码中,一共创建了nthreads个线程,那么每个线程的执行代码是什么呢?由线程创建代码:uv_thread_create(threads + i, worker, NULL),可以看到,每一个线程都是执行worker函数,下面看看worker函数都在做什么: 1 /* To avoid deadlock with uv_cancel() it's crucial that the worker 2 * never holds the global mutex and the loop-local mutex at the same time. 3 */ 4 static void worker(void* arg) { 5 struct uv__work* w; 6 QUEUE* q; 7 8 (void) arg; 9 10 for (;;) { 11 // 因为是多线程访问,因此需要加锁同步 12 uv_mutex_lock(&mutex); 13 14 // 如果任务队列是空的 15 while (QUEUE_EMPTY(&wq)) { 16 // 空闲线程数加1 17 idle_threads += 1; 18 // 等待条件变量 19 uv_cond_wait(&cond, &mutex); 20 // 被唤醒之后,说明有任务被post到队列,因此空闲线程数需要减1 21 idle_threads -= 1; 22 } 23 24 // 取出队列的头部节点(第一个task) 25 q = QUEUE_HEAD(&wq); 26 27 if (q == &exit_message) 28 uv_cond_signal(&cond); 29 else { 30 // 从队列中移除这个task 31 QUEUE_REMOVE(q); 32 QUEUE_INIT(q); /* Signal uv_cancel() that the work req is 33 executing. */ 34 } 35 36 uv_mutex_unlock(&mutex); 37 38 if (q == &exit_message) 39 break; 40 41 // 取出uv__work首地址 42 w = QUEUE_DATA(q, struct uv__work, wq); 43 // 调用task的work,执行任务 44 w->work(w); 45 46 uv_mutex_lock(&w->loop->wq_mutex); 47 w->work = NULL; /* Signal uv_cancel() that the work req is done 48 executing. */ 49 QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq); 50 uv_async_send(&w->loop->wq_async); 51 uv_mutex_unlock(&w->loop->wq_mutex); 52 } 53 } 可以看到,多个线程都会在worker方法中等待在conn条件变量上,一旦有任务加入队列,线程就会被唤醒,然后只有一个线程会得到任务的执行权,其他的线程只能继续等待。 那么如何向队列提交一个task呢?看以下代码: 此代码由Java架构师必看网-架构君整理 接着看post做了什么: 1 static void post(QUEUE* q) { 2 // 同步队列操作 3 uv_mutex_lock(&mutex); 4 // 将task插入队列尾部 5 QUEUE_INSERT_TAIL(&wq, q); 6 // 如果当前有空闲线程,就向条件变量发送信号 7 if (idle_threads > 0) 8 uv_cond_signal(&cond); 9 uv_mutex_unlock(&mutex); 10 } 有提交任务,就肯定会有取消一个任务的操作,是的,他就是uv__work_cancel,代码如下: 1 static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) { 2 int cancelled; 3 4 uv_mutex_lock(&mutex); 5 uv_mutex_lock(&w->loop->wq_mutex); 6 7 // 只有当前队列不为空并且要取消的uv__work有效时才会继续执行 8 cancelled = !QUEUE_EMPTY(&w->wq) && w->work != NULL; 9 if (cancelled) 10 QUEUE_REMOVE(&w->wq);// 从队列中移除task 11 12 uv_mutex_unlock(&w->loop->wq_mutex); 13 uv_mutex_unlock(&mutex); 14 15 if (!cancelled) 16 return UV_EBUSY; 17 18 // 更新这个task的状态 19 w->work = uv__cancelled; 20 uv_mutex_lock(&loop->wq_mutex); 21 QUEUE_INSERT_TAIL(&loop->wq, &w->wq); 22 uv_async_send(&loop->wq_async); 23 uv_mutex_unlock(&loop->wq_mutex); 24 25 return 0; 26 } 至此,一个线程池的组成以及实现原理都说完了,可以看到,libuv几乎是用了最少的代码完成了高效的线程池,这对于我们平时写代码时具有很好的借鉴意义线程池linux,文中涉及到uv_req_t以及uv_loop_t等结构我都直接跳过,因为这牵扯到libuv的其他组件,我将在以后的源码剖析中逐步阐述,谢谢你能看到这里。 猜您喜欢: (编辑:海南站长网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |