标签:
说到zookeeper session管理 ,免不了要问
什么是session?
session id/session是如何产生的?
session 信息如何存储?
本文以session tracker线程【详见SessionTrackerImpl】的运行机制作为主线,并尝试解答一些相关问题
在介绍session tracker线程之前先回答几个问题
zookeeper中session意味着一个物理连接,客户端connect成功之后,会发送一个connect型请求,此时就会有session 产生(下面会具体讲)
在SessionTrackerImpl实例化的时候就会调用下面的函数【详见SessionTrackerImpl.initializeNextSession】
| 1 2 3 4 5 6 | publicstaticlonginitializeNextSession(longid) {       longnextSid = 0;       nextSid = (System.currentTimeMillis() << 24) >> 8;       nextSid =  nextSid | (id <<56);       returnnextSid;   } | 
产生的值会存入nextSessionId属性,以后一旦有新的连接(session)产生,就会nextSessionId++
接到一个连接类型的请求【详见ZooKeeperServer.processConnectRequest】
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | intsessionTimeout = connReq.getTimeOut();        bytepasswd[] = connReq.getPasswd();        intminSessionTimeout = getMinSessionTimeout();        if(sessionTimeout < minSessionTimeout) {            sessionTimeout = minSessionTimeout;        }        intmaxSessionTimeout = getMaxSessionTimeout();        if(sessionTimeout > maxSessionTimeout) {            sessionTimeout = maxSessionTimeout;        }        cnxn.setSessionTimeout(sessionTimeout);        // We don‘t want to receive any packets until we are sure that the        // session is setup        cnxn.disableRecv();        longsessionId = connReq.getSessionId();        if(sessionId != 0) {            longclientSessionId = connReq.getSessionId();            LOG.info("Client attempting to renew session 0x"                    + Long.toHexString(clientSessionId)                    + " at "+ cnxn.getRemoteSocketAddress());            serverCnxnFactory.closeSession(sessionId);            cnxn.setSessionId(sessionId);            reopenSession(cnxn, sessionId, passwd, sessionTimeout);        } else{            LOG.info("Client attempting to establish new session at "                    + cnxn.getRemoteSocketAddress());            createSession(cnxn, passwd, sessionTimeout);        } | 
【详见SessionTrackerImpl.createSession】
| 1 2 3 4 | synchronizedpubliclongcreateSession(intsessionTimeout) {        addSession(nextSessionId, sessionTimeout);        returnnextSessionId++;    } | 
可见产生session需要两个元素,一个是sessionid,一个是timeout
timeout由客户端确定,但必须在服务器规定的最大的timeout(ticktime*20)和最小的timeout(ticktime*2)之间
如果客户端没有指定sessionid,那么就会产生一个新的session【详见ZooKeeperServer.createSession】,否则会reopen【详见ZooKeeperServer.reopenSession】
sessionid的产生上面解释过了
【详见SessionTrackerImpl.addSession】
| 1 2 3 4 5 6 7 8 9 10 | sessionsWithTimeout.put(id, sessionTimeout);        if(sessionsById.get(id) == null) {            SessionImpl s = newSessionImpl(id, sessionTimeout, 0);            sessionsById.put(id, s);            if(LOG.isTraceEnabled()) {                ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,                        "SessionTrackerImpl --- Adding session 0x"                        + Long.toHexString(id) + " "+ sessionTimeout);            }        } | 
一个重要的数据结构sessionsWithTimeout存放sessionid和timeout的映射
另一个重要的数据结构sessionsById存放sessionid和SessionImpl实例的映射
【详见SessionTrackerImpl.touchSession】
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | longexpireTime = roundToInterval(System.currentTimeMillis() + timeout);        if(s.tickTime >= expireTime) {            // Nothing needs to be done            returntrue;        }        SessionSet set = sessionSets.get(s.tickTime);        if(set != null) {            set.sessions.remove(s);        }        s.tickTime = expireTime;        set = sessionSets.get(s.tickTime);        if(set == null) {            set = newSessionSet();            sessionSets.put(expireTime, set);        }        set.sessions.add(s); | 
根据当前时间和timeout计算本session 的expireTime即tickTime
一个重要的数据结构sessionSets 存放过期时间和一组session实例(相同过期时间)的映射的建立及维护
session实例的tickTime的确定
在zookeeper服务体系中,专门有一个线程(session tracker)维护session【详见SessionTrackerImpl.run】,重要代码如下
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 | currentTime = System.currentTimeMillis();if(nextExpirationTime > currentTime) {    this.wait(nextExpirationTime - currentTime);    continue;}SessionSet set;set = sessionSets.remove(nextExpirationTime);if(set != null) {    for(SessionImpl s : set.sessions) {        sessionsById.remove(s.sessionId);        expirer.expire(s);    }}nextExpirationTime += expirationInterval; | 
可见SessionTrackerImpl这个线程会一直轮询的清除过期session
每次轮询都会比较currentTime和nextExpirationTime,如果还未到nextExpirationTime,就等,否则往下走
将sessionSets中的以nextExpirationTime为key的那组session移出
遍历session,从sessionsById移除session,并调用相关的过期处理(下面会讲)
调整下载比较的时间,即nextExpirationTime += expirationInterval;
【详见ZooKeeperServer.close】
| 1 2 3 | privatevoidclose(longsessionId) {       submitRequest(null, sessionId, OpCode.closeSession, 0, null, null);   } | 
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | request.hdr = newTxnHeader(request.sessionId, request.cxid, zxid,                            zks.getTime(), type);                                                   switch(type) {                                                      //省略N行代码......                                                      caseOpCode.closeSession:        // We don‘t want to do this check since the session expiration thread        // queues up this operation without being the session owner.        // this request is the last of the session so it should be ok        //zks.sessionTracker.checkSession(request.sessionId, request.getOwner());        HashSet<String> es = zks.getZKDatabase()                .getEphemerals(request.sessionId);        synchronized(zks.outstandingChanges) {            for(ChangeRecord c : zks.outstandingChanges) {                if(c.stat == null) {                    // Doing a delete                    es.remove(c.path);                } elseif(c.stat.getEphemeralOwner() == request.sessionId) {                    es.add(c.path);                }            }            for(String path2Delete : es) {                addChangeRecord(newChangeRecord(request.hdr.getZxid(),                        path2Delete, null, 0, null));            }                                                               zks.sessionTracker.setSessionClosing(request.sessionId);        }                                                           LOG.info("Processed session termination for sessionid: 0x"                + Long.toHexString(request.sessionId));        break; | 
设置request.hdr,这个很重要,
在FinalRequestProcessor.processRequest会有相应的处理
| 1 2 3 4 5 6 | if(request.hdr != null) {              TxnHeader hdr = request.hdr;              Record txn = request.txn;                rc = zks.processTxn(hdr, txn);           } | 
?
一旦某个session关闭,与session相关的EPHEMERAL类型的节点都得清除
并且通过调用sessionTracker.setSessionClosing将session设置为关闭,使得后续此session上的请求无效
【详见SessionTrackerImpl.removeSession】
| 1 2 3 4 5 6 7 8 9 10 11 12 | synchronizedpublicvoidremoveSession(longsessionId) {    SessionImpl s = sessionsById.remove(sessionId);    sessionsWithTimeout.remove(sessionId);    if(LOG.isTraceEnabled()) {        ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,                "SessionTrackerImpl --- Removing session 0x"                + Long.toHexString(sessionId));    }    if(s != null) {        sessionSets.get(s.tickTime).sessions.remove(s);    }} | 
分别对sessionsById、sessionsWithTimeout、sessionSets进行处理
如果不是在集群环境,即没有LearnerHandler线程,session 的owner就是一个常量实例ServerCnxn.me
sessionsWithTimeout存放的是sessionid和timeout,此数据结构会和ZKDatabase中相通,会被持久化
如果某个session timeout为60s,如果空闲了30s,意味着还能空闲30s,此时服务重启,那么此session的timeout又变为60s
每次一旦该session有请求,就会touch,意味着session的过期时间变为(基本等于当前时间+timeout)
具体算法为
| 1 2 3 4 | privatelongroundToInterval(longtime) {        // We give a one interval grace period        return(time / expirationInterval + 1) * expirationInterval;    } | 
time为System.currentTimeMillis() + timeout
expirationInterval默认为ticktime
基本上所有的事务型操作,都会调用用来验证当前请求的session是否关闭,owner是否正确
SessionTrackerImpl作为一个单独的线程专门处理过期session
SessionTrackerImpl有3个重要的数据结构sessionsById、sessionSets、sessionsWithTimeout,其中sessionsWithTimeout会被持久化
SessionTrackerImpl提供了几个常用的API
createSession
addSession
touchSession
removeSession
checkSession
setOwner
dumpSessions
标签:
原文地址:http://www.cnblogs.com/405845829qq/p/4667675.html