码迷,mamicode.com
首页 > 其他好文 > 详细

一个简单的高并发的回应服务器(5万并发)

时间:2015-01-07 13:10:50      阅读:375      评论:0      收藏:0      [点我收藏+]

标签:服务器   并发   线程池   

一个简单的高并发的回应服务器,主要是使用boost的库! 自己测试过可以达到5万个并发!

程序的下载地址:http://download.csdn.net/detail/guanyijun123/8335907


#include <stdio.h>
//#include "AuthenHandle.h"
//#include "configure.h"
//#include "NetSocketCommand.h"

#ifdef WIN32 //for windows nt/2000/xp

 
//#include "gelsserver.h"
#pragma comment(lib,"Ws2_32.lib")
#else         //for unix


#include <sys/socket.h>
//    #include <sys/types.h>

//    #include <sys/signal.h>

//    #include <sys/time.h>

#include <netinet/in.h>     //socket

//    #include <netdb.h>

#include <unistd.h>            //gethostname

// #include <fcntl.h>

#include <arpa/inet.h>

#include <string.h>            //memset

typedef int SOCKET;
typedef struct sockaddr_in SOCKADDR_IN;
typedef struct sockaddr SOCKADDR;
#ifdef M_I386
typedef int socklen_t;
#endif

#define BOOL             int
#define INVALID_SOCKET    -1
#define SOCKET_ERROR     -1
#define TRUE             1
#define FALSE             0
#endif        //end #ifdef WIN32

static int count111 = 0;
static time_t oldtime = 0, nowtime = 0;

#include <list>
#include <cstdlib>
#include <iostream>
#include <stdexcept>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/thread.hpp>

const int server_port = 6768;            //服务器启动的端口;
const int server_thread_pool_num = 4;    //服务器启动线程池的线程数;

using namespace std;
using boost::asio::ip::tcp;
CRITICAL_SECTION  listLock;
char szBuff[256] = {0} ;

int   nConnectCount = 0 ;

map<int, int> g_mapThreadId;  //线程ID 映射;

bool InsertMapThreadId(int nThreadId)
{
	map<int, int>::iterator mapThreadIdIt = g_mapThreadId.find(nThreadId);
	if (mapThreadIdIt == g_mapThreadId.end())
	{
		//没有找到插入并返回true;
		g_mapThreadId.insert( std::make_pair(nThreadId, g_mapThreadId.size()+1) );
		return true;
	}
	else
	{
		//已经存在不插入返回false
		return false;
	}
}

class io_service_pool
	: public boost::noncopyable
{
public:

	explicit io_service_pool(std::size_t pool_size)
		: next_io_service_(0)
	{ 
		for (std::size_t i = 0; i < pool_size; ++ i)
		{
			io_service_sptr io_service(new boost::asio::io_service);
			work_sptr work(new boost::asio::io_service::work(*io_service));
			io_services_.push_back(io_service);
			work_.push_back(work);
		}
	}

	void start()
	{ 
		for (std::size_t i = 0; i < io_services_.size(); ++ i)
		{
			boost::shared_ptr<boost::thread> thread(new boost::thread(
				boost::bind(&boost::asio::io_service::run, io_services_[i])));
			threads_.push_back(thread);
		}
	}

	void join()
	{
		for (std::size_t i = 0; i < threads_.size(); ++ i)
		{
			threads_[i]->join();
		} 
	}

	void stop()
	{ 
		for (std::size_t i = 0; i < io_services_.size(); ++ i)
		{
			io_services_[i]->stop();
		}
	}

	boost::asio::io_service& get_io_service()
	{
		boost::mutex::scoped_lock lock(mtx);
		boost::asio::io_service& io_service = *io_services_[next_io_service_];
		++ next_io_service_;
		if (next_io_service_ == io_services_.size())
		{
			next_io_service_ = 0;
		}
		return io_service;
	}

private:
	typedef boost::shared_ptr<boost::asio::io_service> io_service_sptr;
	typedef boost::shared_ptr<boost::asio::io_service::work> work_sptr;
	typedef boost::shared_ptr<boost::thread> thread_sptr;

	boost::mutex mtx;

	std::vector<io_service_sptr> io_services_;
	std::vector<work_sptr> work_;
	std::vector<thread_sptr> threads_; 
	std::size_t next_io_service_;
};

boost::mutex cout_mtx;
int packet_size = 0;
enum {MAX_PACKET_LEN = 4096};

class session
{
public:
	session(boost::asio::io_service& io_service)
		: socket_(io_service)
		, recv_times(0)
	{

		bDeleteFlag = FALSE ;
		memset(data_,0x00,sizeof(data_));

	}

	virtual ~session()
	{
		boost::mutex::scoped_lock lock(cout_mtx);
		socket_.close() ;
		nConnectCount -- ;
	}

	tcp::socket& socket()
	{
		return socket_;
	}

	//暂时不需要这个函数
	inline void requestRead()
	{
		socket_.async_read_some(boost::asio::buffer(data_,MAX_PACKET_LEN ),//
			boost::bind(&session::handle_read, this,
			boost::asio::placeholders::error,
			boost::asio::placeholders::bytes_transferred));
	}
	

	void handle_read(const boost::system::error_code& error, size_t bytes_transferred)
	{
		if (!error)
		{
			if(bytes_transferred > 0)
			{
			 sendData(data_,bytes_transferred);

			}
    
			requestRead() ;
		}
		else
		{
			 bDeleteFlag = TRUE;
			 //socket_.close() ;
			 nConnectCount -- ;
		}
	}

	BOOL sendData(char* szData,int nLength)
	{
		boost::asio::ip::tcp::endpoint  endpoint1 = socket_.remote_endpoint();
		int nThreadID = ::GetCurrentThreadId();
		InsertMapThreadId(nThreadID);
		printf("in    socket:%d  remoteip:%s threadId:%lld 0x:%x theadIdnum:%d ", socket_.remote_endpoint().port(), socket_.remote_endpoint().address().to_string().c_str() , nThreadID, nThreadID) ;
		printf("threadNum:%d \r\n", g_mapThreadId.size());
		
		if(bDeleteFlag || szData == NULL || nLength <= 0 )
			return FALSE ;

		boost::asio::async_write(socket_, boost::asio::buffer(szData, nLength),
			boost::bind(&session::handle_write, this, boost::asio::placeholders::error));

        return TRUE ;
	}

	void handle_write(const boost::system::error_code& error)
	{
		int nThreadID = ::GetCurrentThreadId();
		InsertMapThreadId(nThreadID);
		printf("write socket:%d  remoteip:%s threadId:%lld 0x:%x  ", socket_.remote_endpoint().port(), socket_.remote_endpoint().address().to_string().c_str() , nThreadID, nThreadID) ;
		printf("threadNum:%d \r\n", g_mapThreadId.size());
		if (!error)
		{//写入正确
			 
		}
		else
		{
			bDeleteFlag = TRUE;
			//socket_.close() ;
			nConnectCount -- ;
		}
	}

public:
	BOOL            bDeleteFlag ;

private:
	tcp::socket     socket_;
	char            data_[MAX_PACKET_LEN];
	int             recv_times;
};

typedef list<session* >  SessionList ;
SessionList              sessionList ;
class server
{
public:
	server(short port, int thread_cnt)
		: io_service_pool_(thread_cnt)
		, acceptor_(io_service_pool_.get_io_service(), tcp::endpoint(tcp::v4(), port))
	{
		session* new_session = new session(io_service_pool_.get_io_service());
		acceptor_.async_accept(new_session->socket(),
			boost::bind(&server::handle_accept, this, new_session, boost::asio::placeholders::error));


      EnterCriticalSection(&listLock);
		sessionList.push_back(new_session) ;
      LeaveCriticalSection(&listLock);   
	}

	void handle_accept(session* new_session, const boost::system::error_code& error)
	{
		if (!error)
		{
			//new_session->readRequest(Packet_Is_Head,sizeof(PacketHead)); //先请求包头
		   new_session->requestRead() ;
		   nConnectCount ++ ;
		}
		else
		{
            new_session->bDeleteFlag = TRUE ;
		}

 		new_session = new session(io_service_pool_.get_io_service());
		acceptor_.async_accept(new_session->socket(),
			boost::bind(&server::handle_accept, this, new_session, boost::asio::placeholders::error));
      EnterCriticalSection(&listLock);
	    sessionList.push_back(new_session) ;
      LeaveCriticalSection(&listLock);   

	  int nThreadID = ::GetCurrentThreadId();
	  printf("链接数量 %d  threadId:%lld 0x:%x \r\n",nConnectCount,  nThreadID, nThreadID) ;
	 
	}

	void run()
	{
		io_service_pool_.start();
		io_service_pool_.join();
	}

private:

	io_service_pool io_service_pool_;
	tcp::acceptor acceptor_;
};

int main()
{
 	//boost

	InitializeCriticalSection(&listLock) ;

	printf("server run! server port :%d thread_poo_num:%d \n", server_port, server_thread_pool_num);

  //创建线程数量,要先检测CPU线程数量,然后再创建相应的线程数
	server svr(server_port, server_thread_pool_num);
	svr.run();

	
	while(true)
	{
		Sleep(1000);
 
	}
   DeleteCriticalSection(&listLock);

   printf("server end\n ");


   return 0;
}


一个简单的高并发的回应服务器(5万并发)

标签:服务器   并发   线程池   

原文地址:http://blog.csdn.net/guanyijun123/article/details/42490377

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!