ZooKeeper (4): Converting a TCP Data Stream into zk's Internal Packets
The previous articles in this series gave us a macro view of how ZooKeeper (zk) handles network data.
In this article we start from the details and trace how a TCP packet is converted into zk's internal data-flow processing.
1. Listening on the client request socket
ZooKeeper listens on its port with Java NIO and pulls the TCP data stream from there.
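Before reading ZooKeeper's version, it may help to see the bare NIO accept pattern it builds on. The following is a minimal, self-contained sketch of that pattern only; the class name, port constant, and structure are mine for illustration, not ZooKeeper code. ZooKeeper's real accept thread below elaborates this with per-IP connection limits and a pool of selector threads.

```java
import java.net.InetSocketAddress;
import java.nio.channels.*;
import java.util.Iterator;

public class MiniAcceptLoop {
    public static void main(String[] args) throws Exception {
        final int PORT = 2181; // illustrative; any free port works
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(PORT));
        server.configureBlocking(false);
        // Register interest in OP_ACCEPT only, just like AcceptThread does
        server.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            selector.select(); // block until a connection is ready
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isAcceptable()) {
                    SocketChannel client = server.accept();
                    client.configureBlocking(false);
                    // ZooKeeper would now hand `client` to a SelectorThread;
                    // here we simply close it to keep the sketch short.
                    client.close();
                }
            }
        }
    }
}
```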
```java
// org.apache.zookeeper.server.NIOServerCnxnFactory.AcceptThread#AcceptThread
public AcceptThread(ServerSocketChannel ss, InetSocketAddress addr, Set<SelectorThread> selectorThreads) throws IOException {
    super("NIOServerCxnFactory.AcceptThread:" + addr);
    this.acceptSocket = ss;
    // Listen for OP_ACCEPT events only
    this.acceptKey = acceptSocket.register(selector, SelectionKey.OP_ACCEPT);
    this.selectorThreads = Collections.unmodifiableList(new ArrayList<SelectorThread>(selectorThreads));
    selectorIterator = this.selectorThreads.iterator();
}

// org.apache.zookeeper.server.NIOServerCnxnFactory.AcceptThread#run
public void run() {
    try {
        // Loop forever, accepting connections
        while (!stopped && !acceptSocket.socket().isClosed()) {
            try {
                select();
            } catch (RuntimeException e) {
                LOG.warn("Ignoring unexpected runtime exception", e);
            } catch (Exception e) {
                LOG.warn("Ignoring unexpected exception", e);
            }
        }
    } finally {
        closeSelector();
        // This will wake up the selector threads, and tell the
        // worker thread pool to begin shutdown.
        if (!reconfiguring) {
            NIOServerCnxnFactory.this.stop();
        }
        LOG.info("accept thread exitted run method");
    }
}

// org.apache.zookeeper.server.NIOServerCnxnFactory.AcceptThread#select
private void select() {
    try {
        // nio select
        selector.select();

        Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
        while (!stopped && selectedKeys.hasNext()) {
            SelectionKey key = selectedKeys.next();
            selectedKeys.remove();

            if (!key.isValid()) {
                continue;
            }
            if (key.isAcceptable()) {
                // A connection is ready; accept it
                if (!doAccept()) {
                    // If unable to pull a new connection off the accept
                    // queue, pause accepting to give us time to free
                    // up file descriptors and so the accept thread
                    // doesn't spin in a tight loop.
                    pauseAccept(10);
                }
            } else {
                LOG.warn("Unexpected ops in accept select {}", key.readyOps());
            }
        }
    } catch (IOException e) {
        LOG.warn("Ignoring IOException while selecting", e);
    }
}

// org.apache.zookeeper.server.NIOServerCnxnFactory.AcceptThread#doAccept
/**
 * Accept new socket connections. Enforces maximum number of connections
 * per client IP address. Round-robin assigns to selector thread for
 * handling. Returns whether pulled a connection off the accept queue
 * or not. If encounters an error attempts to fast close the socket.
 *
 * @return whether was able to accept a connection or not
 */
private boolean doAccept() {
    boolean accepted = false;
    SocketChannel sc = null;
    try {
        sc = acceptSocket.accept();
        accepted = true;
        InetAddress ia = sc.socket().getInetAddress();
        int cnxncount = getClientCnxnCount(ia);
        if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns) {
            throw new IOException("Too many connections from " + ia + " - max is " + maxClientCnxns);
        }
        LOG.debug("Accepted socket connection from {}", sc.socket().getRemoteSocketAddress());

        sc.configureBlocking(false);

        // Round-robin assign this connection to a selector thread,
        // which then owns all subsequent I/O on it
        if (!selectorIterator.hasNext()) {
            selectorIterator = selectorThreads.iterator();
        }
        SelectorThread selectorThread = selectorIterator.next();
        if (!selectorThread.addAcceptedConnection(sc)) {
            throw new IOException("Unable to add connection to selector queue"
                + (stopped ? " (shutdown in progress)" : ""));
        }
        acceptErrorLogger.flush();
    } catch (IOException e) {
        // accept, maxClientCnxns, configureBlocking
        ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
        acceptErrorLogger.rateLimitLog("Error accepting new connection: " + e.getMessage());
        fastCloseSock(sc);
    }
    return accepted;
}

// org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#addAcceptedConnection
/**
 * Place new accepted connection onto a queue for adding. Do this
 * so only the selector thread modifies what keys are registered
 * with the selector.
 */
public boolean addAcceptedConnection(SocketChannel accepted) {
    // Enqueue onto the SelectorThread's queue and return immediately,
    // so the accept thread can keep listening
    if (stopped || !acceptedQueue.offer(accepted)) {
        return false;
    }
    // Wake up the selector so it starts working
    wakeupSelector();
    return true;
}
```
2. Reading data from the connection
Once the accept thread hands over a connection, a SelectorThread takes care of reading from and writing to it.
```java
// org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread
/**
 * The main loop for the thread selects() on the connections and
 * dispatches ready I/O work requests, then registers all pending
 * newly accepted connections and updates any interest ops on the
 * queue.
 */
public void run() {
    try {
        // Loop until the factory is stopped
        while (!stopped) {
            try {
                select();
                processAcceptedConnections();
                processInterestOpsUpdateRequests();
            } catch (RuntimeException e) {
                LOG.warn("Ignoring unexpected runtime exception", e);
            } catch (Exception e) {
                LOG.warn("Ignoring unexpected exception", e);
            }
        }

        // Close connections still pending on the selector. Any others
        // with in-flight work, let drain out of the work queue.
        for (SelectionKey key : selector.keys()) {
            NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
            if (cnxn.isSelectable()) {
                cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
            }
            cleanupSelectionKey(key);
        }

        SocketChannel accepted;
        while ((accepted = acceptedQueue.poll()) != null) {
            fastCloseSock(accepted);
        }
        updateQueue.clear();
    } finally {
        closeSelector();
        // This will wake up the accept thread and the other selector
        // threads, and tell the worker thread pool to begin shutdown.
        NIOServerCnxnFactory.this.stop();
        LOG.info("selector thread exitted run method");
    }
}

private void select() {
    try {
        // This selector is private to the thread, so operating on it is
        // thread-safe. It was obtained via this.selector = Selector.open()
        // and is woken by the AcceptThread calling wakeupSelector(),
        // i.e. selector.wakeup().
        selector.select();

        Set<SelectionKey> selected = selector.selectedKeys();
        ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
        Collections.shuffle(selectedList);
        Iterator<SelectionKey> selectedKeys = selectedList.iterator();
        while (!stopped && selectedKeys.hasNext()) {
            SelectionKey key = selectedKeys.next();
            selected.remove(key);

            if (!key.isValid()) {
                cleanupSelectionKey(key);
                continue;
            }
            if (key.isReadable() || key.isWritable()) {
                // Enter the read/write handling path
                handleIO(key);
            } else {
                LOG.warn("Unexpected ops in select {}", key.readyOps());
            }
        }
    } catch (IOException e) {
        LOG.warn("Ignoring IOException while selecting", e);
    }
}

/**
 * Schedule I/O for processing on the connection associated with
 * the given SelectionKey. If a worker thread pool is not being used,
 * I/O is run directly by this thread.
 */
private void handleIO(SelectionKey key) {
    IOWorkRequest workRequest = new IOWorkRequest(this, key);
    NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();

    // Stop selecting this key while processing on its
    // connection
    cnxn.disableSelectable();
    key.interestOps(0);
    touchCnxn(cnxn);
    // Wrap the work in an IOWorkRequest and hand it to the worker thread pool
    workerPool.schedule(workRequest);
}

// org.apache.zookeeper.server.WorkerService#schedule
/**
 * Schedule work to be done. If a worker thread pool is not being
 * used, work is done directly by this thread. This schedule API is
 * for use with non-assignable WorkerServices. For assignable
 * WorkerServices, will always run on the first thread.
 */
public void schedule(WorkRequest workRequest) {
    schedule(workRequest, 0);
}

/**
 * Schedule work to be done by the thread assigned to this id. Thread
 * assignment is a single mod operation on the number of threads. If a
 * worker thread pool is not being used, work is done directly by
 * this thread.
 */
public void schedule(WorkRequest workRequest, long id) {
    if (stopped) {
        workRequest.cleanup();
        return;
    }

    ScheduledWorkRequest scheduledWorkRequest = new ScheduledWorkRequest(workRequest);

    // If we have a worker thread pool, use that; otherwise, do the work
    // directly.
    int size = workers.size();
    if (size > 0) {
        try {
            // make sure to map negative ids as well to [0, size-1];
            // here id == 0, so a single worker is always chosen and the
            // scheduledWorkRequest is submitted to it
            int workerNum = ((int) (id % size) + size) % size;
            ExecutorService worker = workers.get(workerNum);
            worker.execute(scheduledWorkRequest);
        } catch (RejectedExecutionException e) {
            LOG.warn("ExecutorService rejected execution", e);
            workRequest.cleanup();
        }
    } else {
        // When there is no worker thread pool, do the work directly
        // and wait for its completion
        scheduledWorkRequest.run();
    }
}
```
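The `((int) (id % size) + size) % size` expression maps any id, including negative ones, into [0, size-1], because Java's `%` operator keeps the sign of the dividend. A tiny standalone illustration with hypothetical values (class name and numbers are mine):

```java
public class WorkerNumDemo {
    public static void main(String[] args) {
        int size = 4; // hypothetical pool size
        for (long id : new long[] {0, 5, -3}) {
            // In Java, -3 % 4 == -3, so adding size and taking % again
            // normalizes the result into [0, size-1]
            int workerNum = ((int) (id % size) + size) % size;
            System.out.println(id + " -> worker " + workerNum); // 0->0, 5->1, -3->1
        }
    }
}
```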
3. Reading the channel's actual data: WorkerService
This picks up the work submitted by the previous thread.
```java
// org.apache.zookeeper.server.WorkerService#schedule
/**
 * Schedule work to be done by the thread assigned to this id. Thread
 * assignment is a single mod operation on the number of threads. If a
 * worker thread pool is not being used, work is done directly by
 * this thread.
 */
public void schedule(WorkRequest workRequest, long id) {
    if (stopped) {
        workRequest.cleanup();
        return;
    }

    ScheduledWorkRequest scheduledWorkRequest = new ScheduledWorkRequest(workRequest);

    // If we have a worker thread pool, use that; otherwise, do the work
    // directly.
    int size = workers.size();
    if (size > 0) {
        try {
            // make sure to map negative ids as well to [0, size-1]
            int workerNum = ((int) (id % size) + size) % size;
            ExecutorService worker = workers.get(workerNum);
            worker.execute(scheduledWorkRequest);
        } catch (RejectedExecutionException e) {
            LOG.warn("ExecutorService rejected execution", e);
            workRequest.cleanup();
        }
    } else {
        // When there is no worker thread pool, do the work directly
        // and wait for its completion
        scheduledWorkRequest.run();
    }
}

// org.apache.zookeeper.server.WorkerService.ScheduledWorkRequest
private class ScheduledWorkRequest implements Runnable {

    private final WorkRequest workRequest;

    ScheduledWorkRequest(WorkRequest workRequest) {
        this.workRequest = workRequest;
    }

    @Override
    public void run() {
        try {
            // Check if stopped while request was on queue
            if (stopped) {
                workRequest.cleanup();
                return;
            }
            // Delegate to the WorkRequest
            workRequest.doWork();
        } catch (Exception e) {
            LOG.warn("Unexpected exception", e);
            workRequest.cleanup();
        }
    }
}

// org.apache.zookeeper.server.NIOServerCnxnFactory.IOWorkRequest#doWork
public void doWork() throws InterruptedException {
    if (!key.isValid()) {
        selectorThread.cleanupSelectionKey(key);
        return;
    }

    // Handle the client's read/write request
    if (key.isReadable() || key.isWritable()) {
        // Hand the key to the connection, which performs the actual I/O
        cnxn.doIO(key);

        // Check if we shutdown or doIO() closed this connection
        if (stopped) {
            cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
            return;
        }
        if (!key.isValid()) {
            selectorThread.cleanupSelectionKey(key);
            return;
        }
        // Refresh the connection's session bookkeeping so a response can be sent
        touchCnxn(cnxn);
    }

    // Mark this connection as once again ready for selection
    cnxn.enableSelectable();
    // Push an update request on the queue to resume selecting
    // on the current set of interest ops, which may have changed
    // as a result of the I/O operations we just performed.
    if (!selectorThread.addInterestOpsUpdateRequest(key)) {
        cnxn.close(ServerCnxn.DisconnectReason.CONNECTION_MODE_CHANGED);
    }
}

// Handle the actual reads and writes
// org.apache.zookeeper.server.NIOServerCnxn#doIO
void doIO(SelectionKey k) throws InterruptedException {
    try {
        if (!isSocketOpen()) {
            LOG.warn("trying to do i/o on a null socket for session: 0x{}", Long.toHexString(sessionId));
            return;
        }
        if (k.isReadable()) {
            int rc = sock.read(incomingBuffer);
            if (rc < 0) {
                handleFailedRead();
            }
            if (incomingBuffer.remaining() == 0) {
                boolean isPayload;
                if (incomingBuffer == lenBuffer) { // start of next request
                    incomingBuffer.flip();
                    isPayload = readLength(k);
                    incomingBuffer.clear();
                } else {
                    // continuation
                    isPayload = true;
                }
                if (isPayload) { // not the case for 4letterword
                    // The data is ready: read the message body and pass
                    // it downstream
                    readPayload();
                } else {
                    // four letter words take care
                    // need not do anything else
                    return;
                }
            }
        }
        if (k.isWritable()) {
            handleWrite(k);

            if (!initialized && !getReadInterest() && !getWriteInterest()) {
                throw new CloseRequestException("responded to info probe", DisconnectReason.INFO_PROBE);
            }
        }
    } catch (CancelledKeyException e) {
        LOG.warn("CancelledKeyException causing close of session: 0x{}", Long.toHexString(sessionId));
        LOG.debug("CancelledKeyException stack trace", e);
        close(DisconnectReason.CANCELLED_KEY_EXCEPTION);
    } catch (CloseRequestException e) {
        // expecting close to log session closure
        close();
    } catch (EndOfStreamException e) {
        LOG.warn("Unexpected exception", e);
        // expecting close to log session closure
        close(e.getReason());
    } catch (ClientCnxnLimitException e) {
        // Common case exception, print at debug level
        ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
        LOG.warn("Closing session 0x{}", Long.toHexString(sessionId), e);
        close(DisconnectReason.CLIENT_CNX_LIMIT);
    } catch (IOException e) {
        LOG.warn("Close of session 0x{}", Long.toHexString(sessionId), e);
        close(DisconnectReason.IO_EXCEPTION);
    }
}

// org.apache.zookeeper.server.NIOServerCnxn#readPayload
/** Read the request payload (everything following the length prefix) */
private void readPayload() throws IOException, InterruptedException, ClientCnxnLimitException {
    if (incomingBuffer.remaining() != 0) { // have we read length bytes?
        int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
        if (rc < 0) {
            handleFailedRead();
        }
    }

    // Re-check whether the full payload has arrived
    if (incomingBuffer.remaining() == 0) { // have we read length bytes?
        incomingBuffer.flip();
        packetReceived(4 + incomingBuffer.remaining());
        if (!initialized) {
            readConnectRequest();
        } else {
            // Connection already initialized: read the buffers directly
            readRequest();
        }
        lenBuffer.clear();
        incomingBuffer = lenBuffer;
    }
}

// org.apache.zookeeper.server.NIOServerCnxn#readRequest
private void readRequest() throws IOException {
    // Hand off to zkServer, which holds the prepared processor-chain configuration
    zkServer.processPacket(this, incomingBuffer);
}

// org.apache.zookeeper.server.ZooKeeperServer#processPacket
public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
    // We have the request, now process and setup for next
    InputStream bais = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
    RequestHeader h = new RequestHeader();
    h.deserialize(bia, "header");

    // Need to increase the outstanding request count first, otherwise
    // there might be a race condition that it enabled recv after
    // processing request and then disabled when check throttling.
    //
    // Be aware that we're actually checking the global outstanding
    // request before this request.
    //
    // It's fine if the IOException thrown before we decrease the count
    // in cnxn, since it will close the cnxn anyway.
    cnxn.incrOutstandingAndCheckThrottle(h);

    // Through the magic of byte buffers, txn will not be
    // pointing
    // to the start of the txn
    incomingBuffer = incomingBuffer.slice();
    // Simple dispatch on the request type
    if (h.getType() == OpCode.auth) {
        LOG.info("got auth packet {}", cnxn.getRemoteSocketAddress());
        AuthPacket authPacket = new AuthPacket();
        ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
        String scheme = authPacket.getScheme();
        ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
        Code authReturn = KeeperException.Code.AUTHFAILED;
        if (ap != null) {
            try {
                // handleAuthentication may close the connection, to allow the client to choose
                // a different server to connect to.
                authReturn = ap.handleAuthentication(
                    new ServerAuthenticationProvider.ServerObjs(this, cnxn),
                    authPacket.getAuth());
            } catch (RuntimeException e) {
                LOG.warn("Caught runtime exception from AuthenticationProvider: {}", scheme, e);
                authReturn = KeeperException.Code.AUTHFAILED;
            }
        }
        if (authReturn == KeeperException.Code.OK) {
            LOG.debug("Authentication succeeded for scheme: {}", scheme);
            LOG.info("auth success {}", cnxn.getRemoteSocketAddress());
            ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
            cnxn.sendResponse(rh, null, null);
        } else {
            if (ap == null) {
                LOG.warn(
                    "No authentication provider for scheme: {} has {}",
                    scheme,
                    ProviderRegistry.listProviders());
            } else {
                LOG.warn("Authentication failed for scheme: {}", scheme);
            }
            // send a response...
            ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.AUTHFAILED.intValue());
            cnxn.sendResponse(rh, null, null);
            // ... and close connection
            cnxn.sendBuffer(ServerCnxnFactory.closeConn);
            cnxn.disableRecv();
        }
        return;
    } else if (h.getType() == OpCode.sasl) {
        processSasl(incomingBuffer, cnxn, h);
    } else {
        // Generic request handling
        if (shouldRequireClientSaslAuth() && !hasCnxSASLAuthenticated(cnxn)) {
            ReplyHeader replyHeader = new ReplyHeader(h.getXid(), 0, Code.SESSIONCLOSEDREQUIRESASLAUTH.intValue());
            cnxn.sendResponse(replyHeader, null, "response");
            cnxn.sendCloseSession();
            cnxn.disableRecv();
        } else {
            // Wrap everything into a Request. At this point the internal
            // data model exists: sessionId, xid, buffers...
            Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
            int length = incomingBuffer.limit();
            if (isLargeRequest(length)) {
                // checkRequestSize will throw IOException if request is rejected
                checkRequestSizeWhenMessageReceived(length);
                si.setLargeRequestSize(length);
            }
            si.setOwner(ServerCnxn.me);
            // Submit onto the request queue
            submitRequest(si);
        }
    }
}

public void submitRequest(Request si) {
    enqueueRequest(si);
}

public void enqueueRequest(Request si) {
    if (requestThrottler == null) {
        synchronized (this) {
            try {
                // Since all requests are passed to the request
                // processor it should wait for setting up the request
                // processor chain. The state will be updated to RUNNING
                // after the setup.
                while (state == State.INITIAL) {
                    wait(1000);
                }
            } catch (InterruptedException e) {
                LOG.warn("Unexpected interruption", e);
            }
            if (requestThrottler == null) {
                throw new RuntimeException("Not started");
            }
        }
    }
    // Hand the request to the throttler:
    // requestThrottler = new RequestThrottler(this); once throttling is
    // done, the request ultimately comes back to this zkServer for processing
    requestThrottler.submitRequest(si);
}

// org.apache.zookeeper.server.RequestThrottler#submitRequest
public void submitRequest(Request request) {
    // If already stopping, drop the request
    if (stopping) {
        LOG.debug("Shutdown in progress. Request cannot be processed");
        dropRequest(request);
    } else {
        // Enqueue onto a LinkedBlockingQueue; the throttler thread
        // consumes it asynchronously
        submittedRequests.add(request);
    }
}
```
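The lenBuffer/incomingBuffer dance in doIO() and readPayload() implements a classic length-prefixed framing protocol: each client packet is a 4-byte big-endian length followed by that many payload bytes. A minimal sketch of the same idea over a non-blocking channel follows; the class and method names are mine, and it omits ZooKeeper's 4-letter-word handling and error paths:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

// Minimal length-prefixed frame reader in the style of NIOServerCnxn
// (illustrative names; not the actual ZooKeeper class).
class FrameReader {
    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4); // length prefix
    private ByteBuffer incoming = lenBuffer; // starts out pointing at the prefix

    /** Returns a complete payload, or null if more bytes are still needed. */
    ByteBuffer poll(SocketChannel sock) throws IOException {
        if (sock.read(incoming) < 0) {
            throw new IOException("end of stream");
        }
        if (incoming.remaining() > 0) {
            return null; // partial read; try again on the next readable event
        }
        if (incoming == lenBuffer) {
            // The 4 length bytes are complete: allocate the payload buffer
            lenBuffer.flip();
            incoming = ByteBuffer.allocate(lenBuffer.getInt());
            lenBuffer.clear();
            return null;
        }
        // Payload complete: hand it out and reset to reading the next prefix
        ByteBuffer payload = incoming;
        payload.flip();
        incoming = lenBuffer;
        return payload;
    }
}
```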
4. The first queue that processes requests: the RequestThrottler
Through the TCP-to-request conversion above, the bytes have been turned into a Request instance and placed on a queue. That queue is consumed next by the RequestThrottler, whose job is to check whether the configured maximum number of in-flight requests has been exceeded; if it has, the throttler stalls the request so the downstream processors are not overwhelmed.
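The core idea fits in a few lines. Below is a stripped-down sketch of such a stall-and-forward loop; the class and field names are mine, not ZooKeeper's, and the real implementation that follows adds stale-request dropping, metrics, and condition-based waiting instead of sleep:

```java
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative throttler loop: take a request, stall while too many are
// in flight, then pass it downstream.
class MiniThrottler extends Thread {
    private final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final int maxRequests;
    volatile int inProcess = 0; // a sketch; the real code asks zks.getInProcess()

    MiniThrottler(int maxRequests) { this.maxRequests = maxRequests; }

    void submit(Runnable request) { queue.add(request); }

    @Override
    public void run() {
        try {
            while (true) {
                Runnable request = queue.take(); // blocking take, like submittedRequests.take()
                while (maxRequests > 0 && inProcess >= maxRequests) {
                    Thread.sleep(10); // stall, standing in for throttleSleep(stallTime)
                }
                inProcess++; // downstream decrements when the request finishes
                request.run(); // stands in for zks.submitRequestNow(request)
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```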
```java
// org.apache.zookeeper.server.RequestThrottler#run
@Override
public void run() {
    try {
        // Loop until killed
        while (true) {
            if (killed) {
                break;
            }

            // Blocking take: a submitted request is picked up as soon as it arrives
            Request request = submittedRequests.take();
            if (Request.requestOfDeath == request) {
                break;
            }

            if (request.mustDrop()) {
                continue;
            }

            // Throttling is disabled when maxRequests = 0
            if (maxRequests > 0) {
                while (!killed) {
                    if (dropStaleRequests && request.isStale()) {
                        // Note: this will close the connection
                        dropRequest(request);
                        ServerMetrics.getMetrics().STALE_REQUESTS_DROPPED.add(1);
                        request = null;
                        break;
                    }
                    // As long as the limit has not been reached, pass straight through
                    if (zks.getInProcess() < maxRequests) {
                        break;
                    }
                    throttleSleep(stallTime);
                }
            }

            if (killed) {
                break;
            }

            // A dropped stale request will be null
            if (request != null) {
                if (request.isStale()) {
                    ServerMetrics.getMetrics().STALE_REQUESTS.add(1);
                }
                // The request passed the throttle check; submit it to zkServer
                zks.submitRequestNow(request);
            }
        }
    } catch (InterruptedException e) {
        LOG.error("Unexpected interruption", e);
    }
    int dropped = drainQueue();
    LOG.info("RequestThrottler shutdown. Dropped {} requests", dropped);
}

// org.apache.zookeeper.server.ZooKeeperServer#submitRequestNow
public void submitRequestNow(Request si) {
    // Make sure the processor chain has been set up
    if (firstProcessor == null) {
        synchronized (this) {
            try {
                // Since all requests are passed to the request
                // processor it should wait for setting up the request
                // processor chain. The state will be updated to RUNNING
                // after the setup.
                while (state == State.INITIAL) {
                    wait(1000);
                }
            } catch (InterruptedException e) {
                LOG.warn("Unexpected interruption", e);
            }
            if (firstProcessor == null || state != State.RUNNING) {
                throw new RuntimeException("Not started");
            }
        }
    }
    try {
        touch(si.cnxn);
        // Validate that the request type is supported
        boolean validpacket = Request.isValid(si.type);
        if (validpacket) {
            setLocalSessionFlag(si);
            // Call the first processor; the formal processing flow starts
            // here, e.g. with LeaderRequestProcessor
            firstProcessor.processRequest(si);
            if (si.cnxn != null) {
                incInProcess();
            }
        } else {
            LOG.warn("Received packet at server of unknown type {}", si.type);
            // Update request accounting/throttling limits
            requestFinished(si);
            new UnimplementedRequestProcessor().processRequest(si);
        }
    } catch (MissingSessionException e) {
        LOG.debug("Dropping request.", e);
        // Update request accounting/throttling limits
        requestFinished(si);
    } catch (RequestProcessorException e) {
        LOG.error("Unable to process request", e);
        // Update request accounting/throttling limits
        requestFinished(si);
    }
}
```
At this point request initialization is complete, and we can move on to analyzing the processor chain itself.
5. firstProcessor example, the first passer: LeaderRequestProcessor
```java
// org.apache.zookeeper.server.quorum.LeaderRequestProcessor#processRequest
@Override
public void processRequest(Request request) throws RequestProcessorException {
    // Screen quorum requests against ACLs first
    if (!lzks.authWriteRequest(request)) {
        return;
    }

    // Check if this is a local session and we are trying to create
    // an ephemeral node, in which case we upgrade the session
    Request upgradeRequest = null;
    try {
        upgradeRequest = lzks.checkUpgradeSession(request);
    } catch (KeeperException ke) {
        if (request.getHdr() != null) {
            LOG.debug("Updating header");
            request.getHdr().setType(OpCode.error);
            request.setTxn(new ErrorTxn(ke.code().intValue()));
        }
        request.setException(ke);
        LOG.warn("Error creating upgrade request", ke);
    } catch (IOException ie) {
        LOG.error("Unexpected error in upgrade", ie);
    }
    if (upgradeRequest != null) {
        nextProcessor.processRequest(upgradeRequest);
    }

    // Pass straight to the next processor; little logic of its own
    nextProcessor.processRequest(request);
}

// org.apache.zookeeper.server.PrepRequestProcessor#processRequest
public void processRequest(Request request) {
    request.prepQueueStartTime = Time.currentElapsedTime();
    // Add to PrepRequestProcessor's LinkedBlockingQueue; the real
    // processing-chain logic starts from here
    submittedRequests.add(request);
    // Record monitoring statistics
    ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUED.add(1);
}
```
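All of these processors implement ZooKeeper's RequestProcessor interface and hold a `nextProcessor` reference, so the chain is a plain chain-of-responsibility. A minimal sketch of the shape follows; the interface name matches ZooKeeper's, but the signature is simplified (no shutdown(), Object instead of Request) and the stages are made up:

```java
// The chain-of-responsibility shape shared by all the processors.
interface RequestProcessor {
    void processRequest(Object request) throws Exception;
}

class LoggingStage implements RequestProcessor {
    private final RequestProcessor nextProcessor;
    LoggingStage(RequestProcessor nextProcessor) { this.nextProcessor = nextProcessor; }
    public void processRequest(Object request) throws Exception {
        System.out.println("saw request: " + request);
        nextProcessor.processRequest(request); // pass the ball downstream
    }
}

class FinalStage implements RequestProcessor {
    public void processRequest(Object request) {
        System.out.println("done: " + request);
    }
}

// Usage: firstProcessor -> FinalStage, mirroring how ZooKeeperServer wires
// e.g. LeaderRequestProcessor -> PrepRequestProcessor -> ...
class ChainDemo {
    public static void main(String[] args) throws Exception {
        RequestProcessor firstProcessor = new LoggingStage(new FinalStage());
        firstProcessor.processRequest("create /foo");
    }
}
```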
6. firstProcessor example, the first passer: FollowerRequestProcessor
```java
// org.apache.zookeeper.server.quorum.FollowerRequestProcessor#processRequest
public void processRequest(Request request) {
    processRequest(request, true);
}

void processRequest(Request request, boolean checkForUpgrade) {
    if (!finished) {
        if (checkForUpgrade) {
            // Before sending the request, check if the request requires a
            // global session and what we have is a local session. If so do
            // an upgrade.
            Request upgradeRequest = null;
            try {
                upgradeRequest = zks.checkUpgradeSession(request);
            } catch (KeeperException ke) {
                if (request.getHdr() != null) {
                    request.getHdr().setType(OpCode.error);
                    request.setTxn(new ErrorTxn(ke.code().intValue()));
                }
                request.setException(ke);
                LOG.warn("Error creating upgrade request", ke);
            } catch (IOException ie) {
                LOG.error("Unexpected error in upgrade", ie);
            }
            if (upgradeRequest != null) {
                queuedRequests.add(upgradeRequest);
            }
        }

        // FollowerRequestProcessor has real work of its own: it adds the
        // request to its private queuedRequests queue
        queuedRequests.add(request);
    }
}

// org.apache.zookeeper.server.quorum.FollowerRequestProcessor#run
@Override
public void run() {
    try {
        while (!finished) {
            Request request = queuedRequests.take();
            if (LOG.isTraceEnabled()) {
                ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK, 'F', request, "");
            }
            if (request == Request.requestOfDeath) {
                break;
            }

            // Screen quorum requests against ACLs first
            if (!zks.authWriteRequest(request)) {
                continue;
            }

            // We want to queue the request to be processed before we submit
            // the request to the leader so that we are ready to receive
            // the response
            // This hands the request to the CommitProcessor, entering the
            // follower's local processing
            nextProcessor.processRequest(request);

            // We now ship the request to the leader. As with all
            // other quorum operations, sync also follows this code
            // path, but different from others, we need to keep track
            // of the sync operations this follower has pending, so we
            // add it to pendingSyncs.
            // Write requests are forwarded to the Leader
            switch (request.type) {
            case OpCode.sync:
                zks.pendingSyncs.add(request);
                zks.getFollower().request(request);
                break;
            case OpCode.create:
            case OpCode.create2:
            case OpCode.createTTL:
            case OpCode.createContainer:
            case OpCode.delete:
            case OpCode.deleteContainer:
            case OpCode.setData:
            case OpCode.reconfig:
            case OpCode.setACL:
            case OpCode.multi:
            case OpCode.check:
                zks.getFollower().request(request);
                break;
            case OpCode.createSession:
            case OpCode.closeSession:
                // Don't forward local sessions to the leader.
                if (!request.isLocalSession()) {
                    zks.getFollower().request(request);
                }
                break;
            }
        }
    } catch (Exception e) {
        handleException(this.getName(), e);
    }
    LOG.info("FollowerRequestProcessor exited loop!");
}

// org.apache.zookeeper.server.quorum.Learner#request
/**
 * send a request packet to the leader
 *
 * @param request
 *                the request from the client
 * @throws IOException
 */
void request(Request request) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream oa = new DataOutputStream(baos);
    oa.writeLong(request.sessionId);
    oa.writeInt(request.cxid);
    oa.writeInt(request.type);
    if (request.request != null) {
        request.request.rewind();
        int len = request.request.remaining();
        byte[] b = new byte[len];
        request.request.get(b);
        request.request.rewind();
        oa.write(b);
    }
    oa.close();
    // Forward the request to the Leader (covered in a later article)
    QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo);
    writePacket(qp, true);
}
```
7. firstProcessor example, the first passer: ObserverRequestProcessor
ObserverRequestProcessor works much like the follower version.
```java
// org.apache.zookeeper.server.quorum.ObserverRequestProcessor#processRequest
/**
 * Simply queue the request, which will be processed in FIFO order.
 */
public void processRequest(Request request) {
    if (!finished) {
        Request upgradeRequest = null;
        try {
            upgradeRequest = zks.checkUpgradeSession(request);
        } catch (KeeperException ke) {
            if (request.getHdr() != null) {
                request.getHdr().setType(OpCode.error);
                request.setTxn(new ErrorTxn(ke.code().intValue()));
            }
            request.setException(ke);
            LOG.info("Error creating upgrade request", ke);
        } catch (IOException ie) {
            LOG.error("Unexpected error in upgrade", ie);
        }
        if (upgradeRequest != null) {
            queuedRequests.add(upgradeRequest);
        }
        // Add to its own work queue, to be processed shortly
        queuedRequests.add(request);
    }
}

// org.apache.zookeeper.server.quorum.ObserverRequestProcessor#run
@Override
public void run() {
    try {
        while (!finished) {
            Request request = queuedRequests.take();
            if (LOG.isTraceEnabled()) {
                ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK, 'F', request, "");
            }
            if (request == Request.requestOfDeath) {
                break;
            }

            // Screen quorum requests against ACLs first
            if (!zks.authWriteRequest(request)) {
                continue;
            }

            // We want to queue the request to be processed before we submit
            // the request to the leader so that we are ready to receive
            // the response
            nextProcessor.processRequest(request);

            // We now ship the request to the leader. As with all
            // other quorum operations, sync also follows this code
            // path, but different from others, we need to keep track
            // of the sync operations this Observer has pending, so we
            // add it to pendingSyncs.
            switch (request.type) {
            case OpCode.sync:
                zks.pendingSyncs.add(request);
                zks.getObserver().request(request);
                break;
            case OpCode.create:
            case OpCode.create2:
            case OpCode.createTTL:
            case OpCode.createContainer:
            case OpCode.delete:
            case OpCode.deleteContainer:
            case OpCode.setData:
            case OpCode.reconfig:
            case OpCode.setACL:
            case OpCode.multi:
            case OpCode.check:
                zks.getObserver().request(request);
                break;
            case OpCode.createSession:
            case OpCode.closeSession:
                // Don't forward local sessions to the leader.
                if (!request.isLocalSession()) {
                    zks.getObserver().request(request);
                }
                break;
            }
        }
    } catch (Exception e) {
        handleException(this.getName(), e);
    }
    LOG.info("ObserverRequestProcessor exited loop!");
}
```
OK! We have now seen, end to end, how a TCP data stream becomes a ZooKeeper request packet, ready to enter the formal business-processing flow.
To summarize:
1. An AcceptThread accepts incoming client connections;
2. A group of SelectorThreads polls the connections for read/write readiness;
3. A WorkerService thread pool carries out the submitted read/write work;
4. ZooKeeperServer wraps the bytes into a Request;
5. The RequestThrottler stands guard, holding back excess requests before handing them back to ZooKeeperServer to be enqueued;
6. ZooKeeperServer finds the firstProcessor and passes the Request into the processor chain; the preparation phase is complete;
7. Each firstProcessor behaves differently: the leader's merely passes the Request down the chain, while the follower/observer versions have extra work of their own to do.
The whole process is a continuous hand-off among N threads, each responsible for its own piece of the work. The hard part of understanding how zk works is precisely this multi-threaded choreography.
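Putting the hand-offs in one picture (queues shown in brackets; my own summary of the code above):

```
client TCP connection
        |
AcceptThread --[acceptedQueue]--> SelectorThread --schedule--> WorkerService
   (accept)                     (select read/write)        (doIO: read packet)
                                                                 |
                                                                 v
ZooKeeperServer.processPacket --[submittedRequests]--> RequestThrottler
                                                                 |
                                                                 v
                                            ZooKeeperServer.submitRequestNow
                                                                 |
                                                                 v
                             firstProcessor (Leader/Follower/Observer...) --> chain
```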
Original article (Chinese): https://www.cnblogs.com/yougewe/p/11758005.html