标签:方式 RKE str 决定 创建 部分 cep 负载均衡 存储
libevent并不是线程安全的,但这不代表libevent不支持多线程模式。
前几天在微博上看到ruanyf发了条微博说到apache和nginx的并发模型,看到评论很多人都说不对
于是自己又查了下,总结一下我所学过的网络库或者网络服务器的并发模型
1、muduo:one loop per thread,主线程注册listen事件,通过某种负载均衡机制(round robin)将连接的事件注册到子线程的Reactor上,据说也是Netty的方案,最近也正好在学netty,刚好可以验证一下。
另外,muduo还提到了一个runInLoop()的功能:如果用户在当前线程调用,则回调functor会同步进行,如果在其他线程调用,则IO线程会被唤醒执行这个functor。这种跨线程调用是如何实现的?因为其他线程很可能阻塞在Reactor上。传统方法是用pipe(后面将提到memcache的多线程),在muduo里面是用eventfde(2),将回调放入线程的任务队列,并发送一个uint64大小的消息来唤醒Reactor。
2、nginx:master + worker的工作模式,ruanyf的微博说是master接受连接分配给worker,事实上不是这样的,master只是通过信号管理worker进程,worker之间通过accept_mutex来决定是否将监听套接字加入到loop中。所谓的one loop per process
3、libevent:这是在网上找的资料,libevent并不是线程安全,但不代表其不支持多线程。memcache的网络部分使用libevent,有一个经典的图描述了其多线程实现:
这里写图片描述
这种消息通知+同步层的机制,通过pipe和一个加锁的任务队列(CQ)实现。与muduo的eventfd效果类似。
一 线程的初始化
1线程对象
在进行事件驱动时,每个线程需建立自己的事件根基。由于libevent未提供线程之间通信的方式,我们采用管道来进行线程的通信。同时为方便主线程分配线程,我们还需保留各个线程的id号。因此我们采用如下结构来保留每个线程的有关信息。
typedef struct {
pthread_t thread_id; #线程ID
struct event_base *base; #事件根基
struct event notify_event;
int notify_receive_fd;
int notify_send_fd;
CQ new_conn_queue; #连接队列
} LIBEVENT_THREAD;
2 初始化线程
我们先介绍如何为每个线程创建自己的事件根基。正如前面介绍的我们需自己建立管道来进行线程通信,因此在线程根基初始化后,我们为为管道的读添加事件对象。注意:在libevent中,线程的初始化需在事件根基初始化之后(即event_init之后)。
static void setup_thread(LIBEVENT_THREAD *me) {
if (! me->base)
me->base = event_init();
event_set(&me->notify_event, me->notify_receive_fd,
EV_READ | EV_PERSIST, thread_libevent_process, me);
event_base_set(me->base, &me->notify_event);
event_add(&me->notify_event, 0) == -1)
cq_init(&me->new_conn_queue);
}
将主线程存储于结构体的第0个对象中,然后为每个线程创建管道并创建事件根基。
threads = malloc(sizeof(LIBEVENT_THREAD) * nthreads);
threads[0].base = main_base;
threads[0].thread_id = pthread_self();
for (i = 0; i < nthreads; i++) {
int fds[2];
pipe(fds)
threads[i].notify_receive_fd = fds[0];
threads[i].notify_send_fd = fds[1];
setup_thread(&threads[i]);
接下来从主线程中创建线程,线程创建成功后激活该线程的事件根基,进入事件循环。
for (i = 1; i < nthreads; i++) {
pthread_t thread;
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_create(&thread, &attr, worker_libevent, &threads[i])
}
/* 激活事件根基.*/
static void *worker_libevent(void *arg) {
LIBEVENT_THREAD *me = arg;
return (void*) event_base_loop(me->base, 0);
}
二 分发连接
现在我们有了多个线程和事件根基,那么我们应该如何将主线程接收到的连接分发给各个线程并将其激活呢?本例子中线程的选择采用最简单的轮询方式。
我们需要对accept_handle函数进行修改。在线程池模型中,我们使用子线程代替主线程来创建缓冲区事件对象,主线程只是对激活选中的线程。我们通过向管道写入一个空字节来激活该管道的读请求。
thread += 1;
item->sfd = sfd
cq_push(&threads[thread].new_conn_queue, item);
write(threads[thread].notify_send_fd, "", 1)
三 处理请求
管道的读请求就绪后,回调函数thread_libevent_process被调用,此时就进入到了线程中,后面所有的调度都将在该线程中进行。
首先从管道中读取1个字节,然后创建事件缓冲区对象来实际处理请求。
static void thread_libevent_process(int fd, short which, void *arg) {
char buf[1];
read(fd, buf, 1)
…
}
四结束语
到这里我们的程序就可以使用线程池和libevent事件驱动来协同工作了。在实际的服务中,我们通常要将服务作为守护进程来运行,那么在linux中,如何编写守护进程呢?后面在继续学习如何编写linux守护进程。
标签:方式 RKE str 决定 创建 部分 cep 负载均衡 存储
原文地址:https://www.cnblogs.com/cnhk19/p/14548945.html