码迷,mamicode.com
首页 > 其他好文 > 详细

hbase RPCServer源码分析

时间:2015-11-06 17:45:11      阅读:292      评论:0      收藏:0      [点我收藏+]

标签:

前置知识: java,nio,多线程

看了几天的源码,写一些自己心得,若有错误请指出。

RPCServer的作用:负责创建listener,reader,responser,handler来处理client端的请求。

RPCServer中重要的子类有:Listener,Reader,Call,Connection,Responser

    其中Reader是Listener的子类

listener负责监听client端的请求,主要做nio操作中的accept操作。

while (iter.hasNext()) {
  key = iter.next();
  iter.remove();
  try {
    if (key.isValid()) {
      if (key.isAcceptable())
        doAccept(key);
    }
  } catch (IOException ignored) {
  }
  key = null;
}

 

与client创建连接,生成新的channel,并将新的channel注册在reader上。

void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
  ……
  SocketChannel channel;
  ……
    Reader reader = getReader();
    try {
      reader.startAdd();
      SelectionKey readKey = reader.registerChannel(channel);  //listener接受的连接注册在reader上
      c = getConnection(channel, System.currentTimeMillis());
      readKey.attach(c);
   ……
}

 


reader负责处理listener传过来的channel,依次读取数据,

void doRead(SelectionKey key) throws InterruptedException {
  int count = 0;
  Connection c = (Connection)key.attachment();
  ……
  try {
    count = c.readAndProcess();
  } catch (InterruptedException ieo) {
  ……
}

 

这里调用Connection里面的readAndProcess()方法,这个方法的做用是读取客户端的数据,存入一个buffer字节数组中,给processRequest()方法进行处理,

processRequest方法:

protected void processRequest(byte[] buf) throws IOException, InterruptedException {
  ……
 //这里的call构造方法中的参数都是由buf中的数据解析出来的,前面省略的代码做了这部分工作
  Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
          totalRequestSize,
          traceInfo);
  //这里的scheduler是一个调度器,可以简单理解为一个线程池的控制器,它初始化时会生成默认大小的线程池,参数可由REGION_SERVER_HANDLER_COUNT来指定
  //也就是jstack文件中的handler线程,默认是30
  //dispatch方法会获取线程池中的一个线程,执行callRunner中的run()方法。run()方法的功能有:查询结果,并调用sendResponseIfReady()方法来返回数据。
  scheduler.dispatch(new CallRunner(RpcServer.this, call, userProvider));
}

 

call的run()方法:

public void run() {
  ……
  //查询数据,存在resultPair中
      resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner,
  ……
    if (!call.isDelayed() || !call.isReturnValueDelayed()) {
      Message param = resultPair != null ? resultPair.getFirst() : null;
      CellScanner cells = resultPair != null ? resultPair.getSecond() : null;
      call.setResponse(param, cells, errorThrowable, error);
    }
  //调用Responser
    call.sendResponseIfReady();
 ……
}

 

其中rpcServer的call方法为:

public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
    Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
throws IOException {
  ……
   //此句进行查询
    Message result = service.callBlockingMethod(md, controller, param);
   ……
  //返回给Call对象
    return new Pair<Message, CellScanner>(result,
      controller != null? controller.cellScanner(): null);
……
}

 

再详细点的还没看。看了这些主要解决了以下几个疑惑:

 

 

reader的线程数在哪指定生成,handler的线程池在哪维护,监听连接请求的线程有几个?responser的线程又有几个?

listener只有一个,

listener中有一个Reader数组,默认是10,也就是说读取请求数据的连接池大小为10。

private class Listener extends Thread {
……
  private Reader[] readers = null;

 

handler的线程池由RPCServer中的scheduler维护,默认是30,
listener监听到一个请求后,生成对应的channel发送给Reader,然后Reader会为每一个channel创建一个connection,

connection中保存了连接的信息。然后调用connection的方法来读取请求参数,并生成call对象,这时将调用scheduler,

使用handler线程池(默认30)来查询数据,(这里就开始并行了),结果存在call对象用,call对象最后再调用responser类的方法,将结果返回给client。

responser只有一个线程,它维护了一个call链表,采用非阻塞的方式(这里要说也是并行),依次将call对象送出。

大致过程就是这样

 




hbase RPCServer源码分析

标签:

原文地址:http://www.cnblogs.com/quan-qunar/p/4942972.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!