标签:
前置知识: java,nio,多线程
看了几天的源码,写一些自己心得,若有错误请指出。
RPCServer的作用:负责创建listener,reader,responser,handler来处理client端的请求。
RPCServer中重要的子类有:Listener,Reader,Call,Connection,Responser
其中Reader是Listener的子类
listener负责监听client端的请求,主要做nio操作中的accept操作。
while (iter.hasNext()) { key = iter.next(); iter.remove(); try { if (key.isValid()) { if (key.isAcceptable()) doAccept(key); } } catch (IOException ignored) { } key = null; }
与client创建连接,生成新的channel,并将新的channel注册在reader上。
void doAccept(SelectionKey key) throws IOException, OutOfMemoryError { …… SocketChannel channel; …… Reader reader = getReader(); try { reader.startAdd(); SelectionKey readKey = reader.registerChannel(channel); //listener接受的连接注册在reader上 c = getConnection(channel, System.currentTimeMillis()); readKey.attach(c); …… }
reader负责处理listener传过来的channel,依次读取数据,
void doRead(SelectionKey key) throws InterruptedException { int count = 0; Connection c = (Connection)key.attachment(); …… try { count = c.readAndProcess(); } catch (InterruptedException ieo) { …… }
这里调用Connection里面的readAndProcess()方法,这个方法的做用是读取客户端的数据,存入一个buffer字节数组中,给processRequest()方法进行处理,
processRequest方法:
protected void processRequest(byte[] buf) throws IOException, InterruptedException { …… //这里的call构造方法中的参数都是由buf中的数据解析出来的,前面省略的代码做了这部分工作 Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder, totalRequestSize, traceInfo); //这里的scheduler是一个调度器,可以简单理解为一个线程池的控制器,它初始化时会生成默认大小的线程池,参数可由REGION_SERVER_HANDLER_COUNT来指定 //也就是jstack文件中的handler线程,默认是30 //dispatch方法会获取线程池中的一个线程,执行callRunner中的run()方法。run()方法的功能有:查询结果,并调用sendResponseIfReady()方法来返回数据。 scheduler.dispatch(new CallRunner(RpcServer.this, call, userProvider)); }
call的run()方法:
public void run() { …… //查询数据,存在resultPair中 resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner, …… if (!call.isDelayed() || !call.isReturnValueDelayed()) { Message param = resultPair != null ? resultPair.getFirst() : null; CellScanner cells = resultPair != null ? resultPair.getSecond() : null; call.setResponse(param, cells, errorThrowable, error); } //调用Responser call.sendResponseIfReady(); …… }
其中rpcServer的call方法为:
public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md, Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status) throws IOException { …… //此句进行查询 Message result = service.callBlockingMethod(md, controller, param); …… //返回给Call对象 return new Pair<Message, CellScanner>(result, controller != null? controller.cellScanner(): null); …… }
再详细点的还没看。看了这些主要解决了以下几个疑惑:
reader的线程数在哪指定生成,handler的线程池在哪维护,监听连接请求的线程有几个?responser的线程又有几个?
listener只有一个,
listener中有一个Reader数组,默认是10,也就是说读取请求数据的连接池大小为10。
private class Listener extends Thread { …… private Reader[] readers = null;
handler的线程池由RPCServer中的scheduler维护,默认是30,
listener监听到一个请求后,生成对应的channel发送给Reader,然后Reader会为每一个channel创建一个connection,
connection中保存了连接的信息。然后调用connection的方法来读取请求参数,并生成call对象,这时将调用scheduler,
使用handler线程池(默认30)来查询数据,(这里就开始并行了),结果存在call对象用,call对象最后再调用responser类的方法,将结果返回给client。
responser只有一个线程,它维护了一个call链表,采用非阻塞的方式(这里要说也是并行),依次将call对象送出。
大致过程就是这样
标签:
原文地址:http://www.cnblogs.com/quan-qunar/p/4942972.html