从图中可以清楚的看到当客户端向服务器端发送一个请求的时候,最开始是被RPCServer中的Listener所监听到的,如下面的代码所示,HBase的RpcServer启动的时候会启动几个处理线程:
responder.start(); listener.start(); scheduler.start();其中,Responder线程负责数据的request的回复工作,listener负责监听客户端的请求,scheduler负责具体call的调度工作
while (running) { SelectionKey key = null; try { selector.select(); // FindBugs IS2_INCONSISTENT_SYNC Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { key = iter.next(); iter.remove(); try { if (key.isValid()) { if (key.isAcceptable()) doAccept(key); } } catch (IOException ignored) { if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored); } key = null; } }listener的实现过程中运用了java nio的一些特性,主要是每个Listener线程又会管理这一个Reader的线程池,这些Reader具体负责从Socket Channel中读取数据,并解析数据中的相关项,进而构造出可运行的CallRunner:
Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder, totalRequestSize, traceInfo); scheduler.dispatch(new CallRunner(RpcServer.this, call, userProvider));RpcScheduler调用对应的RpcExcutor进行相应的处理,RpcExcutor中启动了多个处理线程,这些线程从队列中取出任务并且执行,
protected void startHandlers(final String nameSuffix, final int numHandlers, final List<BlockingQueue<CallRunner>> callQueues, final int qindex, final int qsize, final int port) { final String threadPrefix = name + Strings.nullToEmpty(nameSuffix); for (int i = 0; i < numHandlers; i++) { final int index = qindex + (i % qsize); Thread t = new Thread(new Runnable() { @Override public void run() { consumerLoop(callQueues.get(index)); } }); t.setDaemon(true); t.setName(threadPrefix + "RpcServer.handler=" + handlers.size() + ",queue=" + index + ",port=" + port); t.start(); LOG.debug(threadPrefix + " Start Handler index=" + handlers.size() + " queue=" + index); handlers.add(t); } }
protected void consumerLoop(final BlockingQueue<CallRunner> myQueue) { boolean interrupted = false; double handlerFailureThreshhold = conf == null ? 1.0 : conf.getDouble(HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT, HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT); try { while (running) { try { CallRunner task = myQueue.take(); try { activeHandlerCount.incrementAndGet(); task.run();
resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner, call.timestamp, this.status);
if (!call.isDelayed() || !call.isReturnValueDelayed()) { Message param = resultPair != null ? resultPair.getFirst() : null; CellScanner cells = resultPair != null ? resultPair.getSecond() : null; call.setResponse(param, cells, errorThrowable, error); } call.sendResponseIfReady();到此结束就简要介绍了一个完整的服务端RPC处理流程,该流程中涉及到的相关的类的关系如下图所示:
原文地址:http://blog.csdn.net/youmengjiuzhuiba/article/details/44750631