码迷,mamicode.com
首页 > 其他好文 > 详细

Thrift源码分析阅读(一)

时间:2016-03-12 11:51:18      阅读:293      评论:0      收藏:0      [点我收藏+]

标签:

Thrift -Storm篇

从Nimbus启动说起:

当用户通过命令启动nimbus时,Classloader将会找到一个称之为bytetype.storm.daemon.nimbus的一个class文件,这个是由numbis.clj文件编译而成,来看nimbus.clj这个的启动方法:

(defn -main []

(-launch (standalone-nimbus)))

 

(standalone-nimbus) 执行这个方法返回一个INimbus接口的实例

 

执行launch(INimbus)这个方法,这个方法也在nimbus.clj上定义

 

(defn -launch [nimbus]

;; read-storm-config 在util.clj中定义

(launch-server! (read-storm-config) nimbus))

 

(read-storm-config)返回配置文件信息,如在storm.yaml上的定义,得到的是一个map实例。

 

接着我们执行launch-server!这个方法,执行流程:

  1. 检查启动模式是否是分布式模式
  2. 创建一个service-handler的一个句柄,这个用来处理Nimbus上的各种业务逻辑,如提交一个topology, kill topology等
  3. 配置服务器,如nimbus服务端口,处理线程池大小,业务处理分发器处理业务逻辑的service-handler服务句柄。
  4. 根据配置服务器的参数,创建nimbus服务,服务是一个THsHaServer。
  5. 添加主线程shutdown时的回调函数。
  6. 开启服务。

至此,这边完成了nimbus开启服务的整个过程。

重点过程在第2步和 3 4 6步

首先我们将注意力集中在第3 4 6步骤,观察nimbus如何提供各种服务。对应的clojure代码:

(let [

第3步:

options (-> (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT)))

(THsHaServer$Args.)

(.workerThreads 64)

(.protocolFactory

(TBinaryProtocol$Factory. false true (conf NIMBUS-THRIFT-MAX-BUFFER-SIZE)))

(.processor (Nimbus$Processor. service-handler))

第4步:

server (THsHaServer.

(do (set! (. options maxReadBufferBytes)

(conf NIMBUS-THRIFT-MAX-BUFFER-SIZE)) options))]

第6步:

(.serve server))) 

我们从第4步直接查看THsHaServer的构造方法:

private ExecutorService invoker;

public THsHaServer(Args args) {

super(args);

invoker = args.executorService == null ? createInvokerPool(args) : args.executorService;

}

 

这里的args就相当于clojure代码中的options变量,由于在options中没有设置executorService这个变量,那么将调用createInvokerPool(args)这个方法线程池。

/**

* Helper to create an invoker pool

*/

protected static ExecutorService createInvokerPool(Args options) {

int workerThreads = options.workerThreads;

int stopTimeoutVal = options.stopTimeoutVal;

TimeUnit stopTimeoutUnit = options.stopTimeoutUnit;

 

LinkedBlockingQueue<Runnable> queue =

new LinkedBlockingQueue<Runnable>();

ExecutorService invoker =

new ThreadPoolExecutor(workerThreads, workerThreads,

stopTimeoutVal, stopTimeoutUnit, queue);

 

return invoker;

}

 

这里就十分看清楚了,根据设置,返回的是一个为64的固定大小的线程池。

第6步,调用THsHaServerserve方法时,

/** @inheritDoc */

@Override

public void serve() {

if (!startSelectorThread()) {

return;

}

 

// this will block while we serve

joinSelector();

 

}

 

 

我们重点关注startSelectorThread joinSelector 两个方法。

 

  1. startSelectorThread()

    startSelectorThread()方法是父类 TNonblockingServer的一个方

     

    private SelectThread selectThread_;

     

protected boolean startSelectorThread() {

// start the selector

     selectThread_ =new

SelectThread((TNonblockingServerTransport)serverTransport_);

stopped_ = false;

selectThread_.start();

return true;

} catch (IOException e) {

LOGGER.error("Failed to start selector thread!", e);

return false;

}

}

这个serverTransport_是怎么来的呢?

首先看clojure代码:

-> (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT)))

(THsHaServer$Args.)

 

技术分享

 

在创建Args时,不停的调用父类的构造方法,同时,在创建THsHaServer时,也在不停的调用父类的构造,最终我们在TServer的构造方法里:

public abstract class TServer {

 

public static class Args extends AbstractServerArgs<Args> {

public Args(TServerTransport transport) {

super(transport);

}

}

 

public static abstract class AbstractServerArgs<T extends AbstractServerArgs<T>> {

final TServerTransport serverTransport;

 

public AbstractServerArgs(TServerTransport transport) {

serverTransport = transport;

}

 

protected TServer(AbstractServerArgs args) {

serverTransport_ = args.serverTransport;

}

 

也就是说serverTransport_也就是TNonblockingServerSocket的一个实例。

 

SelectThread 是一个Thread的子类,在其构造方法中,完成了TNonblockingServerSocket实例注册监听器的过程。

 

this.selector = SelectorProvider.provider().openSelector();

serverTransport.registerSelector(selector);

 

开启SelectThread线程后,

 

技术分享

 

  1. 当有一个accept事件时。
    1. 服务端ServerSocket接收该事件,并返回一个TNonblockingSocket对象,在这个对象中封装了接收该事件的端口信息SocketChannel。并将该端口信息注册到THsHaServerSelector对象中,并标记为可读。
    2. 创建一个FrameBuffer的实例,绑定在SelectionKey

      clientKey = client.registerSelector(selector, SelectionKey.OP_READ)

// add this key to the map

FrameBuffer frameBuffer = new FrameBuffer(client, clientKey);

clientKey.attach(frameBuffer);

 以后对应的输入输出都将发生在这个端口中。因此是TTransport的一个实现。

 

技术分享

 

  1. 当有一个read事件时。
    1. 得到SelectionKey绑定的FrameBuffer对象,

      FrameBuffer buffer = (FrameBuffer)key.attachment();

    2. buffer.read()
      1. 读取framesize的大小,占四个字节。

        这里有个很有意思的控制实例读取buffer的大小,使用一个private final AtomicLong readBufferBytesAllocated = new AtomicLong(0); 成员变量来控制。这个好!

// if this frame will always be too large for this server, log the // error and close the connection.

if (frameSize > MAX_READ_BUFFER_BYTES) {

LOGGER.error("Read a frame size of " + frameSize

+ ", which is bigger than the maximum allowable buffer size for ALL connections.");

return false;

}

 

// if this frame will push us over the memory limit, then return.

// with luck, more memory will free up the next time around.

if (readBufferBytesAllocated.get() + frameSize > MAX_READ_BUFFER_BYTES) {

return true;

}

 

// increment the amount of memory allocated to read buffers

readBufferBytesAllocated.addAndGet(frameSize);

  1. 开辟一个framesize大小的bytebuffer,改变状态。

// reallocate the readbuffer as a frame-sized buffer

buffer_ = ByteBuffer.allocate(frameSize);

 

state_ = READING_FRAME;

 

  1. 读取frame
  1. 如果完全读取事件流,则调用requestInvoke(buffer)
    1. 调用THsHaServerrequestInvoke(buffer)方法,在线程池的调用句柄中执行frameBuffer.invoke()方法。此时算是一个异步调用的过程。

public void run() {

frameBuffer.invoke();

}

  1. frameBuffer.invoke()

    到此我们可以理解,我们已经完全的接收了用户的输入信息,下一步就是如何解析输入信息以及如何来处理反馈这个请求信息。

    invoke代码:

TTransport inTrans = getInputTransport();

TProtocol inProt = inputProtocolFactory_.getProtocol(inTrans);

TProtocol outProt = outputProtocolFactory_.getProtocol(getOutputTransport());

processorFactory_.getProcessor(inTrans).process(inProt, outProt);

 

getInputTransport():

private TTransport getInputTransport() {

return new TMemoryInputTransport(buffer_.array());

}

 

将用户的输入信息交给TMemoryInputTransport 来处理

 

这句clojure代码:

(.protocolFactory

(TBinaryProtocol$Factory. false true (conf NIMBUS-THRIFT-MAX-BUFFER-SIZE)))

 

TServer中有这样的代码:

public T protocolFactory(TProtocolFactory factory) {

this.inputProtocolFactory = factory;

this.outputProtocolFactory = factory;

return (T) this;

}

 

也就是inputProtocolFactory_ TBinaryProtocol$Factory的一个实例,

inputProtocolFactory_.getProtocol(inTrans);返回TBinaryProtocol一个实例。

processorFactory_.getProcessor(inTrans) 对应的clojure代码其实就

是:

(.processor (Nimbus$Processor. service-handler))

 

我们再来看看Nimbus$Processor

public static class Processor<I extends Iface>

extends org.apache.thrift.TBaseProcessor implements org.apache.thrift.TProcessor

 

 

调用TProcessprocess(inProt, outProt)方法其实调用的是Nimbus.Processor的父类TBaseProcessor的方法:

 

TBaseProcessor类中,保存了一个请求与处理业务逻辑的映射关系:

Map<String,ProcessFunction<I, ? extends TBase>> processMap;

由构造方法中注入这种对应的关系,也就是action –>业务逻辑,这里由用户实现。

 

TBaseProcessor方法中,首先读取action,根据action查找出对应的业务逻辑 ProcessFunction

TMessage msg = in.readMessageBegin();

ProcessFunction fn = processMap.get(msg.name);

 

由上文可知,这里的in就是指TBinaryProtocol实例。

如果存在对应的业务逻辑处理器,那么调用ProcessFunction的:

fn.process(msg.seqid, in, out, iface);

 

我们在回过头来观察Nimbus.Processor,看他究竟注入了多少个业务逻辑.

processMap.put("submitTopology", new submitTopology());

我们就以此为例。

submitTopology类继承了ProcessFunction,调用submitTopology类的process方法其实是调用了ProcessFunctionprocess方法。

 

ProcessFunction中,提供两个抽象交给用户实现,一是如何根据请求信息形成一个请求对象,二是如何处理这个请求细节以及以及返回什么样的结果,这个是依据项目的不同而不同(细节),但都有类似的行为:

请求返回什么样的对象,T extends TBase

T args = getEmptyArgsInstance();

如何根据请求初始化对象:

args.read(iprot);

处理请求,以及返回什么样的结果:

TBase result = getResult(iface, args);

 

 

回到这里,这里的iface究竟是什么含义呢?我们看看clojure代码:

Nimbus$Processor. service-handler

 

Nimbus.Processor的构造函数 I extends Iface

public Processor(I iface) {

super(iface,…);

}

这里的IfaceNimbus类中定义的一个接口,里面包含了一些处理业务逻辑的细节。

service-handler实现了Nimbus.Iface接口。

 

由此我们完成了处理请求的一个完整的过程,下面是TBinaryProtocol字节码格式的定义。

TBinaryProtocol的输入是一个buffer字节数组。

TBinaryProtocol 传输字节码格式,当不用匹配TBinaryProtocol的版本时:

 

4 byte

4 byte

Msg

1 byte

4 byte

1byte

2byte

4byte

msg

整个frame的大小

开始的四个字节表示后面msg的大小,msg表示action的名字,用于事件分发操作,后面的一个字节表示msg的类型,最后4个字节表示seqid,也就是msg的编号,message begin

这个区域表示用户输入的域,也就是参数,第一个字节表示域的类型,随后2个字节表示域的含义,由用户定义,4个字节表示msg的大小,最后是msg内容实体。

 

当需要匹配TBinaryProtocol的版本时,此时message begin变成:

 

4byte

4byte

msg

4byte

开始的四个字节表示VERSION_1 | message.type的值,其中messge.typebyte类型,随后的四个字节表示后面msg的大小,msg表示action的名字,用于事件分发操作,最后4个字节表示seqid,也就是msg的编号。

 

 

msg的类型有17中,定义在TType的类中。

 

那么,如何反馈一个请求呢?

由以上可知,当处理完业务逻辑时,我们得到TBase实例的反馈结果。

 

回头看FrameBuffer调用invoke()方法的这句:

TProtocol outProt = outputProtocolFactory_.getProtocol(getOutputTransport());

 

getOutputTransport()

 

 

private TTransport getOutputTransport() {

response_ = new TByteArrayOutputStream();

// TBinaryProtocol protocol;

return outputTransportFactory_.getTransport(

new TIOStreamTransport(response_));

}

 

outputTransportFactory_对象中,是由TNonblockingServer 的类

AbstractNonblockingServerArgs(TNonblockingServerTransport transport) {

super(transport);

transportFactory(new TFramedTransport.Factory());

}

 

因此getOutputTransport() 返回的是TFramedTransport这个对象,这个对象封装了TIOStreamTransport (ByteArrayOutputStream)这个实例。

 

因此TBinaryProtocol的输出流(response)是一个TFramedTransport这个对象。

ProcessFunctionprocess方法中,我们调用

oprot.getTransport().flush();

输出时候,此时调用TFramedTransportflush方法:

@Override

public void flush() throws TTransportException {

byte[] buf = writeBuffer_.get();

int len = writeBuffer_.len();

writeBuffer_.reset();

 

encodeFrameSize(len, i32buf);

transport_.write(i32buf, 0, 4);

transport_.write(buf, 0, len);

transport_.flush();

}

frame大小信息以及信息实体写入到ByteArrayOutputStream当中。此时我们得到了结果字节流,但没有将这些信息写入到客户端中。

此时,要做的事情是将FrameBufferkey变成可写的。于是有了write事件。

  1. 当有一个write事件时。

    调用SocketChannelwrite方法将数据写回到客户端。

    trans_.write(buffer_)

     

trans_是怎样来的呢?

 

FrameBuffer的构造方法:

public FrameBuffer( final TNonblockingTransport trans,

final SelectionKey selectionKey) {

trans_ = trans;

selectionKey_ = selectionKey;

buffer_ = ByteBuffer.allocate(4);

}

 

也就是当selectThread_ handleAccept时,

client = (TNonblockingTransport)serverTransport.accept();

 

serverTransport TNonblockingServerSocket的一个实例,调用accept方法,最终会落在TNonblockingServerSocketacceptImp()方法中,

在其父类TServerTransport accept方法里:

TTransport transport = acceptImpl();

acceptImp()方法中

SocketChannel socketChannel = serverSocketChannel.accept();

TNonblockingSocket tsocket =

new TNonblockingSocket(socketChannel);

返回的是TNonblockingSocket 这个TTransport类的子类。

 

技术分享

Thrift源码分析阅读(一)

标签:

原文地址:http://www.cnblogs.com/dzhchiyu/p/5268144.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!