标签:定时任务 cal 没有 标准 timer rect 记录 阻塞队列 def
好的,TinyWebServer我们讲了八个模块中的5个,还剩下数据库mysql模块,定时器timer模块,日记log模块。
(更新中~~~~~~)
mysql模块
项目中有简单的注册和登录功能,所以要使用到数据库。那么mysql模块就是数据库相关的模块,主要的其实就是数据库连接池。
首先数据库连接池是只有一个的,那么怎么保证从项目的每一个地方获取都是这个唯一的一个数据库连接池呢?欸,想到什么了?单例模式。在这里我们使用Cpp11下简洁的静态局部变量实现单例模式:
connection_pool* connection_pool::GetInstance() { static connection_pool connPool; return &connPool; }
原理是是C++11标准规定了当一个线程正在初始化一个变量的时候,其他线程必须得等到该初始化完成以后才能访问它。
OK我们上面解决了池子的问题,接下来我们考虑到,WebServer要有一定的并发度,所以我们要有多个数据库连接资源放在数据库连接池,当任务线程需要数据库连接的时候就向池子申请一个。好的,那么我们便有了一个问题:怎样保证数据库连接的线程安全?
我们先得有一个保存数据库连接的数据结构list,当然我们得先保证池子(或者说list的安全)的线程安全,所以有一个池子的互斥锁lock,然后我们保证list中连接资源的安全,所以再有一个信号量reserve,用于管理池子的空闲连接数。
有了这两个池子接下来的就是比较常规的做法了:在获取/归还数据库连接资源前先用互斥锁对池子加锁,然后用信号量保证list空闲连接资源数。
1 #include<stdio.h> 2 #include<mysql/mysql.h> 3 #include<string.h> 4 #include<list> 5 #include<pthread.h> 6 #include<iostream> 7 8 #include"sql_connection_pool.h" 9 10 using namespace std; 11 12 //构造函数 13 connection_pool::connection_pool() { 14 this->CurConn = 0; 15 this->FreeConn = 0; 16 this->MaxConn = 0; 17 } 18 19 //单例模式,静态 20 connection_pool* connection_pool::GetInstance() { 21 static connection_pool connPool; 22 return &connPool; 23 } 24 25 //真正的初始化函数 26 void connection_pool::init(string url, string User, string PassWord, string DBName, int Port, unsigned int MaxConn) { 27 this->url = url; 28 this->Port = Port; 29 this->User = User; 30 this->PassWord = PassWord; 31 this->DatabaseName = DBName; 32 33 //先互斥锁锁住池子,创造MaxConn数据库链接 34 lock.lock(); 35 for (int i = 0; i < MaxConn; i++) { 36 //新创建一个连接资源 37 MYSQL* con = NULL; 38 con = mysql_init(con); 39 40 if (con == NULL) { 41 cout << "mysqlinit Error:" << mysql_error(con)<<endl; 42 exit(1); 43 } 44 45 con = mysql_real_connect(con, url.c_str(), User.c_str(), PassWord.c_str(), DBName.c_str(), Port, NULL, 0); 46 if (con == NULL) { 47 cout << "mysql connect Error:" << mysql_error(con) << endl; 48 exit(1); 49 } 50 51 //把这个资源放入链表 52 connList.push_back(con); 53 ++FreeConn; 54 } 55 56 //初始化信号量和池子数量 57 reserve = sem(FreeConn); 58 this->MaxConn = FreeConn; 59 60 lock.unlock(); 61 } 62 63 //请求获取一个连接资源 64 MYSQL* connection_pool::GetConnection() { 65 MYSQL* con = NULL; 66 67 if (connList.size() == 0) return NULL; 68 69 //请求一个资源,互斥锁/信号量 准备 70 reserve.wait(); 71 lock.lock(); 72 73 con = connList.front(); //从链表头取得一个资源 74 connList.pop_front(); 75 76 --FreeConn; 77 ++CurConn; 78 79 lock.unlock(); 80 return con; 81 } 82 83 //获取空闲连接数 84 int connection_pool::GetFreeConn() { 85 return this->FreeConn; 86 } 87 88 //释放当前连接资源con 89 bool connection_pool::ReleaseConnection(MYSQL* con) { 90 if (con == NULL) return false; 91 lock.lock(); 92 93 connList.push_back(con); 94 ++FreeConn; 95 --CurConn; 96 97 reserve.post(); 98 lock.unlock(); 99 100 return true; 101 } 102 103 //析构函数 104 connection_pool::~connection_pool() { 105 DestroyPool(); 106 } 107 108 //销毁整个数据库连接池 109 void connection_pool::DestroyPool() { 110 lock.lock(); 111 if (connList.size() > 0) { 112 list<MYSQL*>::iterator it; 113 for (it = connList.begin(); it != connList.end(); it++) { 114 MYSQL* con = *it; 115 mysql_close(con); //获得每一个连接资源close掉 116 } 117 CurConn = 0; 118 FreeConn = 0; 119 connList.clear(); 120 } 121 lock.unlock(); 122 } 123 124 125 //连接池RAII 126 connectionRAII::connectionRAII(MYSQL** SQL, connection_pool* connPool) { 127 *SQL = connPool->GetConnection(); 128 conRAII = *SQL; 129 poolRAII = connPool; 130 } 131 connectionRAII::~connectionRAII() { 132 poolRAII->ReleaseConnection(conRAII); 133 }
timer模块
定时器模块的功能是定时检查长时间无反应的连接,如果有服务器这边就主动断开连接。那么我们怎样做到这一点呢?博主个人感觉有两个关键问题:
① 定时器事件应该以一种怎样的方式去触发
这个问题其实很有意思,通常我们以前学习到处理信号的方式是把信号发生之后的要处理的逻辑全部放在信号的回调函数中。在这时候我们也许忽略了一个事实:在Linux环境下当我们回调一个信号的回调函数时候这段时间系统会忽略至少这个同样的信号(这是当然的不然就有可能死循环等出错),那么我们为了不让这些被忽略的信号被忽略太久,我们得想尽办法尽量缩短这个回调函数的执行时间。那么怎样才能做到这样呢?
一个理所当然的思路是:把回调函数的逻辑搬到主函数执行。那么怎样做到这一点:统一事件源。原理很简单,这时我们的信号回调函数不要处理逻辑,而是在回调函数中通过管道给主函数发送信息,那么当主函数监听到读时间并且判断到是从管道读端来的,那就知道这个信号到了我主函数应该处理了。
② 定时器以及应该以怎么样的数据结构来保存
在游双的《高性能服务器编程》这本书里面写到三种定时器的存储结构:链表、时间轮、时间堆。这个TinyWebServer使用的是最好实现的链表定时器。
我们有一个定时器结点类util_timer,每个结点表示一个客户连接,它保存了双向链表的前后指针,客户数据client_data和回调函数。如果我们判断到这个结点长时间无反应,所以我们调用这个回调函数传入client_data,然后回调函数就会把这个客户断开,并且做一些善后工作。
我们还有链表类sort_timer_lst,这个链表是一个时间递增的结点链表,即从链表头到尾这个客户的最后一次反应时间是递增的。这个链表类当然有插入和删除结点函数。并且还有adjust_timer调整链表位置函数,作用是当一个客户有了反应,那么我们需要更新他的最后一次反应时间,那么为了维护链表的递增特性,我们需要这么一个调整位置的函数。此外,这个类还有一个检查函数(定时清扫),作用是我们上文提到统一了事件源,把信号回调函数逻辑搬到主函数执行,所以这个定时清扫检查逻辑就是在这个检查函数。主函数判断到信号来了,就执行这个函数进行检查链表中长时间无反应的结点进行清扫。
1 #ifndef LST_TIMER 2 #define LST_TIMER 3 4 #include<time.h> 5 #include<netinet/in.h> 6 7 class util_timer; 8 struct client_data 9 { 10 sockaddr_in address; 11 int sockfd; 12 util_timer* timer; //客户对应的定时器,和25行相互 13 }; 14 15 //链表结点,包含事件和客户数据 16 class util_timer 17 { 18 public: 19 util_timer() : prev(NULL),next(NULL) {} 20 21 public: 22 time_t expire; //记录时间 23 //!!!定时器的执行函数,到时间就调用这个 24 void (*cb_func)(client_data*); 25 client_data* user_data; //客户数据,和12行相互 26 util_timer* prev; //双向链表 27 util_timer* next; //双向链表 28 }; 29 30 31 //链表,按事件升序排序 32 class sort_timer_lst 33 { 34 public: 35 //链表的构造与析构函数 36 sort_timer_lst() : head(NULL), tail(NULL) {}; 37 ~sort_timer_lst() { 38 util_timer* tmp = head; 39 while (tmp) { 40 head = tmp->next; 41 delete tmp; 42 tmp = head; 43 } 44 } 45 //插入结点 46 void add_timer(util_timer* timer) { 47 if (!timer) return; 48 if (!head) { 49 head = tail = timer; 50 return; 51 } 52 //如果新的定时器超时时间小于当前头部结点 53 //直接将当前定时器结点作为头部结点 54 if (timer->expire < head->expire) { 55 timer->next = head; 56 head->prev = timer; 57 head = timer; 58 return; 59 } 60 61 //至少不是插入到头,调用函数继续插入 62 add_timer(timer, head); 63 } 64 //调整定时器,任务发生变化时,调整定时器在链表中的位置 65 void adjust_timer(util_timer* timer) { 66 if (!timer) return; 67 util_timer* tmp = timer->next; 68 69 //因为只会增加,所以如果在最后肯定无需调整 70 if (!tmp || (timer->expire < tmp->expire)) return; 71 72 //分两种情况:头/非头。思路都是先删除,再调用插入函数重新插入 73 if (timer == head) { 74 head = head->next; 75 head->prev = NULL; 76 timer->next = NULL; 77 add_timer(timer, head); 78 } 79 else { 80 timer->prev->next = timer->next; 81 timer->next->prev = timer->prev; 82 add_timer(timer, timer->next); 83 } 84 } 85 //删除定时器 86 void del_timer(util_timer* timer) { 87 if (!timer) return; 88 //即整个链表就剩下一个结点,直接删除 89 if (timer == head && timer == tail) { 90 delete timer; 91 head = NULL; 92 tail = NULL; 93 return; 94 } 95 //被删除的定时器为头结点 96 if (timer == head) { 97 head = head->next; 98 head->prev = NULL; 99 delete timer; 100 return; 101 } 102 //被删除的是尾结点 103 if (timer == tail) { 104 tail = tail->prev; 105 tail->next = NULL; 106 delete timer; 107 return; 108 } 109 //不是头尾,普通删除 110 timer->prev->next = timer->next; 111 timer->next->prev = timer->prev; 112 delete timer; 113 return; 114 } 115 //定时任务处理函数 116 void tick() { 117 if (!head) return; 118 time_t cur = time(NULL); //获取当前时间 119 util_timer* tmp = head; 120 while (tmp) 121 { 122 if (cur < tmp->expire) break; //就到这里了,后面的执行时间都还没到 123 tmp->cb_func(tmp->user_data); //满足超市条件,调用cb_func删除连接 124 //执行完之后,删除链表头并移动头 125 head = tmp->next; 126 if (head) 127 head->prev = NULL; 128 delete tmp; 129 tmp = head; 130 } 131 } 132 133 private: 134 //把timer插入到链表中,经过上面的检测到这里至少不是插入到头 135 void add_timer(util_timer* timer, util_timer* lst_head) { 136 util_timer* prev = lst_head; 137 util_timer* tmp = prev->next; 138 //遍历当前结点之后的链表,按照超时时间找到目标定时器对应的位置,常规双向链表插入操作 139 while (tmp) 140 { 141 //插入到prev后,tmp之前 142 if (timer->expire < tmp->expire) { 143 prev->next = timer; 144 timer->next = tmp; 145 tmp->prev = timer; 146 timer->prev = prev; 147 break; 148 } 149 prev = tmp; 150 tmp = tmp->next; 151 } 152 //上面没有插入成功,证明要插入到最后面 153 if (!tmp) { 154 prev->next = timer; 155 timer->prev = prev; 156 timer->next = NULL; 157 tail = timer; 158 } 159 } 160 161 private: 162 util_timer* head; 163 util_timer* tail; 164 }; 165 166 167 #endif
log模块
log是日志模块,一个合格的服务器当然少不了日志来记录错误异常等等信息。我们想设计一个日志模块,他能顺利写日志但是又不要占用主线程时间去写,所以我们设计异步写日志的模块。
怎么是异步写日志呢?我们考虑设计一个日志队列,这个队列主要是用一个循环数组模拟队列来存储日志,这里要注意这个队列只是存储我们真正的目的是要写到文件里,所以只是存储并未达到目的。但是考虑到文件IO操作是比较慢的,所以我们采用的异步IO就是先写到内存里,然后日志线程自己有空的时候写到文件里。
所以这一模块的关键就是日志队列和写日志的线程。
先来思考日志队列,他的需求就是时不时会有一段日志塞到这个队列中,又时不时会有这其中的一段日志被取出来,那么当然是队列不满才能往里塞,队列不空才能有东西取出来。稍加思考这是什么?欸,就是经典的生产者消费者模型。所以也就没什么好说的了,常规处理:要一个互斥锁和信号量,操作前都加锁就行。
1 #ifndef BLOCK_QUEUE_H 2 #define BLOCK_QUEUE_H 3 4 #include <iostream> 5 #include <stdlib.h> 6 #include <pthread.h> 7 #include <sys/time.h> 8 #include "../lock/locker.h" 9 using namespace std; 10 11 template <class T> 12 class block_queue 13 { 14 public: 15 block_queue(int max_size = 1000) 16 { 17 if (max_size <= 0) 18 { 19 exit(-1); 20 } 21 22 m_max_size = max_size; 23 m_array = new T[max_size]; 24 m_size = 0; 25 m_front = -1; 26 m_back = -1; 27 } 28 29 void clear() 30 { 31 m_mutex.lock(); 32 m_size = 0; 33 m_front = -1; 34 m_back = -1; 35 m_mutex.unlock(); 36 } 37 38 ~block_queue() 39 { 40 m_mutex.lock(); 41 if (m_array != NULL) 42 delete[] m_array; 43 44 m_mutex.unlock(); 45 } 46 //判断队列是否满了 47 bool full() 48 { 49 m_mutex.lock(); 50 if (m_size >= m_max_size) 51 { 52 53 m_mutex.unlock(); 54 return true; 55 } 56 m_mutex.unlock(); 57 return false; 58 } 59 //判断队列是否为空 60 bool empty() 61 { 62 m_mutex.lock(); 63 if (0 == m_size) 64 { 65 m_mutex.unlock(); 66 return true; 67 } 68 m_mutex.unlock(); 69 return false; 70 } 71 //返回队首元素 72 bool front(T& value) 73 { 74 m_mutex.lock(); 75 if (0 == m_size) 76 { 77 m_mutex.unlock(); 78 return false; 79 } 80 value = m_array[m_front]; 81 m_mutex.unlock(); 82 return true; 83 } 84 //返回队尾元素 85 bool back(T& value) 86 { 87 m_mutex.lock(); 88 if (0 == m_size) 89 { 90 m_mutex.unlock(); 91 return false; 92 } 93 value = m_array[m_back]; 94 m_mutex.unlock(); 95 return true; 96 } 97 98 int size() 99 { 100 int tmp = 0; 101 102 m_mutex.lock(); 103 tmp = m_size; 104 105 m_mutex.unlock(); 106 return tmp; 107 } 108 109 int max_size() 110 { 111 int tmp = 0; 112 113 m_mutex.lock(); 114 tmp = m_max_size; 115 116 m_mutex.unlock(); 117 return tmp; 118 } 119 //往队列添加元素,需要将所有使用队列的线程先唤醒 120 //当有元素push进队列,相当于生产者生产了一个元素 121 //若当前没有线程等待条件变量,则唤醒无意义 122 bool push(const T& item) 123 { 124 125 m_mutex.lock(); 126 if (m_size >= m_max_size) 127 { 128 129 m_cond.broadcast(); 130 m_mutex.unlock(); 131 return false; 132 } 133 134 m_back = (m_back + 1) % m_max_size; 135 m_array[m_back] = item; 136 137 m_size++; 138 139 m_cond.broadcast(); 140 m_mutex.unlock(); 141 return true; 142 } 143 //pop时,如果当前队列没有元素,将会等待条件变量 144 bool pop(T& item) 145 { 146 147 m_mutex.lock(); 148 while (m_size <= 0) 149 { 150 151 if (!m_cond.wait(m_mutex.get())) 152 { 153 m_mutex.unlock(); 154 return false; 155 } 156 } 157 158 m_front = (m_front + 1) % m_max_size; 159 item = m_array[m_front]; 160 m_size--; 161 m_mutex.unlock(); 162 return true; 163 } 164 165 //增加了超时处理 166 bool pop(T& item, int ms_timeout) 167 { 168 struct timespec t = { 0, 0 }; 169 struct timeval now = { 0, 0 }; 170 gettimeofday(&now, NULL); 171 m_mutex.lock(); 172 if (m_size <= 0) 173 { 174 t.tv_sec = now.tv_sec + ms_timeout / 1000; 175 t.tv_nsec = (ms_timeout % 1000) * 1000; 176 if (!m_cond.timewait(m_mutex.get(), t)) 177 { 178 m_mutex.unlock(); 179 return false; 180 } 181 } 182 183 if (m_size <= 0) 184 { 185 m_mutex.unlock(); 186 return false; 187 } 188 189 m_front = (m_front + 1) % m_max_size; 190 item = m_array[m_front]; 191 m_size--; 192 m_mutex.unlock(); 193 return true; 194 } 195 196 private: 197 locker m_mutex; 198 cond m_cond; 199 200 T* m_array; 201 int m_size; 202 int m_max_size; 203 int m_front; 204 int m_back; 205 }; 206 207 #endif
那么剩下的就是写日志线程,这一部分也比较简单就是新建一个线程,这个线程不断while当日志队列有日志就从里面取出来写到文件去,这个过程记得加锁就行。
1 #include <string.h> 2 #include <time.h> 3 #include <sys/time.h> 4 #include <stdarg.h> 5 #include "log.h" 6 #include <pthread.h> 7 using namespace std; 8 9 Log::Log() 10 { 11 m_count = 0; 12 m_is_async = false; 13 } 14 15 Log::~Log() 16 { 17 if (m_fp != NULL) 18 { 19 fclose(m_fp); 20 } 21 } 22 //异步需要设置阻塞队列的长度,同步不需要设置 23 bool Log::init(const char *file_name, int log_buf_size, int split_lines, int max_queue_size) 24 { 25 //如果设置了max_queue_size,则设置为异步 26 if (max_queue_size >= 1) 27 { 28 m_is_async = true; 29 m_log_queue = new block_queue<string>(max_queue_size); 30 pthread_t tid; 31 //flush_log_thread为回调函数,这里表示创建线程异步写日志 32 pthread_create(&tid, NULL, flush_log_thread, NULL); 33 } 34 35 m_log_buf_size = log_buf_size; 36 m_buf = new char[m_log_buf_size]; 37 memset(m_buf, ‘\0‘, m_log_buf_size); 38 m_split_lines = split_lines; 39 40 time_t t = time(NULL); 41 struct tm *sys_tm = localtime(&t); 42 struct tm my_tm = *sys_tm; 43 44 45 const char *p = strrchr(file_name, ‘/‘); 46 char log_full_name[256] = {0}; 47 48 if (p == NULL) 49 { 50 snprintf(log_full_name, 255, "%d_%02d_%02d_%s", my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday, file_name); 51 } 52 else 53 { 54 strcpy(log_name, p + 1); 55 strncpy(dir_name, file_name, p - file_name + 1); 56 snprintf(log_full_name, 255, "%s%d_%02d_%02d_%s", dir_name, my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday, log_name); 57 } 58 59 m_today = my_tm.tm_mday; 60 61 m_fp = fopen(log_full_name, "a"); 62 if (m_fp == NULL) 63 { 64 return false; 65 } 66 67 return true; 68 } 69 70 void Log::write_log(int level, const char *format, ...) 71 { 72 struct timeval now = {0, 0}; 73 gettimeofday(&now, NULL); 74 time_t t = now.tv_sec; 75 struct tm *sys_tm = localtime(&t); 76 struct tm my_tm = *sys_tm; 77 char s[16] = {0}; 78 switch (level) 79 { 80 case 0: 81 strcpy(s, "[debug]:"); 82 break; 83 case 1: 84 strcpy(s, "[info]:"); 85 break; 86 case 2: 87 strcpy(s, "[warn]:"); 88 break; 89 case 3: 90 strcpy(s, "[erro]:"); 91 break; 92 default: 93 strcpy(s, "[info]:"); 94 break; 95 } 96 //写入一个log,对m_count++, m_split_lines最大行数 97 m_mutex.lock(); 98 m_count++; 99 100 if (m_today != my_tm.tm_mday || m_count % m_split_lines == 0) //everyday log 101 { 102 103 char new_log[256] = {0}; 104 fflush(m_fp); 105 fclose(m_fp); 106 char tail[16] = {0}; 107 108 snprintf(tail, 16, "%d_%02d_%02d_", my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday); 109 110 if (m_today != my_tm.tm_mday) 111 { 112 snprintf(new_log, 255, "%s%s%s", dir_name, tail, log_name); 113 m_today = my_tm.tm_mday; 114 m_count = 0; 115 } 116 else 117 { 118 snprintf(new_log, 255, "%s%s%s.%lld", dir_name, tail, log_name, m_count / m_split_lines); 119 } 120 m_fp = fopen(new_log, "a"); 121 } 122 123 m_mutex.unlock(); 124 125 va_list valst; 126 va_start(valst, format); 127 128 string log_str; 129 m_mutex.lock(); 130 131 //写入的具体时间内容格式 132 int n = snprintf(m_buf, 48, "%d-%02d-%02d %02d:%02d:%02d.%06ld %s ", 133 my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday, 134 my_tm.tm_hour, my_tm.tm_min, my_tm.tm_sec, now.tv_usec, s); 135 136 int m = vsnprintf(m_buf + n, m_log_buf_size - 1, format, valst); 137 m_buf[n + m] = ‘\n‘; 138 m_buf[n + m + 1] = ‘\0‘; 139 log_str = m_buf; 140 141 m_mutex.unlock(); 142 143 if (m_is_async && !m_log_queue->full()) 144 { 145 m_log_queue->push(log_str); 146 } 147 else 148 { 149 m_mutex.lock(); 150 fputs(log_str.c_str(), m_fp); 151 m_mutex.unlock(); 152 } 153 154 va_end(valst); 155 } 156 157 void Log::flush(void) 158 { 159 m_mutex.lock(); 160 //强制刷新写入流缓冲区 161 fflush(m_fp); 162 m_mutex.unlock(); 163 }
日志模块本身不难理解,其实难理解的是写日志函数中的各种宏以及文件/字符串函数的灵活应用。
参考资料:
TinyWebServer项目地址:https://github.com/qinguoyi/TinyWebServer
单例模:https://light-city.club/sc/design_pattern/singleton/singleton/
TinyWebServer:一个Linux下C++轻量级Web服务器(中)
标签:定时任务 cal 没有 标准 timer rect 记录 阻塞队列 def
原文地址:https://www.cnblogs.com/clno1/p/13099665.html