码迷,mamicode.com
首页 > 编程语言 > 详细

Linux C++ 实现时间轮 优化超时检测机制

时间:2016-07-24 07:06:29      阅读:520      评论:0      收藏:0      [点我收藏+]

标签:

参考资料:
http://www.ijilei.com/8357
https://www.zhihu.com/question/38427301
https://www.ibm.com/developerworks/cn/linux/l-cn-timers/
http://www.cnblogs.com/processakai/archive/2012/04/11/2442294.html

思路和代码的编写主要是参考的csdn上的一个java的代码
http://blog.csdn.net/mindfloating/article/details/8033340
我参考该Java代码的思路编写了C++代码,说来其实也没什么技术难度,把代码分享出来供后来的技术人使用。

我用C++实现的时间轮主要用于超时检测,即判断TCP会话是否超时,
所以定义了一个Sessionkey作为唯一的主键(key)。

因为我使用了C++11中的unordered_map,所以需要重载operator == 操作符,并自己编写hash函数,也就是SessionHash中的函数

/**
 * Identifies a session by its source/destination address and port 4-tuple.
 *
 * All fields are public plain integers; equality is provided by operator==.
 */
class Sessionkey{
public:
    /* Default key: every field is zero. */
    Sessionkey():srcIp(0),dstIp(0),srcPort(0),dstPort(0){}

    /* Copy constructor (defined out of line). */
    Sessionkey(const Sessionkey& skey);

    /* Builds a key from source address, destination address, source port and destination port. */
    Sessionkey(uint32_t src,uint32_t dst,uint16_t sp,uint16_t dp)
        :srcIp(src),dstIp(dst),srcPort(sp),dstPort(dp)
    {
    }

    /* Field-by-field equality (defined out of line). */
    bool operator == (const Sessionkey &skey) const;

public:
    uint32_t srcIp;    // source address
    uint32_t dstIp;    // destination address
    uint16_t srcPort;  // source port
    uint16_t dstPort;  // destination port
};

/* Hash functor for Sessionkey, intended as the hasher argument of unordered containers keyed by Sessionkey. */
class SessionHash{
public:
    /* Returns the hash value computed for sk (defined out of line). */
    size_t operator()(const Sessionkey& sk) const;
};

#include "sessionKey.h"

#include "murmurHash.h"

/* Copy constructor: duplicates all four fields of sk. */
Sessionkey::Sessionkey(const Sessionkey& sk)
    : srcIp(sk.srcIp),
      dstIp(sk.dstIp),
      srcPort(sk.srcPort),
      dstPort(sk.dstPort)
{
}

/* Two keys are equal only when both addresses and both ports match. */
bool Sessionkey::operator==(const Sessionkey& sk) const
{
    if (srcIp != sk.srcIp || dstIp != sk.dstIp)
    {
        return false;
    }

    return srcPort == sk.srcPort && dstPort == sk.dstPort;
}

/*通过Sessionkey的全部成员构造hash值,切勿随意在Sessionkey类中添加成员变量*/
/*
 * Hashes the raw bytes of the whole Sessionkey object with murmurHash64A.
 * Because sizeof(sk) bytes are fed to the hash, every byte of the object
 * takes part in the result.
 */
size_t SessionHash::operator()(const Sessionkey& sk) const
{
    const ull64_t digest = murmurHash64A(&sk, sizeof(sk), 0xee6b27eb);
    return digest;
}

哈希值生成算法采用的是
/**
 * 64-bit MurmurHash2-style hash (the "64A" variant) of a byte buffer.
 *
 * @param key   start of the buffer to hash
 * @param len   number of bytes in the buffer; a negative value is treated as 0
 * @param seed  seed mixed into the initial state
 * @return the hash value
 *
 * The buffer is consumed in words of sizeof(ull64_t) bytes in native byte
 * order, followed by up to 7 tail bytes. Words are copied out byte by byte
 * instead of being read through a cast pointer, so the buffer does not need
 * to be aligned for ull64_t.
 */
ull64_t murmurHash64A ( const void * key, int len, ull64_t seed ){
    const ull64_t m = 0xc6a4a7935bd1e995ULL;
    const int r = 47;

    // A negative length cannot describe a buffer; hash it as empty rather
    // than walking past the end of the input.
    if (len < 0)
    {
        len = 0;
    }

    ull64_t h = seed ^ (len * m);

    const unsigned char * cursor = (const unsigned char *)key;
    const int wordCount = len / 8;

    for (int word = 0; word < wordCount; ++word)
    {
        // Assemble the word through its bytes: same value as the original
        // direct load, but valid for any alignment of the input.
        ull64_t k = 0;
        unsigned char * dst = (unsigned char *)&k;
        for (unsigned int b = 0; b < sizeof(k); ++b)
        {
            dst[b] = cursor[b];
        }
        cursor += sizeof(k);

        k *= m;
        k ^= k >> r;
        k *= m;

        h ^= k;
        h *= m;
    }

    // Fold in the remaining 0..7 bytes; every case deliberately falls
    // through to the ones below it.
    const unsigned char * tail = cursor;

    switch (len & 7)
    {
    case 7: h ^= ull64_t(tail[6]) << 48; /* fall through */
    case 6: h ^= ull64_t(tail[5]) << 40; /* fall through */
    case 5: h ^= ull64_t(tail[4]) << 32; /* fall through */
    case 4: h ^= ull64_t(tail[3]) << 24; /* fall through */
    case 3: h ^= ull64_t(tail[2]) << 16; /* fall through */
    case 2: h ^= ull64_t(tail[1]) << 8;  /* fall through */
    case 1: h ^= ull64_t(tail[0]);
        h *= m;
    }

    // Final avalanche.
    h ^= h >> r;
    h *= m;
    h ^= h >> r;

    return h;
}

#ifndef __TIME_WHEEL_H__
#define __TIME_WHEEL_H__

#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>

#include <string>
#include <vector>
#include <unordered_map>

#include <mutex>

#include <sessionKey.h>
#include <glog/logging.h>
#include "lfds611.h"

using namespace std;

/****************************全局函数定义区**********************************/
#define TIMEOUT_SESSKEY_CNT  1000 //default
#define SESSKEY_BUFFER_CNT  2000

void *tickStepThreadGlobal(void* param);

typedef std::unordered_map<Sessionkey,int,SessionHash> tDurationMap;

/*定义槽类Slot*/
/* One slot of the time wheel: holds the session keys stored in that slot. */
class Slot
{
public:
    /* Creates a slot with id 0. */
    Slot();
    /* Creates a slot with the given id. */
    Slot(int nId);
    ~Slot();
    /* Stores key in this slot together with the value num. */
    void addElement(Sessionkey& key,int num);
    /* Removes key from this slot. */
    void removeElement(Sessionkey& key);

public:
    /* The int mapped value of the unordered_map is a reserved field. */
    tDurationMap slotDurationMap;// Sessionkey -> int
    /* Identifier of this slot. */
    int id;
};

typedef std::unordered_map<Sessionkey,Slot*,SessionHash> tRelationMap;

/*时间轮类*/
/*
 * Time wheel used for session timeout detection.
 *
 * Every scalar/pointer member carries a default member initializer so that an
 * object built through the default constructor has a well-defined state
 * (in particular ticksPerWheel == 0, which the destructor loops on).
 */
class CTimeWheel
{
public:
    CTimeWheel();
    ~CTimeWheel();

    /*
     * tickDuration : duration of one tick
     * ticksPerWheel: number of ticks in one revolution (the session timeout)
     * timeUnit     : time unit multiplier applied to tickDuration
     */
    // The constructor starts the tick-step thread.
    CTimeWheel(int tickDuration,int ticksPerWheel,int timeUnit);
public:
    /* Adds an element; returns the timeout of the newly added element. */
    long addElement(Sessionkey& key,int num);
    /* Removes an element; returns false when it could not be removed. */
    bool removeElement(Sessionkey& key);

    /* Body of the tick-step thread: advances the tick index. */
    void tickStepRun();

private:
    void waitForNextTick();
    int getPreviousTickIndex();
    bool checkAdd(Sessionkey& key);
    void notifyExpired(int idx);// marked as an open question by the original author
    /* The return value is in seconds. */
    long getCurrentTime();

public:
    /* Lock used around reads and writes of the current tick index. */
    mutex mtx;

    /* The wheel itself; each element is a Slot. */
    std::vector<Slot*> wheel;

    /* Hash table that maps each session key to the slot that holds it. */
    tRelationMap keyToSlotMap;

    /* Queue that receives the Sessionkey of every timed-out session. */
    struct lfds611_queue_state *timeoutSessionQueue = NULL;

    /* Buffer of Sessionkey objects whose addresses are pushed into the queue. */
    Sessionkey  *sessKeyPool = NULL;

private:
    uint32_t tickDuration = 0;
    uint32_t ticksPerWheel = 0;
    uint32_t currentTickIndex = 0;
    pthread_t tickThread{};

    long startTime = 0;
    long tick = 0;
};


#endif /* __TIME_WHEEL_H__ */
#include "timeWheel.h"

void *tickStepThreadGlobal(void* param)
{
    CTimeWheel* pThis = (CTimeWheel*)param;
    pThis->tickStepRun();

    return NULL;
}

/* Default slot: id is 0 and the element map starts empty. */
Slot::Slot()
    : id(0)
{
}

/* Creates an empty slot whose id is nId. */
Slot::Slot(int nId)
    : id(nId)
{
}

/* Nothing to release explicitly; the members clean up after themselves. */
Slot::~Slot() = default;

/*
 * Stores key -> num in this slot. A key with a zero source or destination
 * port is reported through the verbose log but is still inserted. An
 * already-present key is left untouched (insert does not overwrite).
 */
void Slot::addElement(Sessionkey& key,int num)
{
    const bool portMissing = (0 == key.srcPort) || (0 == key.dstPort);
    if(portMissing)
    {
        VLOG(4)<<"addElement ERROR! 8888888888888888888888";
    }

    slotDurationMap.insert(tDurationMap::value_type(key,num));
}

/* Drops key from this slot; does nothing when the key is not stored here. */
void Slot::removeElement(Sessionkey& key)
{
    tDurationMap::iterator pos = slotDurationMap.find(key);
    if(pos != slotDurationMap.end())
    {
        slotDurationMap.erase(pos);
    }
}

/************************CTimeWheel类实现**********************************/

/*
 * Default constructor: builds an empty, inactive wheel (no slots, no queue,
 * no tick thread).
 *
 * Every scalar and pointer member is initialized here. The destructor loops
 * on ticksPerWheel, so leaving it indeterminate (as the original did) made
 * destroying a default-constructed wheel undefined behaviour.
 */
CTimeWheel::CTimeWheel()
    : timeoutSessionQueue(NULL),
      sessKeyPool(NULL),
      tickDuration(0),
      ticksPerWheel(0),
      currentTickIndex(0),
      startTime(0),
      tick(0)
{
}
/*
 * Releases the Slot objects owned by the wheel.
 *
 * The slots are created with new, so they must be released with delete:
 * free() skips the Slot destructor (leaking the slot's map) and is a
 * mismatched deallocation. The loop walks the vector itself rather than
 * counting to ticksPerWheel, so it is safe even when no slots were created.
 *
 * NOTE(review): sessKeyPool, timeoutSessionQueue and the tick thread are not
 * released or stopped here; confirm who owns their lifetime.
 */
CTimeWheel::~CTimeWheel()
{
    for(std::vector<Slot*>::iterator it = wheel.begin(); it != wheel.end(); ++it)
    {
        delete *it;
        *it = NULL;
    }
}

/*
 * Builds the wheel and starts the tick-step thread.
 *
 * @param tickDuration   duration of one tick, in multiples of timeUnit
 * @param ticksPerWheel  number of ticks in one revolution; one extra slot is
 *                       allocated on top of this value
 * @param timeUnit       multiplier applied to tickDuration
 *
 * Failures to create the timeout queue or the thread are logged; when the
 * queue cannot be created the thread is not started, because the tick loop
 * would enqueue into a queue that does not exist.
 */
CTimeWheel::CTimeWheel(int tickDuration,int ticksPerWheel,int timeUnit)
    : timeoutSessionQueue(NULL),
      sessKeyPool(NULL),
      tickDuration(0),
      ticksPerWheel(0),
      currentTickIndex(0),
      startTime(0),
      tick(0)
{
    VLOG(3)<<"CTimeWheel Construct 2 Called!";

    this->tickDuration = tickDuration*timeUnit;// unit: seconds
    this->ticksPerWheel = ticksPerWheel + 1;

    /* Allocate one Slot per wheel position. */
    wheel.reserve(this->ticksPerWheel);
    for(uint32_t i=0;i<this->ticksPerWheel;i++)
    {
        wheel.push_back(new Slot(i));
    }

    sessKeyPool = new Sessionkey[SESSKEY_BUFFER_CNT];

    /* Create the lock-free queue that stores timed-out session keys.
       lfds611_queue_new reports failure by returning 0. */
    if(0 == lfds611_queue_new(&timeoutSessionQueue,TIMEOUT_SESSKEY_CNT))
    {
        timeoutSessionQueue = NULL;
        LOG(ERROR) <<"create timeoutSessionQueue failed!";
        return;
    }

    /* Start the tick thread last, once every member it reads is set up. */
    if(pthread_create(&tickThread,NULL,tickStepThreadGlobal,this)!=0)
    {
        LOG(ERROR) <<"create tickStepThreadGlobal thread failed!";
    }
}

/*
 * Tick loop run by the tick-step thread; it never returns.
 *
 * Each iteration publishes the next slot index as currentTickIndex, expires
 * every element stored in that slot, then waits for the next tick.
 */
void CTimeWheel::tickStepRun()
{
    // Reference point for the tick schedule.
    startTime = getCurrentTime();

    // The first tick to wait for is tick 1.
    tick = 1;

    for(size_t slotIdx = 0;;++slotIdx)
    {
        // Wrap around at the end of the wheel.
        if(slotIdx >= wheel.size())
        {
            slotIdx = 0;
        }

        {
            // Acquisition stays non-blocking, as in the original, so the tick
            // thread can never stall on this mutex. The guard releases the
            // mutex only when it was actually acquired; the original unlocked
            // unconditionally, which is undefined behaviour whenever
            // try_lock() had failed.
            // NOTE(review): when the lock is not acquired this write is
            // unsynchronized; making currentTickIndex atomic would remove the
            // need for the mutex altogether.
            std::unique_lock<std::mutex> guard(mtx, std::try_to_lock);
            currentTickIndex = static_cast<uint32_t>(slotIdx);
        }

        // Expire everything stored in the current slot.
        notifyExpired(currentTickIndex);

        // Block until the next tick is due.
        waitForNextTick();
    }
}

void CTimeWheel::waitForNextTick()
{
    while(1)
    {
        long currentTime = getCurrentTime();
        long sleepTime = tickDuration * tick - (currentTime - startTime);//单位

        /*这块的值可能过大,加调试信息*/
        //VLOG(3)<<"tick step Thread sleepTime is "<<sleepTime;

        if(sleepTime <= 0)
        {
            break;
        }
        else
        {
            sleep(sleepTime);
        }
    }

    //tick步进1
    VLOG(3)<<"tick step add 1";

    tick++;
}

/*
 * Adds (or refreshes) key in the wheel.
 *
 * Any existing entry for the key is removed first; the key is then stored in
 * the slot just behind the current tick index and recorded in keyToSlotMap.
 *
 * @param key  session key to track
 * @param num  value stored with the key (also used in the log lines)
 * @return the timeout of the new element, or -1 when the target slot is NULL
 */
long CTimeWheel::addElement(Sessionkey& key,int num)
{
    // Drop a previous entry for this session, if there is one.
    const bool hadPreviousEntry = checkAdd(key);
    if(!hadPreviousEntry)
    {
        VLOG(3)<<"SessionKey is first Add";
    }

    // The slot behind the current tick is the one visited last.
    Slot* const targetSlot = wheel.at(getPreviousTickIndex());
    if(NULL == targetSlot)
    {
        return -1;
    }

    targetSlot->addElement(key,num);
    VLOG(3)<<"threadID "<<num<<" addElement to TimeWheel!";

    // Remember which slot holds the key.
    keyToSlotMap.insert(tRelationMap::value_type(key,targetSlot));
    VLOG(4)<<"keyToSlotMap size: "<<keyToSlotMap.size();

    VLOG(3)<<"threadID "<<num<<" insert sessionKey to keyToSlotMap!";

    return (ticksPerWheel - 1) * tickDuration;
}

/*
 * Removes a session from the wheel.
 *
 * The session is looked up under key first and, failing that, under the
 * reversed key (source and destination swapped). The matching entry is
 * erased from its slot and from keyToSlotMap.
 *
 * @param key  session key to remove
 * @return true when an entry was removed; false when neither direction is
 *         present or the recorded slot pointer is NULL
 *
 * Changes from the original: the call to the non-existent
 * Sessionkey::ToString() is gone, the log messages name this function rather
 * than checkAdd, and the unused address-formatting debug code was dropped.
 */
bool CTimeWheel::removeElement(Sessionkey& key)
{
    Sessionkey reverseKey(key.dstIp,key.srcIp,key.dstPort,key.srcPort);

    // Try the key as given, then the opposite direction.
    Sessionkey* matchedKey = &key;
    tRelationMap::iterator found = keyToSlotMap.find(key);
    if(found == keyToSlotMap.end())
    {
        matchedKey = &reverseKey;
        found = keyToSlotMap.find(reverseKey);
    }

    if(found == keyToSlotMap.end())
    {
        VLOG(3)<<"removeElement function:sessionkey is not in keyToSlotMap";
        return false;
    }

    Slot* pSlot = found->second;
    if(NULL == pSlot)
    {
        VLOG(3)<<"removeElement() function,keyToSlotMap Element is NULL";
        return false;
    }

    // Remove the element from its wheel slot.
    pSlot->removeElement(*matchedKey);
    VLOG(3)<<"Erase key from wheel slot!";

    // Remove the key -> slot record so the key can be added again later.
    keyToSlotMap.erase(found);
    VLOG(3)<<"Erase key from keyToSlotMap!";

    return true;
}

/*对当前tick索引对应slot中的所有元素做超时处理*/
void CTimeWheel::notifyExpired(int idx)
{
    //1.返回idx索引对应的slot槽
    if(idx<0 || idx >= ticksPerWheel)//0~ticksPerWheel-1
    {
        VLOG(4)<<"notifyExpired() function Failed!Reason:invalid Index!";
        return;
    }

    Slot*pSlot = wheel.at(idx);
    if(NULL == pSlot)
    {
        VLOG(4)<<"notifyExpired() function Failed!Reason:wheel slot is empty!";
        return;
    }

    //2.返回slot槽中元素集合
    //VLOG(4)<<"slot Element size is "<<pSlot->slotDurationMap.size();
    int index = 0;
    tDurationMap::iterator iteMap;

    for(iteMap=pSlot->slotDurationMap.begin();iteMap!=pSlot->slotDurationMap.end();iteMap++)
    {
        Sessionkey* timeoutKey = sessKeyPool+index;

        Sessionkey key = iteMap->first;
        timeoutKey->srcIp = key.srcIp;
        timeoutKey->dstIp = key.dstIp;
        timeoutKey->srcPort= key.srcPort;
        timeoutKey->dstPort= key.dstPort;

        //将超时的sessionkey压入Queue
        lfds611_queue_enqueue(timeoutSessionQueue,(void*)timeoutKey);

        struct in_addr addrSrc,addrDst;

        uint32_t srcIp = htonl(timeoutKey->srcIp);
        uint32_t dstIp = htonl(timeoutKey->dstIp);

        memcpy(&addrSrc,&srcIp,4);
        memcpy(&addrDst,&dstIp,4);

        string strSrcIP = inet_ntoa(addrSrc);
        string strDstIP = inet_ntoa(addrDst);

        VLOG(4)<<strSrcIP<<"->"<<strDstIP;

        VLOG(4)<<"srcPort:"<<timeoutKey->srcPort<<"dstPort:"<<timeoutKey->dstPort;

        VLOG(4)<<"sessionKey enqueue enqueue enqueue";

        //删除记录
        removeElement(key);

        if(++index  == SESSKEY_BUFFER_CNT)
        {
            index = 0;
        }   
    }
}

/*
 * Removes any existing entry for the session before it is (re)added.
 *
 * The session is looked up under key first and, failing that, under the
 * reversed key (source and destination swapped). A match is erased from its
 * slot and from keyToSlotMap.
 *
 * @return true when an entry was found and removed; false when neither
 *         direction is present or the recorded slot pointer is NULL
 */
bool CTimeWheel::checkAdd(Sessionkey& key)
{
    Sessionkey reverseKey(key.dstIp,key.srcIp,key.dstPort,key.srcPort);

    // Try the key as given, then the opposite direction.
    Sessionkey* matchedKey = &key;
    tRelationMap::iterator hit = keyToSlotMap.find(key);
    if(hit == keyToSlotMap.end())
    {
        matchedKey = &reverseKey;
        hit = keyToSlotMap.find(reverseKey);
    }

    if(hit == keyToSlotMap.end())
    {
        VLOG(3)<<"checkAdd function:sessionkey is not in keyToSlotMap";
        return false;
    }

    Slot* const owningSlot = hit->second;
    if(NULL == owningSlot)
    {
        VLOG(3)<<"checkAdd() function,keyToSlotMap Element is NULL";
        return false;
    }

    // Remove the element from its wheel slot.
    owningSlot->removeElement(*matchedKey);
    VLOG(3)<<"Erase key from wheel slot!";

    // Remove the key -> slot record so the key can be added again.
    keyToSlotMap.erase(*matchedKey);
    VLOG(3)<<"Erase key from keyToSlotMap!";

    return true;
}

/*
 * Returns the index of the slot just behind the current tick index, wrapping
 * from 0 to ticksPerWheel - 1.
 *
 * The current index is read under mtx through a scoped guard. The original
 * called try_lock(), ignored its result, and returned before reaching its
 * unlock() call, so the mutex stayed locked after the first call.
 */
int CTimeWheel::getPreviousTickIndex()
{
    std::lock_guard<std::mutex> guard(mtx);

    const int cti = currentTickIndex;
    if(0 == cti)
    {
        return ticksPerWheel - 1;
    }

    return cti - 1;
}

/*
 * Returns the current time of day from gettimeofday(), in seconds.
 * The microsecond part is divided down to whole seconds before it is added.
 * NOTE(review): this is the time-of-day clock, so a system clock adjustment
 * presumably shifts the tick schedule — confirm whether that matters here.
 */
long CTimeWheel::getCurrentTime()
{
    struct timeval now;
    gettimeofday(&now,NULL);

    const long secondsFromMicros = now.tv_usec / 1000000;
    return now.tv_sec + secondsFromMicros;
}

Linux C++ 实现时间轮 优化超时检测机制

标签:

原文地址:http://blog.csdn.net/haolipengzhanshen/article/details/52012096

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!