标签:log dep 实例化 extends ndt star shp ring 方法
前面三篇主要从client的角度说了下client和server建立连接的过程,这一篇和后面一篇开始看下Zookeeper中非常重要的一个概念:Session,session是zookeeper client和server建立和维护连接的单位(我这个描述感觉有点奇怪 ?? )。
Zookeeper的所有操作基本都是基于session的,如之前提到的wathcer的机制,客户端请求的顺序执行和临时节点的生命周期。
从我们使用API的角度,session的连接和保持就是客户端通过实例化Zookeeper对象来与Zookeeper server端创建并保持连接TCP连接的过程。在客户端与服务器端成功创建了一个连接后,一个会话就被创建了。而在一个会话的生命周期中,session的状态可能在几种不同的状态中切换,而这些状态可以分为connecting,connected,reconnecting,reconnected,close等。
p.s. 这里要提一下第一步,在3.2.0版本中增加了chroot后缀(配置时加在后面,不是说这个chroot的功能是后缀,而且恰恰相反,功能是前缀)的设置,在zk的配置中类似配置 "127.0.0.1:4545/app/a" 或者 "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a"这样的设置,那么在zookeeper上所有的节点前都回家再/app/a的前缀。所以连接后在server上的根节点是/app/a。
作用:This feature is particularly useful in multi-tenant environments where each user of a particular ZooKeeper service could be rooted differently. This makes re-use much simpler as each user can code his/her application as if it were rooted at "/", while actual location (say /app/a) could be determined at deployment time.
Zookeeper官方文档是这样描述的,就是说在一台机器或vm上有多个tenant安装zookeeper,通过在配置中增加这样的配置,这样根据节点本身就知道它的具体位置。
Zookeeper官方用下图来表示状态的变化:
结合在sendthread介绍中说的,在startconnect方法中连接中会把状态设置为States.CONNECTING,连接成功后,在Onconnected方法里会把状态设置为CONNECTED。
在zk的create/exists/getchildren…等等接口内部最后回去submitRequest并把生成的packet放入queue中,在queuePacket方法的cnLossPacket方法中会根据状态去处理session超时,验证失败和连接丢失的问题。
private void conLossPacket(Packet p) {
if (p.replyHeader == null) {
return;
}
switch (state) {
case AUTH_FAILED://验证失败
p.replyHeader.setErr(KeeperException.Code.AUTHFAILED.intValue());
break;
case CLOSED://session超时导致close
p.replyHeader.setErr(KeeperException.Code.SESSIONEXPIRED.intValue());
break;
default://其他原因导致连接丢失
p.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
}
finishPacket(p);
}
在创建Session时,需要设置Session Timeout这个重要参数。这是Zookeeper服务允许一个Session在定义它失效之前的时间。如果服务在时间t内不能看到与一个Session关联的消息,它将定义这个Session失效。如果客户端在1/3 t时间内没有听到任何从服务器过来的消息,它将发送一个心跳消息给服务器。在(2/3)t时间, Zookeeper客户端开始寻找另一个Zookeeper服务器,并且它有另外的(1/3)t的时间寻找。
public interface SessionTracker {
public static interface Session {
long getSessionId();,
int getTimeout();
boolean isClosing();
}
public static interface SessionExpirer {
void expire(Session session);
long getServerId();
}
可以看到,在SessionTracker接口中有两个内部接口Session和SessionExpirer,可以看到分别和session与session过期有关系。
public static class SessionImpl implements Session {
SessionImpl(long sessionId, int timeout, long expireTime) {
this.sessionId = sessionId;
this.timeout = timeout;
this.tickTime = expireTime;
isClosing = false;
}
final long sessionId;
final int timeout;
long tickTime;
boolean isClosing;
Object owner;
public long getSessionId() { return sessionId; }
public int getTimeout() { return timeout; }
public boolean isClosing() { return isClosing; }
}
在SessionTrackerImpl类中有Session接口的实现类,此类也代表了一个真正的session对象。可以看到SessionImpl类中有几个变量:
sessionId:会话ID,用来标识一个唯一会话。每次客户端和server连接创建新会话时,zk会为其分别一个全局唯一的ID;
timeout:在创建zookeeper对象时传入的参数,客户端向server发送了这个参数后,服务器会根据timeout时间来判断session的状态;
ticktime:下次会话超时的时间点,大约为当前时间+timeout,具体之后详细解释;
isclosing:表明一个会话是否已经被关闭,如果一个会话已经被标记为closing,server便不会处理来自此session的请求。
在ZookeeperServer的processConnectRequest方法中有对客户端建立连接请求的处理:
if (sessionId != 0) {//sessionId已经存在
long clientSessionId = connReq.getSessionId();
LOG.info("Client attempting to renew session 0x"
+ Long.toHexString(clientSessionId)
+ " at " + cnxn.getRemoteSocketAddress());
serverCnxnFactory.closeSession(sessionId);
cnxn.setSessionId(sessionId);
reopenSession(cnxn, sessionId, passwd, sessionTimeout);//重新打开session
} else {
LOG.info("Client attempting to establish new session at "
+ cnxn.getRemoteSocketAddress());
createSession(cnxn, passwd, sessionTimeout);//新建session
}
long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
long sessionId = sessionTracker.createSession(timeout);
synchronized public long createSession(int sessionTimeout) {
addSession(nextSessionId, sessionTimeout);
return nextSessionId++;//每次取过之后nextSessionId+1
}
synchronized public void addSession(long id, int sessionTimeout) {
sessionsWithTimeout.put(id, sessionTimeout);
if (sessionsById.get(id) == null) {
SessionImpl s = new SessionImpl(id, sessionTimeout, 0);//新建sessionImpl对象
可以看到在每次新建session是建立在已经保存的nextSessionId的基础上的。然后看一下nextSessionId的初始化:
public static long initializeNextSession(long id) {
long nextSid = 0;
nextSid = (Time.currentElapsedTime() << 24) >>> 8;
nextSid = nextSid | (id <<56);
return nextSid;
}
initializeNextSession方法在zookeeperserver启动时的startup方法中,startup方法会初始化SessionTrackerImpl变量,此时nextSessionId会被初始化。
这里用到了Time.currentElapsedTime()方法去获得当前的时间,是一个64位的值。但是在之前的版本中用的是System.currentTimeMillis() 方法。为什么要用新的方法替代原来的值,事实上在正常情况下都不会有问题,但是如果有人修改了系统的时间,那么原来的方法就可能有问题。
至于nextSid生成的算法:系统时间先左移24位然后无符号右移8位然后和myid文件中的唯一id值左移56位生成的值做或操作,这样可以生产一个64位的唯一ID,然后后面的session基于这个值递增获得。这也是为什么在myid文件中配置唯一id时必须要小于256的原因。
sessiontracker的作用就是server用来管理会话的,它负责了session的创建,管理和删除,整个session的生命周期都在sessiontracker的管理之下。每个session在sessiontracker内都分成三份保存。
public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker {
private static final Logger LOG = LoggerFactory.getLogger(SessionTrackerImpl.class);
HashMap<Long, SessionImpl> sessionsById = new HashMap<Long, SessionImpl>();
HashMap<Long, SessionSet> sessionSets = new HashMap<Long, SessionSet>();//
ConcurrentHashMap<Long, Integer> sessionsWithTimeout;
long nextSessionId = 0;//下一次session的id
long nextExpirationTime;//最近的超时时间
int expirationInterval;//超时检查间隔
static class SessionSet {
HashSet<SessionImpl> sessions = new HashSet<SessionImpl>();
}
sessionsById是根据session的id来管理session实体的属性;而sessionSets则是根据下次超时时间来归档回话,便于会话管理和超时审查;sessionsWithTimeout是线程安全的,它也是按照id来保存session的超时时间,sessionsWithTimeout和zk的内存数据库相通,会定期同步到快照中。
这一篇主要说了些宏观的概念和session id的生成机制,比较泛,但是是下一篇的基础。
https://zookeeper.apache.org/doc/r3.3.6/zookeeperProgrammers.html
https://blog.csdn.net/jeff_fangji/article/details/43916359
https://www.jianshu.com/p/594129a44814
http://www.cnblogs.com/leesf456/p/6103870.html
https://xt00002003.iteye.com/blog/2302392
标签:log dep 实例化 extends ndt star shp ring 方法
原文地址:https://www.cnblogs.com/gongcomeon/p/10198222.html