
Storm Source Code Analysis: How Thrift Is Used

Date: 2015-05-08 18:34:32


1 IDL

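First there is storm.thrift, the IDL file that defines the data structures and services Storm uses.
Then there is backtype.storm.generated, which holds the Java code that Thrift generates automatically from that IDL.

Take the Nimbus service as an example.
Its definition in the IDL is: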

service Nimbus {
  void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite);
  void submitTopologyWithOpts(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology, 5: SubmitOptions options) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite);
  void killTopology(1: string name) throws (1: NotAliveException e);
  void killTopologyWithOpts(1: string name, 2: KillOptions options) throws (1: NotAliveException e);
  void activate(1: string name) throws (1: NotAliveException e);
  void deactivate(1: string name) throws (1: NotAliveException e);
  void rebalance(1: string name, 2: RebalanceOptions options) throws (1: NotAliveException e, 2: InvalidTopologyException ite);

  // need to add functions for asking about status of storms, what nodes they're running on, looking at task logs

  string beginFileUpload();
  void uploadChunk(1: string location, 2: binary chunk);
  void finishFileUpload(1: string location);

  string beginFileDownload(1: string file);
  // can stop downloading chunks when receive 0-length byte array back
  binary downloadChunk(1: string id);

  // returns json
  string getNimbusConf();

  // stats functions
  ClusterSummary getClusterInfo();
  TopologyInfo getTopologyInfo(1: string id) throws (1: NotAliveException e);
  // returns json
  string getTopologyConf(1: string id) throws (1: NotAliveException e);
  StormTopology getTopology(1: string id) throws (1: NotAliveException e);
  StormTopology getUserTopology(1: string id) throws (1: NotAliveException e);
}

The corresponding Java code in Nimbus.java is:

public class Nimbus {

  public interface Iface {
    public void submitTopology(String name, String uploadedJarLocation, String jsonConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift7.TException;
    public void submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift7.TException;
    public void killTopology(String name) throws NotAliveException, org.apache.thrift7.TException;
    public void killTopologyWithOpts(String name, KillOptions options) throws NotAliveException, org.apache.thrift7.TException;
    public void activate(String name) throws NotAliveException, org.apache.thrift7.TException;
    public void deactivate(String name) throws NotAliveException, org.apache.thrift7.TException;
    public void rebalance(String name, RebalanceOptions options) throws NotAliveException, InvalidTopologyException, org.apache.thrift7.TException;
    public String beginFileUpload() throws org.apache.thrift7.TException;
    public void uploadChunk(String location, ByteBuffer chunk) throws org.apache.thrift7.TException;
    public void finishFileUpload(String location) throws org.apache.thrift7.TException;
    public String beginFileDownload(String file) throws org.apache.thrift7.TException;
    public ByteBuffer downloadChunk(String id) throws org.apache.thrift7.TException;
    public String getNimbusConf() throws org.apache.thrift7.TException;
    public ClusterSummary getClusterInfo() throws org.apache.thrift7.TException;
    public TopologyInfo getTopologyInfo(String id) throws NotAliveException, org.apache.thrift7.TException;
    public String getTopologyConf(String id) throws NotAliveException, org.apache.thrift7.TException;
    public StormTopology getTopology(String id) throws NotAliveException, org.apache.thrift7.TException;
    public StormTopology getUserTopology(String id) throws NotAliveException, org.apache.thrift7.TException;
  }

  // ... rest of the generated class omitted
}

2 Client

1. First, get a client:

NimbusClient client = NimbusClient.getConfiguredClient(conf);

Look at the logic of NimbusClient.getConfiguredClient in backtype.storm.utils:
it simply reads the Nimbus host and port from the configuration and creates a new NimbusClient.

    public static NimbusClient getConfiguredClient(Map conf) {
        try {
            String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
            int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT));
            return new NimbusClient(conf, nimbusHost, nimbusPort);
        } catch (TTransportException ex) {
            throw new RuntimeException(ex);
        }
    }
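The lookup above can be tried in isolation. The following is a minimal sketch using only the JDK; `NimbusAddressSketch`, `toInt`, and `resolve` are hypothetical names, `toInt` is a simplified stand-in for Utils.getInt (it assumes the port arrives as some `Number`), and the host name is made up. The two key strings are the values of Config.NIMBUS_HOST and Config.NIMBUS_THRIFT_PORT.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration class; not Storm's NimbusClient or Utils.
class NimbusAddressSketch {
    static final String NIMBUS_HOST = "nimbus.host";
    static final String NIMBUS_THRIFT_PORT = "nimbus.thrift.port";

    // Simplified stand-in for Utils.getInt: accepts any Number (Integer, Long, ...).
    static int toInt(Object o) {
        if (o instanceof Number) {
            return ((Number) o).intValue();
        }
        throw new IllegalArgumentException("Don't know how to convert " + o + " to int");
    }

    // Reads host and port from the config map and joins them as host:port.
    static String resolve(Map<String, Object> conf) {
        String host = (String) conf.get(NIMBUS_HOST);
        int port = toInt(conf.get(NIMBUS_THRIFT_PORT));
        return host + ":" + port;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(NIMBUS_HOST, "nimbus.example.com"); // made-up host
        conf.put(NIMBUS_THRIFT_PORT, 6627L);         // stored as a Long here on purpose
        System.out.println(resolve(conf));
    }
}
```

The conversion helper matters because the port may not be stored as an `Integer`, so a plain cast could fail at runtime.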

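NimbusClient extends ThriftClient (public class NimbusClient extends ThriftClient).
So what does ThriftClient do? The key questions are how data is serialized and how it is sent to the remote side.
This is where Thrift's Transport and Protocol abstractions come in.
The Transport is essentially a wrapper around a socket, created with TSocket(host, port).
The Protocol is TBinaryProtocol by default, that is, when you do not specify another one.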
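To make the Transport/Protocol split concrete, here is a minimal sketch that uses only the JDK rather than Thrift's own classes. `BinaryEncodingSketch` and its methods are hypothetical names. It encodes a string as a 4-byte big-endian length followed by the UTF-8 bytes, which is how TBinaryProtocol lays out a string, and an in-memory byte stream stands in for the socket transport.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration class; not part of Storm or Thrift.
class BinaryEncodingSketch {

    // "Protocol" layer: decides how a value becomes bytes.
    static void writeString(DataOutputStream out, String s) throws IOException {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        out.writeInt(utf8.length); // 4-byte big-endian length prefix
        out.write(utf8);           // raw UTF-8 payload
    }

    static String readString(DataInputStream in) throws IOException {
        byte[] utf8 = new byte[in.readInt()];
        in.readFully(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
    }

    // "Transport" layer: here just a byte array instead of a socket.
    static byte[] encode(String s) {
        try {
            ByteArrayOutputStream transport = new ByteArrayOutputStream();
            DataOutputStream protocol = new DataOutputStream(transport);
            writeString(protocol, s);
            protocol.flush();
            return transport.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static String decode(byte[] wire) {
        try {
            return readString(new DataInputStream(new ByteArrayInputStream(wire)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] wire = encode("wordcount");
        System.out.println(wire.length + " bytes: " + decode(wire));
    }
}
```

Because the encoding logic only sees a stream, swapping the byte array for a real socket stream would not change `writeString` or `readString`, which is the point of keeping the two layers separate.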

    public ThriftClient(Map storm_conf, String host, int port, Integer timeout) throws TTransportException {
        try {
            // locate login configuration
            Configuration login_conf = AuthUtils.GetConfiguration(storm_conf);

            // construct a transport plugin
            ITransportPlugin transportPlugin = AuthUtils.GetTransportPlugin(storm_conf, login_conf);

            // create a socket with server
            if (host == null) {
                throw new IllegalArgumentException("host is not set");
            }
            if (port <= 0) {
                throw new IllegalArgumentException("invalid port: " + port);
            }
            TSocket socket = new TSocket(host, port);
            if (timeout != null) {
                socket.setTimeout(timeout);
            }
            final TTransport underlyingTransport = socket;

            // establish client-server transport via plugin
            _transport = transportPlugin.connect(underlyingTransport, host);
        } catch (IOException ex) {
            throw new RuntimeException(ex);
        }
        _protocol = null;
        if (_transport != null)
            _protocol = new TBinaryProtocol(_transport);
    }

2. Call any RPC
Take submitTopologyWithOpts as an example:

client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);

This method is declared in the Nimbus interface above. Thrift generates not only the Java interface but also the complete client-side RPC implementation:

    public void submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift7.TException
    {
      send_submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology, options);
      recv_submitTopologyWithOpts();
    }

The call has two steps.
First, send_submitTopologyWithOpts calls sendBase.
Then recv_submitTopologyWithOpts calls receiveBase.

  protected void sendBase(String methodName, TBase args) throws TException {
    oprot_.writeMessageBegin(new TMessage(methodName, TMessageType.CALL, ++seqid_));
    args.write(oprot_);
    oprot_.writeMessageEnd();
    oprot_.getTransport().flush();
  }

  protected void receiveBase(TBase result, String methodName) throws TException {
    TMessage msg = iprot_.readMessageBegin();
    if (msg.type == TMessageType.EXCEPTION) {
      TApplicationException x = TApplicationException.read(iprot_);
      iprot_.readMessageEnd();
      throw x;
    }
    if (msg.seqid != seqid_) {
      throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, methodName + " failed: out of sequence response");
    }
    result.read(iprot_);
    iprot_.readMessageEnd();
  }

This shows how Thrift encapsulates the protocol: you never handle serialization yourself, you just call the protocol's interface.
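The send/receive pairing and the sequence-id check can be imitated without Thrift. The following is a rough sketch under stated assumptions: `EnvelopeSketch` and its methods are hypothetical names, the header layout (name, type byte, seqid, body) is a simplification and not Thrift's actual wire format, and request arguments and results are plain strings. The type constants use the same numeric values as TMessageType.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical illustration class; the framing is simplified, not Thrift's format.
class EnvelopeSketch {
    static final byte CALL = 1;
    static final byte REPLY = 2;
    static final byte EXCEPTION = 3;

    private int seqid = 0;

    // Same shape as sendBase: bump the sequence id, write header, then arguments.
    byte[] send(String methodName, String args) {
        return frame(methodName, CALL, ++seqid, args);
    }

    // Same shape as receiveBase: read header, reject errors and stale replies, read result.
    String receive(byte[] response, String methodName) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(response));
            in.readUTF(); // method name echoed by the server, unused here
            byte type = in.readByte();
            int responseSeqid = in.readInt();
            String body = in.readUTF();
            if (type == EXCEPTION) {
                throw new IllegalStateException(methodName + " failed: " + body);
            }
            if (responseSeqid != seqid) {
                throw new IllegalStateException(methodName + " failed: out of sequence response");
            }
            return body;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Builds one framed message; also used below to fake a server reply.
    static byte[] frame(String methodName, byte type, int seqid, String body) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);
            out.writeUTF(methodName);
            out.writeByte(type);
            out.writeInt(seqid);
            out.writeUTF(body);
            out.flush();
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        EnvelopeSketch client = new EnvelopeSketch();
        client.send("getNimbusConf", "");
        byte[] reply = frame("getNimbusConf", REPLY, 1, "{}");
        System.out.println(client.receive(reply, "getNimbusConf"));
    }
}
```

The sequence id is what lets the client notice a reply that belongs to a different request on the same connection, instead of silently deserializing the wrong result.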

3 Server

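Thrift's strength is that it implements the whole protocol stack rather than just translating the IDL, and it ships several server implementations as well.
Below is the Nimbus server side, which is written in Clojure.
It uses Thrift's TNonblockingServerSocket, THsHaServer, TBinaryProtocol, and Processor, and the result is very simple.
The processor passes the data it receives to service-handler, so as a user you only need to implement Nimbus$Iface in service-handler; everything else server-related is already wrapped by Thrift. The generated code used here also lives in backtype.storm.generated: Clojure runs on the JVM, so the IDL only needs to be compiled to Java.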

(defn launch-server! [conf nimbus]
  (validate-distributed-mode! conf)
  (let [service-handler (service-handler conf nimbus)
        options (-> (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT)))
                    (THsHaServer$Args.)
                    (.workerThreads 64)
                    (.protocolFactory (TBinaryProtocol$Factory.))
                    (.processor (Nimbus$Processor. service-handler))
                    )
        server (THsHaServer. options)]
    (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.shutdown service-handler) (.stop server))))
    (log-message "Starting Nimbus server...")
    (.serve server)))
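The division of labor between the processor and the handler can be sketched in plain Java. This is an illustration under assumptions, not the generated Nimbus.Processor: `ProcessorSketch` is a hypothetical name, its `Iface` is a cut-down stand-in with two string-only methods, and dispatch is a simple map from method name to a function that calls the handler.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical illustration class; not the Thrift-generated processor.
class ProcessorSketch {

    // Cut-down stand-in for Nimbus.Iface: the only part the user implements.
    interface Iface {
        String getNimbusConf();
        String getTopologyConf(String id);
    }

    private final Iface handler;
    private final Map<String, BiFunction<Iface, String, String>> processMap = new HashMap<>();

    ProcessorSketch(Iface handler) {
        this.handler = handler;
        // One entry per RPC method: look up by name, then call the handler.
        processMap.put("getNimbusConf", (h, arg) -> h.getNimbusConf());
        processMap.put("getTopologyConf", (h, arg) -> h.getTopologyConf(arg));
    }

    // Called by the server for each decoded request.
    String process(String methodName, String arg) {
        BiFunction<Iface, String, String> fn = processMap.get(methodName);
        if (fn == null) {
            throw new IllegalArgumentException("Invalid method name: '" + methodName + "'");
        }
        return fn.apply(handler, arg);
    }

    public static void main(String[] args) {
        ProcessorSketch processor = new ProcessorSketch(new Iface() {
            @Override public String getNimbusConf() { return "{\"nimbus.thrift.port\": 6627}"; }
            @Override public String getTopologyConf(String id) { return "{\"topology.id\": \"" + id + "\"}"; }
        });
        System.out.println(processor.process("getTopologyConf", "wordcount-1"));
    }
}
```

The handler contains only application logic; everything about decoding requests and choosing the method to invoke stays in the processor, which is why the Clojure server code above can be so short.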

Original article: http://my.oschina.net/crxy/blog/412356
