标签:
消息调度在框架中分为两个层次,一个c层的分配,一个是lua层的分发。本文阐述的是c层,从两个方面来说:
与调度相关的代码实现在/skynet-src/skynet_mq.c,/skynet-src/skynet_start.c,/skynet-src/skynet_server.c三个文件中,整体上是一个m:n的调度器。
工作线程的控制
框架运行后,会启动固定的线程来轮流调度sc(skynet_context),线程数由配置文件中的thread字段定义,默认是4个。那框架中是如何控制这些线程的呢?具体实现在/skynet-src/skynet_start.c中。
在208行,启动了工作线程:
static int weight[] = {
-1, -1, -1, -1, 0, 0, 0, 0,
1, 1, 1, 1, 1, 1, 1, 1,
2, 2, 2, 2, 2, 2, 2, 2,
3, 3, 3, 3, 3, 3, 3, 3, };
struct worker_parm wp[thread];
for (i=0;i<thread;i++) {
wp[i].m = m;
wp[i].id = i;
if (i < sizeof(weight)/sizeof(weight[0])) {
wp[i].weight= weight[i];
} else {
wp[i].weight = 0;
}
create_thread(&pid[i+3], thread_worker, &wp[i]);
}
直接来看线程函数thread_worker把,在152行:
1 static void *
2 thread_worker(void *p) {
3 struct worker_parm *wp = p;
4 int id = wp->id;
5 int weight = wp->weight;
6 struct monitor *m = wp->m;
7 struct skynet_monitor *sm = m->m[id];
8 skynet_initthread(THREAD_WORKER);
9 struct message_queue * q = NULL;
10 while (!m->quit) {
11 q = skynet_context_message_dispatch(sm, q, weight);
12 if (q == NULL) {
13 if (pthread_mutex_lock(&m->mutex) == 0) {
14 ++ m->sleep;
15 // "spurious wakeup" is harmless,
16 // because skynet_context_message_dispatch() can be call at any time.
17 if (!m->quit)
18 pthread_cond_wait(&m->cond, &m->mutex);
19 -- m->sleep;
20 if (pthread_mutex_unlock(&m->mutex)) {
21 fprintf(stderr, "unlock mutex error");
22 exit(1);
23 }
24 }
25 }
26 }
27 return NULL;
28 }
控制这种生命周期与进程一致的工作线程,主要有两个细节:1、均匀不重复的分配任务。2、不空转、最小时延。前者处理线程同步就好。来看看skynet是如何处理后者的吧:
它用得是条件变量来处理空转的,用条件变量有两点好处:1、让出cpu时间片.2、由外部决定何时唤醒,这样可以在有任务时再唤醒,既能最大化的不空转,又能减小处理任务的时延。
具体实现是条件变量的标准应用了,和《unix高级编程》条件变量的例子几乎一样。这里还有一个sleep的计数,有什么用呢?用来判断要不要调用pthread_cond_signal的。
最后还有一个问题,等待的线程是在哪里被唤醒的呢?在socket线程和timer线程里唤醒的,前者有socket消息时会调用一次,后者每个刷新时间会唤醒一次。
信箱的调度
上一篇时,在sc里我们看到过一个message_queue类型的字段,这就是信箱。skynet中用了两种队列来存储消息并完成调度,下面称为12级队列,1级队列是一个单链表,每个节点是2级队列,2级队列(message_queue)是一个自动扩展的循环队列,用来存储消息。这两个队列实现在/skynet-src/skynet_mq.c中,实现的很简单,并没有用复杂的无锁结构,而是自旋锁保证线程安全的链表,循环队列。
信箱的调度就是12级队列的调度,整体结构描述如下:
while(1){
1级队列出队;
调度2级队列;
1级队列入队;
}
这部分实现在/skynet-src/skynet_server的275行skynet_context_message_dispatch()中:
1 struct message_queue *
2 skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {
3 if (q == NULL) {
4 q = skynet_globalmq_pop();
5 if (q==NULL)
6 return NULL;
7 }
8
9 uint32_t handle = skynet_mq_handle(q);
10
11 struct skynet_context * ctx = skynet_handle_grab(handle);
12 if (ctx == NULL) {
13 struct drop_t d = { handle };
14 skynet_mq_release(q, drop_message, &d);
15 return skynet_globalmq_pop();
16 }
17
18 int i,n=1;
19 struct skynet_message msg;
20
21 for (i=0;i<n;i++) {
22 if (skynet_mq_pop(q,&msg)) {
23 skynet_context_release(ctx);
24 return skynet_globalmq_pop();
25 } else if (i==0 && weight >= 0) {
26 n = skynet_mq_length(q);
27 n >>= weight;
28 }
29 int overload = skynet_mq_overload(q);
30 if (overload) {
31 skynet_error(ctx, "May overload, message queue length = %d", overload);
32 }
33
34 skynet_monitor_trigger(sm, msg.source , handle);
35
36 if (ctx->cb == NULL) {
37 skynet_free(msg.data);
38 } else {
39 dispatch_message(ctx, &msg);
40 }
41
42 skynet_monitor_trigger(sm, 0,0);
43 }
44
45 assert(q == ctx->queue);
46 struct message_queue *nq = skynet_globalmq_pop();
47 if (nq) {
48 // If global mq is not empty , push q back, and return next queue (nq)
49 // Else (global mq is empty or block, don‘t push q back, and return q again (for next dispatch)
50 skynet_globalmq_push(q);
51 q = nq;
52 }
53 skynet_context_release(ctx);
54
55 return q;
56 }
这个函数的作用是,调度传入的2级队列,并返回下一个可调度的2级队列。在上面的实现中,有四个细节之处:
1、22-24行,当2级队列为空时并没有将其压入1级队列,那它从此就消失了吗?不,这样做是为了减少空转1级队列,那这个2级队列是什么时候压回的呢?在message_queue中,有一个
in_global标记是否在1级队列中,当2级队列的出队(skynet_mq_pop)失败时,这个标记就会被置0,在2级队列入队时(skynet_mq_push)会判断这个标记,如果为0,那么就会将自己压入1级队列。(skynet_mq_mark_release也会判断)所以这个2级队列在下次入队时会压回。
2、25-27,修改了for循环的次数,也就是每次调度处理多少条消息。这个次数与传入的weight有关,我们回过头来看这个weight是从哪里来的,源头在工作线程创建时:
static int weight[] = {
-1, -1, -1, -1, 0, 0, 0, 0,
1, 1, 1, 1, 1, 1, 1, 1,
2, 2, 2, 2, 2, 2, 2, 2,
3, 3, 3, 3, 3, 3, 3, 3, };
struct worker_parm wp[thread];
for (i=0;i<thread;i++) {
wp[i].m = m;
wp[i].id = i;
if (i < sizeof(weight)/sizeof(weight[0])) {
wp[i].weight= weight[i];
} else {
wp[i].weight = 0;
}
create_thread(&pid[i+3], thread_worker, &wp[i]);
}
再来看看 n >>= weight,嗯,大致就是:把工作线程分为组,前四组每组8个,超过的归入第5组,AE组每次调度处理一条消息,B组每次处理(n/2)条,C组每次处理(n/4)条,D组每次处理(n/8)条。是为了均匀的使用多核。
3、29-32做了一个负载判断,负载的阀值是1024。不过也仅仅是输出一条log提醒一下而以.
4、34、42触发了一下monitor,这个监控是用来检测消息处理是否发生了死循环,不过也仅仅只是输出一条log提醒一下。这个检测是放在一个专门的监控线程里做的,判断死循环的时间是5秒。具体机制这里就不说了,其实现在/skynet-src/skynet_monitor.c中
标签:
原文地址:http://www.cnblogs.com/watercoldyi/p/5869298.html