Storm-源码分析- Thrift的使用



首先是storm.thrift, 作为IDL里面定义了用到的数据结构和service 
然后backtype.storm.generated, 存放从IDL通过Thrift自动转化成的Java代码

比如对于nimbus service 

service Nimbus {
  void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite);
  void submitTopologyWithOpts(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology, 5: SubmitOptions options) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite);
  void killTopology(1: string name) throws (1: NotAliveException e);
  void killTopologyWithOpts(1: string name, 2: KillOptions options) throws (1: NotAliveException e);
  void activate(1: string name) throws (1: NotAliveException e);
  void deactivate(1: string name) throws (1: NotAliveException e);
  void rebalance(1: string name, 2: RebalanceOptions options) throws (1: NotAliveException e, 2: InvalidTopologyException ite);

  // need to add functions for asking about status of storms, what nodes they're running on, looking at task logs
  string beginFileUpload();
  void uploadChunk(1: string location, 2: binary chunk);
  void finishFileUpload(1: string location);
  string beginFileDownload(1: string file);
  //can stop downloading chunks when receive 0-length byte array back
  binary downloadChunk(1: string id);

  // returns json
  string getNimbusConf();
  // stats functions
  ClusterSummary getClusterInfo();
  TopologyInfo getTopologyInfo(1: string id) throws (1: NotAliveException e);
  //returns json
  string getTopologyConf(1: string id) throws (1: NotAliveException e);
  StormTopology getTopology(1: string id) throws (1: NotAliveException e);
  StormTopology getUserTopology(1: string id) throws (1: NotAliveException e);


public class Nimbus {

  public interface Iface {

    public void submitTopology(String name, String uploadedJarLocation, String jsonConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift7.TException;

    public void submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift7.TException;

    public void killTopology(String name) throws NotAliveException, org.apache.thrift7.TException;

    public void killTopologyWithOpts(String name, KillOptions options) throws NotAliveException, org.apache.thrift7.TException;

    public void activate(String name) throws NotAliveException, org.apache.thrift7.TException;

    public void deactivate(String name) throws NotAliveException, org.apache.thrift7.TException;

    public void rebalance(String name, RebalanceOptions options) throws NotAliveException, InvalidTopologyException, org.apache.thrift7.TException;

    public String beginFileUpload() throws org.apache.thrift7.TException;

    public void uploadChunk(String location, ByteBuffer chunk) throws org.apache.thrift7.TException;

    public void finishFileUpload(String location) throws org.apache.thrift7.TException;

    public String beginFileDownload(String file) throws org.apache.thrift7.TException;

    public ByteBuffer downloadChunk(String id) throws org.apache.thrift7.TException;

    public String getNimbusConf() throws org.apache.thrift7.TException;

    public ClusterSummary getClusterInfo() throws org.apache.thrift7.TException;

    public TopologyInfo getTopologyInfo(String id) throws NotAliveException, org.apache.thrift7.TException;

    public String getTopologyConf(String id) throws NotAliveException, org.apache.thrift7.TException;

    public StormTopology getTopology(String id) throws NotAliveException, org.apache.thrift7.TException;

    public StormTopology getUserTopology(String id) throws NotAliveException, org.apache.thrift7.TException;



2 Client

1. 首先Get Client,

NimbusClient client = NimbusClient.getConfiguredClient(conf);

只是从配置中取出nimbus的host:port, 并new NimbusClient

    public static NimbusClient getConfiguredClient(Map conf) {
        try {
            String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
            int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT));
            return new NimbusClient(conf, nimbusHost, nimbusPort);
        } catch (TTransportException ex) {
            throw new RuntimeException(ex);

NimbusClient 继承自ThriftClient, public class NimbusClient extends ThriftClient 
ThriftClient又做了什么? 关键是怎么进行数据序列化和怎么将数据传输到remote 
对于Transport, 其实就是对Socket的封装, 使用TSocket(host, port) 
然后对于protocol, 默认使用TBinaryProtocol, 如果你不指定的话

    public ThriftClient(Map storm_conf, String host, int port, Integer timeout) throws TTransportException {
        try {
            //locate login configuration 
            Configuration login_conf = AuthUtils.GetConfiguration(storm_conf);

            //construct a transport plugin
            ITransportPlugin  transportPlugin = AuthUtils.GetTransportPlugin(storm_conf, login_conf);

            //create a socket with server
            if(host==null) {
                throw new IllegalArgumentException("host is not set");
            if(port<=0) {
                throw new IllegalArgumentException("invalid port: "+port);
            TSocket socket = new TSocket(host, port);
            if(timeout!=null) {
            final TTransport underlyingTransport = socket;

            //establish client-server transport via plugin
            _transport =  transportPlugin.connect(underlyingTransport, host); 
        } catch (IOException ex) {
            throw new RuntimeException(ex);
        _protocol = null;
        if (_transport != null)
            _protocol = new  TBinaryProtocol(_transport);


2. 调用任意RPC 

client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts); 

可以看出上面的Nimbus的interface里面有这个方法的定义, 而且Thrift不仅仅自动产生java interface, 而且还提供整个RPC client端的实现

    public void submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift7.TException
      send_submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology, options);
首先send_submitTopologyWithOpts, 调用sendBase 
接着, recv_submitTopologyWithOpts, 调用receiveBase 
  protected void sendBase(String methodName, TBase args) throws TException {
    oprot_.writeMessageBegin(new TMessage(methodName, TMessageType.CALL, ++seqid_));

  protected void receiveBase(TBase result, String methodName) throws TException {
    TMessage msg = iprot_.readMessageBegin();
    if (msg.type == TMessageType.EXCEPTION) {
      TApplicationException x =;
      throw x;
    if (msg.seqid != seqid_) {
      throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, methodName + " failed: out of sequence response");
可以看出Thrift对protocol的封装, 不需要自己处理序列化, 调用protocol的接口搞定 


3 Server

Thrift强大的地方是, 实现了整个协议栈而不光只是IDL的转化, 对于server也给出多种实现 
下面看看在nimbus server端, 是用clojure来写的 
可见其中使用Thrift封装的NonblockingServerSocket, THsHaServer, TBinaryProtocol, Proccessor, 非常简单 
其中processor会使用service-handle来处理recv到的数据, 所以作为使用者只需要在service-handle中实现Nimbus$Iface, 其他和server相关的, Thrift都已经帮你封装好了, 这里使用的IDL也在backtype.storm.generated, 因为clojure基于JVM所以IDL只需要转化成Java即可.

(defn launch-server! [conf nimbus]
  (validate-distributed-mode! conf)
  (let [service-handler (service-handler conf nimbus)
        options (-> (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT)))
                    (.workerThreads 64)
                    (.protocolFactory (TBinaryProtocol$Factory.))
                    (.processor (Nimbus$Processor. service-handler))
       server (THsHaServer. options)]
    (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.shutdown service-handler) (.stop server))))
    (log-message "Starting Nimbus server...")
    (.serve server)))

消息中间件 存储 Java
Apache Thrift在分布式程序中的应用
Apache Thrift在分布式程序中的应用
Apache Thrift在分布式程序中的应用
JSON 网络协议 Java
thrift 原理浅析
thrift 原理浅析
390 0
thrift 原理浅析
存储 分布式计算 Hadoop
客户端源码分析 启动的客户端代码 public static void main(String[] args) throws Exception { // 创建配置文件对象 Configuration conf = new Configuration(true); // 获取Job对象 Job job = Job.getInstance(conf); // 设置相关类 job.setJarByClass(WcTest.class);
Java 分布式数据库 调度
HBase rpc框架介绍
1385 0
HBase rpc框架介绍
Java 流计算
Flink WindowOperator 源码分析
0x1 摘要 WindowOperator可以说是Flink窗口功能非常核心核心的类,是窗口功能源码的一条主线,延着这条主线去慢慢看源码会轻松很多。注:此文基于Flink 1.4.2 版本源码。 0x2 WindowOperator 类结构分析 先来看一下类结构图,可以使用idea来生成类图,下图.
1813 0

