标签:
synchronized public void startLeaderElection() { try { currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch()); } catch(IOException e) { RuntimeException re = new RuntimeException(e.getMessage()); re.setStackTrace(e.getStackTrace()); throw re; } for (QuorumServer p : getView().values()) { if (p.id == myid) { myQuorumAddr = p.addr; break; } } if (myQuorumAddr == null) { throw new RuntimeException("My id " + myid + " not in the peer list"); } if (electionType == 0) { try { udpSocket = new DatagramSocket(myQuorumAddr.getPort()); responder = new ResponderThread(); responder.start(); } catch (SocketException e) { throw new RuntimeException(e); } } this.electionAlg = createElectionAlgorithm(electionType); }
有 几种类型electionType 选举 算法,默认electionType=3,即快速选举fast paxos算法
protected Election createElectionAlgorithm(int electionAlgorithm){ Election le=null; //TODO: use a factory rather than a switch switch (electionAlgorithm) { case 0: le = new LeaderElection(this); break; case 1: le = new AuthFastLeaderElection(this); break; case 2: le = new AuthFastLeaderElection(this, true); break; case 3: qcm = new QuorumCnxManager(this); QuorumCnxManager.Listener listener = qcm.listener; if(listener != null){ listener.start(); le = new FastLeaderElection(this, qcm); } else { LOG.error("Null listener when initializing cnx manager"); } break; default: assert false; } return le; }
electionType=0 就是普通的paxos选举,该选举算法采用udp,开辟了一个ResponsderThread线程:
@Deprecated class ResponderThread extends Thread { ResponderThread() { super("ResponderThread"); } volatile boolean running = true; @Override public void run() { try { byte b[] = new byte[36]; ByteBuffer responseBuffer = ByteBuffer.wrap(b); DatagramPacket packet = new DatagramPacket(b, b.length); while (running) { udpSocket.receive(packet); if (packet.getLength() != 4) { LOG.warn("Got more than just an xid! Len = " + packet.getLength()); } else { responseBuffer.clear(); responseBuffer.getInt(); // Skip the xid responseBuffer.putLong(myid); Vote current = getCurrentVote(); switch (getPeerState()) { case LOOKING: responseBuffer.putLong(current.getId()); responseBuffer.putLong(current.getZxid()); break; case LEADING: responseBuffer.putLong(myid); try { long proposed; synchronized(leader) { proposed = leader.lastProposed; } responseBuffer.putLong(proposed); } catch (NullPointerException npe) { // This can happen in state transitions, // just ignore the request } break; case FOLLOWING: responseBuffer.putLong(current.getId()); try { responseBuffer.putLong(follower.getZxid()); } catch (NullPointerException npe) { // This can happen in state transitions, // just ignore the request } break; case OBSERVING: // Do nothing, Observers keep themselves to // themselves. break; } packet.setData(b); udpSocket.send(packet); } packet.setLength(b.length); } } catch (RuntimeException e) { LOG.warn("Unexpected runtime exception in ResponderThread",e); } catch (IOException e) { LOG.warn("Unexpected IO exception in ResponderThread",e); } finally { LOG.warn("QuorumPeer responder thread exited"); } } }
可以看到@Deprecated,所以说LeaderElection 方式将会被弃用,只采用fast paxos方法。
responsder线程会阻塞在receive处,等待其他server投回的结果,我们同时看看QuorumPeer线程的逻辑:
public void run() { setName("QuorumPeer" + "[myid=" + getId() + "]" + cnxnFactory.getLocalAddress()); LOG.debug("Starting quorum peer"); try { jmxQuorumBean = new QuorumBean(this); MBeanRegistry.getInstance().register(jmxQuorumBean, null); for(QuorumServer s: getView().values()){ ZKMBeanInfo p; if (getId() == s.id) { p = jmxLocalPeerBean = new LocalPeerBean(this); try { MBeanRegistry.getInstance().register(p, jmxQuorumBean); } catch (Exception e) { LOG.warn("Failed to register with JMX", e); jmxLocalPeerBean = null; } } else { p = new RemotePeerBean(s); try { MBeanRegistry.getInstance().register(p, jmxQuorumBean); } catch (Exception e) { LOG.warn("Failed to register with JMX", e); } } } } catch (Exception e) { LOG.warn("Failed to register with JMX", e); jmxQuorumBean = null; } try { /* * Main loop */ while (running) { switch (getPeerState()) { case LOOKING: LOG.info("LOOKING"); if (Boolean.getBoolean("readonlymode.enabled")) { LOG.info("Attempting to start ReadOnlyZooKeeperServer"); // Create read-only server but don‘t start it immediately final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer( logFactory, this, new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb); // Instead of starting roZk immediately, wait some grace // period before we decide we‘re partitioned. // // Thread is used here because otherwise it would require // changes in each of election strategy classes which is // unnecessary code coupling. Thread roZkMgr = new Thread() { public void run() { try { // lower-bound grace period to 2 secs sleep(Math.max(2000, tickTime)); if (ServerState.LOOKING.equals(getPeerState())) { roZk.startup(); } } catch (InterruptedException e) { LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started"); } catch (Exception e) { LOG.error("FAILED to start ReadOnlyZooKeeperServer", e); } } }; try { roZkMgr.start(); setCurrentVote(makeLEStrategy().lookForLeader()); } catch (Exception e) { LOG.warn("Unexpected exception",e); setPeerState(ServerState.LOOKING); } finally { // If the thread is in the the grace period, interrupt // to come out of waiting. roZkMgr.interrupt(); roZk.shutdown(); } } else { try { setCurrentVote(makeLEStrategy().lookForLeader()); } catch (Exception e) { LOG.warn("Unexpected exception", e); setPeerState(ServerState.LOOKING); } } break; case OBSERVING: try { LOG.info("OBSERVING"); setObserver(makeObserver(logFactory)); observer.observeLeader(); } catch (Exception e) { LOG.warn("Unexpected exception",e ); } finally { observer.shutdown(); setObserver(null); setPeerState(ServerState.LOOKING); } break; case FOLLOWING: try { LOG.info("FOLLOWING"); setFollower(makeFollower(logFactory)); follower.followLeader(); } catch (Exception e) { LOG.warn("Unexpected exception",e); } finally { follower.shutdown(); setFollower(null); setPeerState(ServerState.LOOKING); } break; case LEADING: LOG.info("LEADING"); try { setLeader(makeLeader(logFactory)); leader.lead(); setLeader(null); } catch (Exception e) { LOG.warn("Unexpected exception",e); } finally { if (leader != null) { leader.shutdown("Forcing shutdown"); setLeader(null); } setPeerState(ServerState.LOOKING); } break; } } } finally { LOG.warn("QuorumPeer main thread exited"); try { MBeanRegistry.getInstance().unregisterAll(); } catch (Exception e) { LOG.warn("Failed to unregister with JMX", e); } jmxQuorumBean = null; jmxLocalPeerBean = null; } }
标签:
原文地址:http://www.cnblogs.com/kuipertan/p/4340444.html