码迷,mamicode.com
首页 > 其他好文 > 详细

thread

时间:2015-07-27 12:34:09      阅读:98      评论:0      收藏:0      [点我收藏+]

标签:

/*
* File: Runable.h
* Author: raozf
*
* Created on 2013年3月19日, 上午11:08
*/

#ifndef RUNABLE_H
#define RUNABLE_H

class Runable
{
protected:
bool m_bExit;

public:

Runable() : m_bExit(false) { }

virtual void Run() = 0;

void Stop()
{
m_bExit = true;
}
};

#endif /* RUNABLE_H */

 

 

/*
* File: TaskQueue.h
* Author: chu
*
* Created on 2013年3月9日, 下午4:53
*/

#ifndef TASKQUEUE_H
#define TASKQUEUE_H

#include "common.h"

//class TaskQueue
//{
//private:
// int m_maxfds;
// std::queue<int> m_fds;
// pthread_mutex_t m_mutex;
// pthread_cond_t m_condition;
//public:
//
// TaskQueue(int nMaxfds = -1) :
// m_maxfds(nMaxfds),
// m_mutex(PTHREAD_MUTEX_INITIALIZER),
// m_condition(PTHREAD_COND_INITIALIZER) { }
//
// void Push(int fd)
// {
// pthread_mutex_lock(&m_mutex);
// if (m_maxfds > 0 && m_fds.size() >= m_maxfds)//connection is full. Reject it!
// {
// LOG(LOG_LEVEL_WARN, "************ TaskQueue full! ************ m_maxfds = " << m_maxfds << ", size = " << m_fds.size() << ", fd = " << fd);
// close(fd); //*********************
//
// pthread_mutex_unlock(&m_mutex);
// return;
// }
//
// m_fds.push(fd);
// pthread_cond_signal(&m_condition);
// pthread_mutex_unlock(&m_mutex);
// }
//
// //will block if no elements
// int Pop()
// {
// int fd = -1;
// pthread_mutex_lock(&m_mutex);
//
// //no elements, need to wait.
// if (m_fds.size() <= 0)
// {
// //shall be called with mutex locked by the calling thread or undefined behavior results.
// //thread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)函数传入的参数mutex用于保护条件,
// //因为我们在调用pthread_cond_wait时,如果条件不成立我们就进入阻塞,但是进入阻塞这个期间,如果条件变量改变了的话,
// //那我们就漏掉了这个条件。因为这个线程还没有放到等待队列上,所以调用pthread_cond_wait前要先锁互斥量,
// //即调用pthread_mutex_lock(),pthread_cond_wait在把线程放进阻塞队列后,自动对mutex进行解锁,
// //使得其它线程可以获得加锁的权利。这样其它线程才能对临界资源进行访问并在适当的时候唤醒这个阻塞的进程。
// //当pthread_cond_wait返回的时候又自动给mutex加锁,所以最后我们要手动解锁。
// pthread_cond_wait(&m_condition, &m_mutex);
// pthread_mutex_unlock(&m_mutex);
//
// //got singal, try to pop again.
// return Pop();
// }
//
// fd = m_fds.front();
// m_fds.pop();
// pthread_mutex_unlock(&m_mutex);
//
// return fd;
// }
//};

class TaskQueue
{
private:
int m_maxfds;
std::queue<int> m_fds;
boost::mutex m_mutex;
boost::condition m_condition;

//pthread_mutex_t m_mutex;
//pthread_cond_t m_condition;
public:
TaskQueue(int nMaxfds = -1) :
m_maxfds(nMaxfds){ }

void Push(int fd)
{
LOG(LOG_LEVEL_DEBUG, "Push. fd:"<<fd);
LOCK _l(m_mutex);
if (m_maxfds > 0 && m_fds.size() >= m_maxfds)//connection is full. Reject it!
{
LOG(LOG_LEVEL_WARN, "************ TaskQueue full! ************ m_maxfds = " << m_maxfds << ", size = " << m_fds.size() << ", fd = " << fd);
close(fd); //*********************

return;
}

m_fds.push(fd);
m_condition.notify_one();
LOG(LOG_LEVEL_DEBUG, "Push. finished");
}

//will block if no elements
int Pop()
{
LOG(LOG_LEVEL_DEBUG, "Pop()");
int fd = -1;
LOCK _l(m_mutex);

//no elements, need to wait.
if (m_fds.size() <= 0)
{
//LOG(LOG_LEVEL_DEBUG, "Pop. start wait....");
m_condition.wait(_l);
_l.unlock();
//LOG(LOG_LEVEL_DEBUG, "Pop. wait comes.");
return Pop();
}
else
{
fd = m_fds.front();
m_fds.pop();
//LOG(LOG_LEVEL_DEBUG, "PopXXXXXXXXXXXXXX2");
}

//LOG(LOG_LEVEL_DEBUG, "Pop(). fd:"<<fd);
return fd;
}

//timed block if no element
int Pop_Timed()
{
int fd = -1;
LOCK _l(m_mutex);

//no elements, need to wait.
if (m_fds.size() <= 0)
{
m_condition.timed_wait(_l, boost::get_system_time() + boost::posix_time::seconds(3));
_l.unlock();
}
else
{
fd = m_fds.front();
m_fds.pop();
}
return fd;
}
};

#endif /* TASKQUEUE_H */

 

 

 

 

 

 

 

 

 

 

 

 

/*
* File: SocketManager.h
* Author: raozf
*
* Created on 2013年3月11日, 下午5:32
*/

#ifndef SOCKETMANAGER_H
#define SOCKETMANAGER_H

#include "common.h"
#include "Runable.h"

class SocketManager : public Runable
{
private:
boost::unordered_map<int, time_t> m_sockets;
pthread_mutex_t m_mutex;
typedef boost::unordered_map<int, time_t>::iterator iterator;
public:

SocketManager() : Runable(),
m_mutex(PTHREAD_MUTEX_INITIALIZER) { }

void UpdateActiveTime(int sock_fd)
{
time_t _now = time(NULL);
pthread_mutex_lock(&m_mutex);
iterator it = m_sockets.find(sock_fd);
if (it == m_sockets.end())
{
m_sockets.insert(std::pair<int, time_t>(sock_fd, _now));
LOG(LOG_LEVEL_DEBUG, "[fd:" << sock_fd << "] inseret, time:" << _now);
}
else
{
it->second = _now;
LOG(LOG_LEVEL_DEBUG, "[fd:" << sock_fd << "] update , time:" << _now);
}

pthread_mutex_unlock(&m_mutex);
}

void Run()
{
while (m_bExit == false)
{
time_t _now = time(NULL);
pthread_mutex_lock(&m_mutex);

BOOST_FOREACH(iterator::value_type& it, m_sockets)
{
if (_now - it.second >= SOCK_INACTIVE_TIMEOUT)
{
close(it.first);
//LOG_NODE("[fd:"<< it.first<<"] inactive timeout, force closed. update time: "<<it.second);

//m_sockets.erase(it.first);
//unnecessary to remove it.
//the socket handle will be used for other sockets.
}
}
pthread_mutex_unlock(&m_mutex);

sleep(2);
}
}
};


#endif /* SOCKETMANAGER_H */

 

 

 

 

 

 

 

 

 

 

 

 

 

 

/*
* File: EpollThread.h
* Author: raozf
*
* Created on 2013年3月19日, 上午11:00
*/

#ifndef LISTENER_H
#define LISTENER_H

#include "common.h"
#include "Runable.h"

class Listener : public Runable
{
private:
int m_fd;
TaskQueue* m_TaskQueue;
SocketManager* m_SocketManager;
private:

Listener() : Runable(), m_fd(-1), m_TaskQueue(NULL), m_SocketManager(NULL) { }

bool SetNonBlocking(int fd)
{
int opts;
opts = fcntl(fd, F_GETFL);
if (opts >= 0)
{
opts = opts | O_NONBLOCK;
if (fcntl(fd, F_SETFL, opts) >= 0)
{
return true;
}
else
{
LOG(LOG_LEVEL_ERROR, "fcntl(fd, F_SETFL, opts) failed. opts = " << opts << ", fd = " << fd);
}
}
else
{
LOG(LOG_LEVEL_ERROR, "fcntl(fd, F_GETFL) returned:" << opts << ", fd = " << fd);
}

return false;
}

public:

static Listener* GetInstance()
{
static Listener Instance;
return &Instance;
}

bool Init(int port, TaskQueue* task_queue, SocketManager* socket_manager)
{
m_TaskQueue = task_queue;
m_SocketManager = socket_manager;
if ((task_queue == NULL) || (m_SocketManager == NULL))
{
LOG(LOG_LEVEL_FATAL, "Invalid paramers. m_TaskQueue:" << m_TaskQueue << ", m_SocketManager:" << m_SocketManager);
return false;
}

m_fd = socket(AF_INET, SOCK_STREAM, 0);
if (SetNonBlocking(m_fd) == true)
{
struct sockaddr_in server_addr;
bzero(&server_addr, sizeof (server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
server_addr.sin_family = INADDR_ANY;
if (bind(m_fd, (sockaddr*) & server_addr, sizeof (server_addr)) == 0)
{
if (listen(m_fd, LISTENQ) == 0)
{
return true;
}
else
{
close(m_fd);
LOG(LOG_LEVEL_FATAL, "listen() failed. " << strerror(errno));
}
}
else
{
close(m_fd);
LOG(LOG_LEVEL_FATAL, "bind() failed. " << strerror(errno));
}
}

return false;
}

virtual void Run()
{
LOG(LOG_LEVEL_NODE, "Listener::Run().");

/* The size is not the maximum size of the backing store but just a hint to the kernel about how to dimension internal structures.
* (Nowadays, size is ignored. Since Linux 2.6.8, the size argument is unused:The kernel dynamically sizes the required data structures
* without needing this initial hint.)*/
int ep_fd = epoll_create(256);
struct epoll_event ev, events[MAX_EVENTS];
ev.data.fd = m_fd;
ev.events = EPOLLIN | EPOLLET;
epoll_ctl(ep_fd, EPOLL_CTL_ADD, m_fd, &ev);

int nfds = 0;
int conn_fd = -1;
int sock_fd = -1;
struct sockaddr_in client_addr;
socklen_t clilen = sizeof (client_addr);
memset(&client_addr, 0, sizeof (struct sockaddr_in));

while (m_bExit == false)
{
nfds = epoll_wait(ep_fd, events, MAX_EVENTS, 3000);
for (int i = 0; i < nfds; i++)
{
if (events[i].data.fd == m_fd)
{
conn_fd = accept(m_fd, (sockaddr*) & client_addr, &clilen);
if (conn_fd < 0)
{
LOG(LOG_LEVEL_ERROR, "accept() returned fd:" << conn_fd << ". error msg:" << strerror(errno));
continue;
}
if (SetNonBlocking(conn_fd) == false)
{
LOG(LOG_LEVEL_WARN, "[fd:" << conn_fd << "] SetNonBlocking() failed. close socket.");
close(conn_fd);
continue;
}
m_SocketManager->UpdateActiveTime(conn_fd);

//The string is returned in a statically allocated buffer, which subsequent calls will overwrite.
//which means:we don‘t need to free the string returnd by inet_ntoa(), but it‘s non-re-entry,be careful in multithreads.
char *str = inet_ntoa(client_addr.sin_addr);
LOG(LOG_LEVEL_NODE, "[fd:" << conn_fd << "] connect from:" << str);

ev.data.fd = conn_fd;
ev.events = EPOLLIN | EPOLLET;
epoll_ctl(ep_fd, EPOLL_CTL_ADD, conn_fd, &ev); //add the new connection to epoll
}
else if (events[i].events & EPOLLIN)
{
if ((sock_fd = events[i].data.fd) < 0)
{
LOG(LOG_LEVEL_WARN, "[fd:" << sock_fd << "] Invalid socket fd.");
continue;
}

LOG(LOG_LEVEL_DEBUG, "[fd:" << sock_fd << "] EPOLLIN. events:" << events[i].events);
m_TaskQueue->Push(sock_fd);
m_SocketManager->UpdateActiveTime(sock_fd);
}
}
}

LOG(LOG_LEVEL_NODE, "Listener::Run() exited.");
}
};
#endif /* LISTENER_H */

 

 

 

 

 

 

 

 

 

 

 

/*
* File: debug.h
* Author: raozf
*
* Created on 2013年3月27日, 下午6:12
*/

#ifndef DEBUG_H
#define DEBUG_H

#include "common.h"

bool pre_exit (pid_t pid, int status)
{
bool _bNeedRestart = false;
if (WIFEXITED(status))//pid terminated by exit
{
LOG(LOG_LEVEL_NODE, "[Demaon] Child process normal termination, exit status = " <<WEXITSTATUS(status));
}
else if (WIFSIGNALED(status))//pid terminated by signal
{
LOG(LOG_LEVEL_NODE, "[Demaon] Child process abnormal termination, killed by signal number = " << WTERMSIG(status)<<
#ifdef WCOREDUMP
(WCOREDUMP(status) ? "(core file generated)" : "(No core file generated)"));
#else
"WCOREDUMP not defined.");
#endif

//need restart?
//if(WTERMSIG(status) != SIGUSR1)//not killed by me,then restart it.
{
_bNeedRestart = true;
}
}
else if (WIFSTOPPED(status))
{
LOG(LOG_LEVEL_NODE, "[Demaon] Child process stopped, signal number = " <<WSTOPSIG(status));
}

return _bNeedRestart;
}

void dump_debug(int signo)
{
void *array[30] = {0};
size_t size;
char **strings = NULL;
size_t i;

std::string strFile = "signal.log";
FILE* pf = fopen(strFile.c_str(), "w");//每次清空,防止程序不断重启导致signal.log文件过大

size = backtrace (array, 30);
strings = backtrace_symbols (array, size);
if(strings != NULL)
{
fprintf (stderr,"[%s] get SIGSEGV[%d] signel.\n", NOW_STR, signo);
fprintf (stderr,"[%s] Obtained %zd stack frames.\n", NOW_STR, size);
if(pf != NULL)
{
fprintf (pf, "[%s] get SIGSEGV[%d] signel.\n", NOW_STR, signo);
fprintf (pf, "[%s] Obtained %zd stack frames.\n", NOW_STR, size);
fflush(pf);
}

for (i = 0; i < size; i++)
{
fprintf (stderr,"[%s] %s\n", NOW_STR, strings[i]);
fflush(stderr);

if(pf != NULL)
{
fprintf (pf,"[%s] %s\n", NOW_STR, strings[i]);
fflush(pf);
}
}
}

if(pf != NULL)
{
fclose(pf);
pf = NULL;
}
free (strings);

exit(0);
}

void Debug_Printf_FrameInfos()
{
signal(SIGSEGV, dump_debug);
signal(SIGABRT, dump_debug);
}


#endif /* DEBUG_H */

 

 

 

 

 

 

 

 

 

 

 

 

 

#include "common.h"
#include "Serve.h"
#include "test.h"
#include "debug.h"

void CheckConfigure()
{
try
{
boost::property_tree::ptree _pt;
boost::property_tree::ini_parser::read_ini(LOG_CONFIG, _pt);

try
{
std::string _strLevel = _pt.get<std::string>("loglevel");
LogLevel _level = LOG_LEVEL_NONE;
for(int i=0; i<sizeof(g_strLogLevel)/sizeof(std::string); i++)
{
if(strcasecmp(_strLevel.c_str(), g_strLogLevel[i]) == 0)
{
_level = (LogLevel) i;
break;
}
}

if(_level != g_logLevel)
{
g_logLevel = _level;
LOG(LOG_LEVEL_NODE, "Log level set to:"<<g_strLogLevel[_level]);
}
}
catch(boost::property_tree::ptree_error _error)
{
LOG(LOG_LEVEL_DEBUG, _error.what());
}

try
{
std::string _strLogRaw = _pt.get<std::string>("lograw");
if(strcasecmp(_strLogRaw.c_str(), "yes") == 0)
{
if(g_logRaw == false)
{
LOG(LOG_LEVEL_NODE, "LogRaw set to: Yes");
g_logRaw = true;
}
}
else if(strcasecmp(_strLogRaw.c_str(), "no") == 0)
{
if(g_logRaw == true)
{
LOG(LOG_LEVEL_NODE, "LogRaw set to: No");
g_logRaw = false;
}
}
}
catch(boost::property_tree::ptree_error _error)
{
LOG(LOG_LEVEL_DEBUG, _error.what());
}
}
catch(boost::property_tree::ini_parser_error _error)
{
LOG(LOG_LEVEL_DEBUG, _error.what());
}
}

bool CheckExit()
{
try
{
boost::property_tree::ptree _pt;
boost::property_tree::ini_parser::read_ini(GRACE_EXIT, _pt);

try
{
std::string _strExit = _pt.get<std::string>("exit");
if(strcasecmp(_strExit.c_str(), "now") == 0)
{
LOG(LOG_LEVEL_NODE, "********************* Detect Grace Exit! ****************************");
LOG(LOG_LEVEL_NODE, "");
LOG(LOG_LEVEL_NODE, "program will exit now.");
LOG(LOG_LEVEL_NODE, "");
LOG(LOG_LEVEL_NODE, "*********************************************************************");
return true;
}
}
catch(boost::property_tree::ptree_error _error)
{
LOG(LOG_LEVEL_DEBUG, _error.what());
}
}
catch(boost::property_tree::ini_parser_error _error)
{
LOG(LOG_LEVEL_DEBUG, _error.what());
}

return false;
}

void Verbose()
{
LOG(LOG_LEVEL_NODE, "*********************************************************************");
LOG(LOG_LEVEL_NODE, " Aimu Server.");
LOG(LOG_LEVEL_NODE, "Version: 0.99");
LOG(LOG_LEVEL_NODE, "Build: "<<__DATE__<<", "<<__TIME__);
LOG(LOG_LEVEL_NODE, "Gcc: "<<__VERSION__);
LOG(LOG_LEVEL_NODE, "Author: Zhengfeng Rao, gisrzf@gmail.com");
LOG(LOG_LEVEL_NODE, "");
LOG(LOG_LEVEL_NODE, "Copyright ©2013, All rights reserved.");
LOG(LOG_LEVEL_NODE, "*********************************************************************");
}

void DoParams(int& server_port, int argc, char** argv)
{
if (argc >= 2)
{
int port = atoi(argv[1]);
if (port > 0 && port < 65535)
{
server_port = port;
}
else
{
LOG(LOG_LEVEL_ERROR, "Invalid port:" << argv[1]);
}
}
LOG(LOG_LEVEL_INFO, "Use port: " << server_port);
}

int mainFun(int argc, char** argv)
{
Debug_Printf_FrameInfos();
LOG(LOG_LEVEL_NODE, "Server starting...");
Verbose();

int server_port = SERVER_PORT;
DoParams(server_port, argc, argv);

Server* _pServer = Server::NewServer(server_port);
if(_pServer != NULL)
{
_pServer->Run();
}

while (1)
{
sleep(3);
CheckConfigure();
if (CheckExit() == true)//need to exit.
{
Server::DestoryServer(_pServer);

LOG(LOG_LEVEL_NODE, "All threads exited!");
exit(EXIT_SUCCESS);
}
}
}


static int g_nPID = 0;
int main(int argc, char** argv)
{
LOG(LOG_LEVEL_NODE, "[Demaon] App start!");
if((g_nPID = fork()) == 0)//executed by child process
{
LOG(LOG_LEVEL_NODE, "Child process forked!");
mainFun(argc, argv);
}
else//executed by parent process
{
int nStatus = 0;
while(1)
{
int n = waitpid(g_nPID, &nStatus, 0);//will block
LOG(LOG_LEVEL_NODE, "[Demaon] Child process exited!");
if(n <= 0)
{
//continue;
}
else
{
if(pre_exit(g_nPID, nStatus) == true)
{
LOG(LOG_LEVEL_NODE, "[Demaon] Need restart.");
sleep(3);
if ((g_nPID = fork()) == 0)
{
LOG(LOG_LEVEL_NODE, "[Demaon] Program restarting...");
mainFun(argc, argv);
}
}
else
{
exit(EXIT_SUCCESS);
}
}
}
}
}

 

 

 

 

 

 

 

 

 

 

 

 

/*
* File: test.h
* Author: chu
*
* Created on 2013年3月22日, 下午9:49
*/

#ifndef TEST_H
#define TEST_H

#include "common.h"
#include "AesWarpper.h"

void HexDump(const char* str, int len)
{
printf("Len:%d\n", len);
for (int i = 0; i < len; i++)
{
printf("%x%x ", (str[i] >> 4)& 0xf, str[i]&0xf);
}
printf("\n");
}

int test_aes()
{
AesWarpper _warpper;
std::string _plain = "{\
\"msg\":0,\
\"uid\":\"13286790089_MX13435455\",\
\"data\":\
[\
{\"num\":\"18709671222\", \"name\":\"oooo\",\"status\":2},\
{\"num\":\"18709671233\", \"name\":\"ooXX\",\"status\":2},\
{\"num\":\"18709671244\", \"name\":\"oXXX\",\"status\":2}\
]\
}";

int _len = _plain.length();
LOG(LOG_LEVEL_DEBUG, "Original:");
HexDump(_plain.c_str(), _len);

unsigned char _en[SOCK_RECV_BUF] = {0};
_warpper.Encrypt(_plain.c_str(), _en, _len);
LOG(LOG_LEVEL_DEBUG, "Encrypted:");
HexDump((const char*) _en, _len);

unsigned char _de[SOCK_RECV_BUF] = {0};
if (_warpper.Decrypt((const unsigned char*)_en, (unsigned char*)_de, _len) == true)
{
LOG(LOG_LEVEL_DEBUG, "Decrypted:");
HexDump((const char*) _de, _len);
}
else
{
LOG(LOG_LEVEL_ERROR, "Decrypt Failed!");
}

return 0;
}

#endif /* TEST_H */

 

 

 

 

 

 

 

 

 

 

/*
* File: WorkThread.h
* Author: chu
*
* Created on 2013年3月9日, 下午4:56
*/

#ifndef WORKTHREAD_H
#define WORKTHREAD_H

#include "common.h"
#include "TaskQueue.h"
#include "Contacts.h"
#include "Message.h"
#include "Runable.h"
#include "AesWarpper.h"

class Worker : public Runable
{
private:
TaskQueue* m_TaskQueue;
char m_buf[SOCK_RECV_BUF];

AesWarpper m_decryptor;
char m_decrypted_buf[AES_MAX_BUFFER_LEN];
ContactsManager* m_ContactsManager;
public:

Worker(TaskQueue* task_queue) : Runable(), m_TaskQueue(task_queue)
{
memset(m_buf, 0, SOCK_RECV_BUF);
memset(m_decrypted_buf, 0, AES_MAX_BUFFER_LEN);
m_ContactsManager = ContactsManager::GetInstance();
}

void Run()
{
LOG(LOG_LEVEL_NODE, "Worker::Run().");

int _fd = -1;
while (!m_bExit)
{
//_fd = m_TaskQueue->Pop(); //will block if no task
if((_fd = m_TaskQueue->Pop_Timed()) == -1)
{
continue;
}
LOG(LOG_LEVEL_DEBUG, "[fd: " << _fd << "] start to read.");

//recv data.
int _ret = -1;
int _read_bytes = 0;
int _decrypted_len = 0;
std::string _str;
size_t _sent;

char* _p = m_buf;
memset(m_buf, 0, SOCK_RECV_BUF);
memset(m_decrypted_buf, 0, AES_MAX_BUFFER_LEN);

while (true)
{
_ret = recv(_fd, _p, SOCK_RECV_BUF - _read_bytes, 0);
if (_ret < 0)
{
if (errno == EAGAIN)//no data to read on socket when nonblock. read finished.
{
break;
}
else
{
LOG(LOG_LEVEL_ERROR, "[fd:" << _fd << "] recv() error, returned :" << _ret << ". error: " << strerror(errno));
goto close_socket;
}
}
else if (_ret == 0)//peer point socket is closed
{
LOG(LOG_LEVEL_NODE, "[fd:" << _fd << "] peer closed.");
goto close_socket;
}
else
{
_read_bytes += _ret;
_p = m_buf + _ret;
if (_read_bytes >= SOCK_RECV_BUF)
{
LOG(LOG_LEVEL_WARN, "[fd:" << _fd << "] read " << _read_bytes << " bytes, reach max buffer length. discard.");
goto close_socket;
}
}
}
if (_read_bytes > 0)//have read something
{
LOG(LOG_LEVEL_DEBUG, "[fd:" << _fd << "] read " << _read_bytes << " bytes from fd:" << _fd << ". MSG:" << m_buf);
}

//*********************************************************************************************
//Below is bussiness code.
//*********************************************************************************************
//decode http header

//decrypt msg
_decrypted_len = _str.length();
if(m_decryptor.Decrypt((const unsigned char*)m_buf, (unsigned char*)m_decrypted_buf, _decrypted_len) == false)
{
LOG(LOG_LEVEL_ERROR, "[fd:" << _fd << "] decrypt failed.");
goto close_socket;
}

//process msg
//if (m_ContactsManager->Process(m_decrypted_buf, _str) == true)
if (m_ContactsManager->Process(m_buf, _str) == true)
{
//LOG_DEBUG("rep:" << _str);
_sent = send(_fd, _str.c_str(), _str.length(), 0);
if (_sent < _str.length())
{
LOG(LOG_LEVEL_WARN, "[fd:" << _fd << "] send response failed, error:" << strerror(errno) << ". response:" << _str << ". _sent:" << _sent << ", size:" << _str.length());
}
else
{
LOG(LOG_LEVEL_INFO, "[fd:" << _fd << "] send ok.");
}
}
//*********************************************************************************************
//Bussiness code finished.
//*********************************************************************************************

//continue;
close_socket:
close(_fd);
LOG(LOG_LEVEL_NODE, "[fd:" << _fd << "] socket closed.");
}

LOG(LOG_LEVEL_NODE, "Worker::Run() exited!");
}
};
typedef boost::shared_ptr<Worker> WorkerPtr;
typedef boost::shared_ptr<boost::thread> ThreadPtr;

class ThreadPool
{
private:

typedef struct _thread
{

_thread(WorkerPtr handle, ThreadPtr object) :
m_handle(handle), m_object(object) { }

WorkerPtr m_handle;
ThreadPtr m_object;
} WorkThread;
std::vector<WorkThread> m_WorkThreads;
public:

ThreadPool(int thread_num, TaskQueue* task_queue)
{
for (int i = 0; i < thread_num; i++)
{
WorkerPtr _worker(new Worker(task_queue));
ThreadPtr _threadObj(new boost::thread(boost::bind(&Worker::Run, _worker.get())));
WorkThread _thread(_worker, _threadObj);
m_WorkThreads.push_back(_thread);
}
}

~ThreadPool() { }

void StopAll()
{
LOG(LOG_LEVEL_NODE, "ThreadPool::StopAll()");

BOOST_FOREACH(std::vector<WorkThread>::value_type& i, m_WorkThreads)
{
i.m_handle->Stop();
}

BOOST_FOREACH(std::vector<WorkThread>::value_type& i, m_WorkThreads)
{
i.m_object->join();
}
LOG(LOG_LEVEL_NODE, "ThreadPool::StopAll() ok.");
}
};
typedef boost::shared_ptr<ThreadPool> ThreadPoolPtr;
#endif /* WORKTHREAD_H */

 

 

 

 

 

 

 

 

 

 

/*
* File: AceWarpper.h
* Author: raozf
*
* Created on 2013年3月21日, 下午2:37
*/

#ifndef AESWARPPER_H
#define AESWARPPER_H

#include <string>
#include "openssl-aes/include/aes.h"
#include "common.h"

class AesWarpper
{
private:
AES_KEY m_key;

private:
//CBC:pad with 0
//ECB:undefined, usually PCKS5Padding

void PadData(const char* str, int& len, unsigned char** out, int blocksize)
{
unsigned char _pad = (unsigned char) (blocksize - len % blocksize);
len += _pad;
*out = (unsigned char*) malloc(len * sizeof (char));
memset(*out, 0, len);
memcpy(*out, str, len * sizeof (char));
}

int RangeRandom(int min, int max)
{
if (max > min)
return random() % (max - min + 1) + min;
else
return random();
}

void GenerateKey(int key_index, unsigned char* key, unsigned int key_size)
{
for (int i = 0; i < key_size; i++)
{
//TODO: need more mix operations
key[i] = (i + 435)*20^43657;
}
}

void GenerateIV(int key_index, unsigned char* iv)
{
for (int i = 0; i < 16; i++)
{
iv[i] = (i + 793)^key_index << 1;
}
}

void EncodeHeader(int packet_len, unsigned char* buf)
{
int _t = RangeRandom(1, 19);
packet_len = (packet_len + _t * 100)*32 + _t * 100000000;
PUTU32(buf, packet_len);
}

bool DecodeHeader(int& len, const unsigned char* buf)
{
int _l = GETU32(buf);
len = (_l % 100000000) / 32 - ((_l / 100000000)*100);
return (len > 8)&&(len <= AES_MAX_BUFFER_LEN);
}

int MixKeyIndex(int key_index)
{
int _nMixedKeyIndex = key_index + 13455350;
return _nMixedKeyIndex;
}

int DeMixKeyIndex(int key_index)
{
int _nDeMixedKeyIndex = key_index - 13455350;
return _nDeMixedKeyIndex;
}
public:

AesWarpper()
{
srand(time(NULL));
}

~AesWarpper() { }

/*
* [plain_text]: plain text that to be encrypted
* [encrypted_buf]:encrypted buffer
* [len]: plain text‘s length, will be set to ‘length of the encrypted buffer‘
*/
void Encrypt(const char* plain_text, unsigned char* encrypted_buf, int& len)
{
unsigned char _key[16] = {0};
unsigned char _iv[16] = {0};
int _keyIndex = RangeRandom(0, RAND_MAX);
int _originalLen = len;
GenerateKey(_keyIndex, _key, 16);
GenerateIV(_keyIndex, _iv);
AES_set_encrypt_key(_key, 128, &m_key);

unsigned char* _pSrcPadded = NULL;
PadData(plain_text, len, &_pSrcPadded, AES_BLOCK_SIZE);
len = 4 + 4 + len;
//4bytes(perplex bytes) + 4bytes(mixed keyindex) + len(_encrypted_bytes)
EncodeHeader(_originalLen, encrypted_buf);
PUTU32(encrypted_buf + 4, MixKeyIndex(_keyIndex));
AES_cbc_encrypt(_pSrcPadded, encrypted_buf + 4 + 4, len - 4 - 4, &m_key, _iv, AES_ENCRYPT);
free(_pSrcPadded);
}

/*
* [encrypted_buf]: encrypted buffer that to be decrypted
* [decrypted_buf]: pointer to decrypted buffer.
* [len]: encrypted buffer‘s length, will be set to ‘length of the decrypted buffer‘ if decrypt success.
* [return]: ture if decrypt success, or false;
*/
bool Decrypt(const unsigned char* encrypted_buf, unsigned char* decrypted_buf, int& len)
{
int _decrypted_len = 0;
if ((len > SOCK_RECV_BUF)
|| (DecodeHeader(_decrypted_len, encrypted_buf) == false))
{
LOG(LOG_LEVEL_ERROR,"AesWarpper::Decrypt(): DecodeHeader() failed. _decrypted_len :"<< _decrypted_len <<" len:" << len);
return false;
}

unsigned char _key[16] = {0};
unsigned char _iv[16] = {0};
int _keyIndex = DeMixKeyIndex(GETU32(encrypted_buf + 4));
GenerateKey(_keyIndex, _key, 16);
GenerateIV(_keyIndex, _iv);
AES_set_decrypt_key(_key, 128, &m_key);
AES_cbc_encrypt(encrypted_buf + 8, decrypted_buf, len - 8, &m_key, _iv, AES_DECRYPT);
len = _decrypted_len;
return true;
}
};

#endif /* AESWARPPER_H */

 

 

 

 

 

 

 

 

 

 

 

/*
* File: common.h
* Author: chu
*
* Created on 2013年3月9日, 下午6:36
*/

#ifndef COMMON_H
#define COMMON_H

#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <iostream>
#include <fstream>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <queue>
#include <limits>
#include <sys/wait.h>
#include <sys/types.h>
#include <execinfo.h>

#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <boost/date_time.hpp>
#include <boost/bind.hpp>
#include <boost/foreach.hpp>
#include <boost/unordered/unordered_map.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/ini_parser.hpp>

#define SERVER_PORT 7932 //default listen port
#define LISTENQ 20
#define MAX_EVENTS 20
#define WORK_THREADS 10 //number of work threads
#define SOCK_RECV_BUF 1024*1024 //connections recv buf size(about 60bytes per contact item)
#define SOCK_INACTIVE_TIMEOUT 20 //close inactive connections timeout
#define RELOAD_CFG_INTERVAL 3 //reload configure file interval time

#define LOG_CONFIG "config.log"
#define GRACE_EXIT "grace.exit"

enum LogLevel
{
LOG_LEVEL_DEBUG = 0, //debug information: running details
LOG_LEVEL_INFO, //normal information:
LOG_LEVEL_WARN, //warning information:bussiness works incorrectly, but can continue to run,might got unexcepted result
LOG_LEVEL_ERROR, //error information: bussiness error, but won‘t affect other bussiness
LOG_LEVEL_NODE, //node information: program runing to keypoint
LOG_LEVEL_FATAL, //fatal information: bussiness error, and program can‘t run anymore, must exit
LOG_LEVEL_NONE, //don‘t log anything.
};

const char g_strLogLevel[][6] = {"DEBUG", "INFO", "WARN", "ERROR", "NODE", "FATAL", "NONE"};

LogLevel g_logLevel = LOG_LEVEL_DEBUG;
bool g_logRaw = false;

#define NOW boost::posix_time::to_iso_extended_string(boost::posix_time::microsec_clock::local_time())
#define NOW_STR NOW.c_str()
#define THREAD_ID boost::this_thread::get_id()
#define LEVEL_STRING(level)

#define LOG(level, msg) \
if(level >= g_logLevel)\
{\
if(!g_logRaw)\
{\
std::cout<<"["<<NOW<<"]["<<g_strLogLevel[level]<<"][Thread "<<THREAD_ID<<"]"<<msg<<std::endl;\
}\
else\
{\
std::cout<<msg<<std::endl;\
}\
}

#define LOCK boost::mutex::scoped_lock

//json mseeage data filed name.
#define JSON_MSG_TYPE "msg"
#define JSON_UID "uid"
#define JSON_DATA "data"
#define JSON_CONTACT_NUM "num"
#define JSON_CONTACT_NAME "name"
#define JSON_CONTACT_STATUS "status"

#define ROOT_DIR "./data" //data file‘s base directory
#define KV_CONTACT_MAP "ContactsMap" //kv file name

#define JSON_PARSE(reader, str, value, expectedType) if ((reader.parse(str, value) == false))\
{\
LOG(LOG_LEVEL_ERROR, "parse json failed." << reader.getFormatedErrorMessages() << " str:" << str);\
return false;\
}\
if (value.isNull() == true)\
{\
LOG(LOG_LEVEL_ERROR, "value is NULL. str:" << str);\
return false;\
}\
if(value.type() !=expectedType)\
{\
LOG(LOG_LEVEL_ERROR, "expected type:" << expectedType<<", json type:"<<value.type());\
return false;\
}

//aes crypto
typedef unsigned int u32;
typedef unsigned char u8;
#define GETU32(pt) (((u32)(pt)[0] << 24) ^ ((u32)(pt)[1] << 16) ^ ((u32)(pt)[2] << 8) ^ ((u32)(pt)[3]))
#define PUTU32(ct, st) { (ct)[0] = (u8)((st) >> 24); (ct)[1] = (u8)((st) >> 16); (ct)[2] = (u8)((st) >> 8); (ct)[3] = (u8)(st); }
#define AES_MAX_BUFFER_LEN SOCK_RECV_BUF-8

#endif /* COMMON_H */

 

 

 

 

 

 

 

 

 

 

 

/*
* File: Contacts.h
* Author: raozf
*
* Created on 2013年3月13日, 下午1:12
*/

#ifndef CONTACTS_H
#define CONTACTS_H

//boost jsonparser....
#include "jsoncpp/json.h"
#include "leveldb/include/leveldb/db.h"

#include "common.h"
#include "Message.h"

class ContactItem
{
private:
std::string m_strPhoneNum;
std::string m_strName;
short m_nStatus;
public:

ContactItem(std::string phoneNum, std::string name, short status) :
m_strPhoneNum(phoneNum),
m_strName(name),
m_nStatus(status) { }

ContactItem() { }

bool IsValid()
{
if ((m_strPhoneNum == "") || (m_nStatus < 0) || (m_nStatus > 2))
{
return false;
}

return true;
}

void SetStatus(short status)
{
if (status >= 0 && status <= 2)
{
m_nStatus = status;
}
}

std::string toJsonString()
{
std::string _str;
if (IsValid())
{
_str = "{\"";
_str += JSON_CONTACT_NUM;
_str += "\":\"";
_str += m_strPhoneNum;
_str += "\",\"";
_str += JSON_CONTACT_NAME;
_str += "\":\"";
_str += m_strName;
_str += "\",\"";
_str += JSON_CONTACT_STATUS;
_str += "\":";
_str += boost::lexical_cast<std::string>(m_nStatus);
_str += "}";
}

return _str;
}
};

class Contacts
{
private:
std::string m_strFile;
std::string m_uid;
boost::unordered_map<std::string, ContactItem> m_contacts;

public:

Contacts() { }

~Contacts()
{
Serilize();
}

bool Load(const std::string& file)
{
LOG(LOG_LEVEL_INFO, "Loading file:‘" << file << "‘");
m_strFile = file;

std::ifstream _in;
try
{
_in.open(m_strFile.c_str());

std::string _str = "";
std::string _tmp;
while (getline(_in, _tmp))
{
_str += _tmp;
}
_in.close();
if (_str == "")
{
LOG(LOG_LEVEL_INFO, "empty file.");
return true;
}

Json::Value _root;
Json::Reader _reader;
JSON_PARSE(_reader, _str, _root, Json::arrayValue);
return Init(_root);
}
catch (std::fstream::failure e)
{
_in.close();
LOG(LOG_LEVEL_ERROR, "load file: ‘" << m_strFile << "‘ failed." << e.what());
return false;
}
}

void Serilize()
{
std::string _str;
toJsonString(_str);
std::ofstream _out;

try
{
_out.open(m_strFile.c_str());
_out << _str;
_out.close();
}
catch (std::fstream::failure e)
{
_out.close();
LOG(LOG_LEVEL_ERROR, "Serilize() to file: ‘" << m_strFile << "‘ failed." << e.what());
}
}

void toJsonString(std::string& str)
{
str = "[";
for (boost::unordered_map<std::string, ContactItem>::iterator it = m_contacts.begin(); it != m_contacts.end(); it++)
{
str += it->second.toJsonString();
str += ",";
}
size_t _pos = str.find_last_of(",");
if (_pos != std::string::npos)
{
str.erase(_pos);
}
str += "]";
}

void Add(const std::string& num, const std::string& name, int status)
{
if (num != "")
{
m_contacts[num] = ContactItem(num, name, status);
}
}

void Del(const std::string& num)
{
if (num != "")
{
m_contacts.erase(num);
}
}

private:

bool Init(const Json::Value& jsonObj)
{
m_contacts.clear();

int _count = 0;
for (int i = 0; i < jsonObj.size(); i++)
{
Json::Value _tmp = jsonObj[i];
if (_tmp.isObject() == false
|| _tmp.isNull() == true
|| _tmp[JSON_CONTACT_NUM].isString() == false
|| _tmp[JSON_CONTACT_NAME].isString() == false
|| _tmp[JSON_CONTACT_STATUS].isInt() == false)
{
LOG(LOG_LEVEL_WARN,"Invalid json:" << _tmp.toStyledString());
continue;
}

ContactItem _item(_tmp[JSON_CONTACT_NUM].asString(), _tmp[JSON_CONTACT_NAME].asString(), _tmp[JSON_CONTACT_STATUS].asInt());
if (!_item.IsValid())
{
LOG(LOG_LEVEL_WARN,"Invalid item:" << _tmp.toStyledString());
continue;
}

LOG(LOG_LEVEL_DEBUG,"[item]" << _item.toJsonString());
m_contacts[_tmp[JSON_CONTACT_NUM].asString()] = _item;
_count++;
}
LOG(LOG_LEVEL_INFO,"loaded " << _count << " contact items.");

return true;
}
};

class ContactsManager
{
public:
leveldb::DB* m_pContactMap;
private:

ContactsManager()
{
OpenKV();
}

~ContactsManager() { }

void OpenKV()
{
LOG(LOG_LEVEL_NODE,"Opening KV file ‘" << KV_CONTACT_MAP << "‘ ...");

leveldb::Options _opt;
_opt.create_if_missing = true;
leveldb::Status _status = leveldb::DB::Open(_opt, KV_CONTACT_MAP, &m_pContactMap);
if ((_status.ok() == false) || (m_pContactMap == NULL))
{
LOG(LOG_LEVEL_FATAL,"leveldb::DB::Open() failed. " << _status.ToString());
}
}

bool OpenContacts(const std::string& uid, Contacts** pContacts)
{
if (uid == "")
{
LOG(LOG_LEVEL_ERROR,"OpenContacts()::uid is NULL.");
return false;
}

std::string _file_path;
leveldb::Status _status;
_status = m_pContactMap->Get(leveldb::ReadOptions(), uid, &_file_path);
if (_status.ok() == false)
{
if (_status.IsNotFound())
{
std::string _date = boost::gregorian::to_iso_string(boost::gregorian::day_clock::local_day());
_file_path = ROOT_DIR;
_file_path += "/";
_file_path += _date;

std::string _cmd = "mkdir -p ‘" + _file_path + "‘";
system(_cmd.c_str());

_file_path += "/";
_file_path += uid;

_status = m_pContactMap->Put(leveldb::WriteOptions(), uid, _file_path);
if (_status.ok() == false)
{
LOG(LOG_LEVEL_ERROR,"OpenContacts(): Put() method failed. uid:" << uid << ", error:" << _status.ToString());
return false;
}
}
else
{
LOG(LOG_LEVEL_ERROR,"OpenContacts(): Get() method failed. uid:" << uid << ", error:" << _status.ToString());
return false;
}
}

*pContacts = new Contacts;
return (*pContacts)->Load(_file_path);
}
public:

static ContactsManager* GetInstance()
{
static ContactsManager Instance;
return &Instance;
}

/*
json:
{
"msg":0,
"uid":"13286790089_MX13435455",
"data":
[
{"num":"18709671222", "name":"oooo","status":2},
{"num":"18709671233", "name":"ooXX","status":2},
{"num":"18709671244", "name":"oXXX","status":2}
]
}
*/
bool Process(std::string str, std::string& ret)
{
try
{
Json::Value _root;
Json::Reader _reader;
JSON_PARSE(_reader, str, _root, Json::objectValue);

Contacts* pContacts = NULL;
std::string _uid = _root[JSON_UID].asString();
if (OpenContacts(_uid, &pContacts) == false)
{
LOG(LOG_LEVEL_ERROR,"OpenContacts() failed. uid:" << _uid);
return false;
}

int _type = _root[JSON_MSG_TYPE].asInt(); //msg type;
if (_type == MSG_GET)
{
pContacts->toJsonString(ret);
}
else if (_type == MSG_SET)
{
Json::Value _data = _root[JSON_DATA];
Json::Value _tmp;
for (int i = 0; i < _data.size(); i++)
{
_tmp = _data[i];
pContacts->Add(_tmp[JSON_CONTACT_NUM].asString(),
_tmp[JSON_CONTACT_NAME].asString(),
_tmp[JSON_CONTACT_STATUS].asInt());
}

ret = "OK";
}
else if (_type == MSG_DEL)
{
Json::Value _data = _root[JSON_DATA];
Json::Value _tmp;
for (int i = 0; i < _data.size(); i++)
{
_tmp = _data[i];
pContacts->Del(_tmp[JSON_CONTACT_NUM].asString());
}
ret = "OK";
}
else
{
LOG(LOG_LEVEL_WARN,"Invalid msg type:" << _type);
}

delete pContacts;
return true;
}
catch (std::exception e)
{
LOG(LOG_LEVEL_ERROR,"Process() caught exception. error:" << e.what() << ". str:" << str);
return false;
}
}
};

#endif /* CONTACTS_H */

 

 

 

 

 

 

 

 

 

 

/*
* File: Message.h
* Author: raozf
*
* Created on 2013年3月13日, 下午3:07
*/

#ifndef MESSAGE_H
#define MESSAGE_H

enum MSG_TYPE
{
MSG_GET = 0,
MSG_SET,
MSG_DEL
};

#endif MESSAGE_H

 

 

 

 

 

 

 

 

 

/*
* File: Serve.h
* Author: raozf
*
* Created on 2013年3月27日, 下午6:28
*/

#ifndef SERVE_H
#define SERVE_H

#include "common.h"
#include "TaskQueue.h"
#include "SocketManager.h"
#include "Listener.h"
#include "WorkThread.h"

class Server
{
private:
int m_nServerPort;
TaskQueue m_taskQueue;
SocketManager m_manager;
Listener* m_pListener;
ThreadPoolPtr m_pWorkers;
ThreadPtr m_listenThreadObj;
ThreadPtr m_checkThreadObj;

private:
Server(int server_port):
m_nServerPort(server_port), m_pListener(NULL)
{
}

~Server(){}

public:
static Server* NewServer(int server_port)
{
Server* _p = new Server(server_port);
return _p;
}

static void DestoryServer(Server* pServer)
{
LOG(LOG_LEVEL_NODE, "Server::DestoryServer() pServer = "<< pServer);
if(pServer != NULL)
{
pServer->Stop();
delete pServer;
pServer = NULL;

LOG(LOG_LEVEL_NODE, "Server destoryed.");
}
}

void Run()
{
m_pWorkers = ThreadPoolPtr(new ThreadPool(WORK_THREADS, &m_taskQueue));
m_pListener = Listener::GetInstance();
if (m_pListener->Init(m_nServerPort, &m_taskQueue, &m_manager) == false)
{
LOG(LOG_LEVEL_FATAL, "Listener Init() failed. programm exit!");
exit(EXIT_FAILURE);
}
m_listenThreadObj = ThreadPtr(new boost::thread(&Listener::Run, m_pListener));
m_checkThreadObj = ThreadPtr(new boost::thread(&SocketManager::Run, &m_manager));
}

void Stop()
{
LOG(LOG_LEVEL_NODE, "Server stopping...");
m_pListener->Stop();
m_listenThreadObj->join();

m_pWorkers->StopAll();
m_manager.Stop();
m_listenThreadObj->join();
LOG(LOG_LEVEL_NODE, "Server stopped.");
}
};

#endif /* SERVE_H */

 

thread

标签:

原文地址:http://www.cnblogs.com/chutianyao/p/2960173.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!