
Splitting and Merging Data Streams in Storm

Date: 2014-11-23 22:59:44


Splitting and merging data streams

 

Data streams frequently need to be split and merged, as shown in the figure below:

[Figure: splitting and merging of data streams]

Splitting

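Splitting comes in two forms: in the first, the same tuple is sent to several different downstream bolts; in the second, different tuples are sent to different downstream bolts.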

Sending the same tuple

This works exactly like an ordinary one-to-one connection, except that two or more bolts subscribe to the same spout or bolt. For example:

SpoutDeclarer spout = builder.setSpout(SequenceTopologyDef.SEQUENCE_SPOUT_NAME,
        new SequenceSpout(), spoutParal);

// Both bolts subscribe to the same spout, so each of them receives every tuple it emits
builder.setBolt(SequenceTopologyDef.TRADE_BOLT_NAME, new PairCount(), 1)
        .shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);

builder.setBolt(SequenceTopologyDef.CUSTOMER_BOLT_NAME, new PairCount(), 1)
        .shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
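To make the semantics concrete, here is a plain-Java simulation (it does not use the Storm API) of what two subscriptions to the same source mean: every subscribing bolt receives every tuple, while the grouping only decides which task inside that bolt handles it. Round-robin is used as a deterministic stand-in for shuffle grouping, which Storm randomizes. The class name, method name and component names are hypothetical, and CUSTOMER_BOLT is given two tasks here (the article uses one) so the per-task split is visible.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation, not Storm code: one upstream component, several subscribers.
class FanOutSimulation {

    // Returns, for each subscribing component, the tuples seen by each of its tasks.
    // parallelism maps component name -> number of tasks.
    static Map<String, List<List<Long>>> deliver(List<Long> tuples, Map<String, Integer> parallelism) {
        Map<String, List<List<Long>>> received = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : parallelism.entrySet()) {
            List<List<Long>> tasks = new ArrayList<>();
            for (int t = 0; t < e.getValue(); t++) {
                tasks.add(new ArrayList<>());
            }
            received.put(e.getKey(), tasks);
        }
        for (int i = 0; i < tuples.size(); i++) {
            // Every subscribing component gets its own copy of the tuple...
            for (List<List<Long>> tasks : received.values()) {
                // ...and round-robin (standing in for shuffle) picks one task within it.
                tasks.get(i % tasks.size()).add(tuples.get(i));
            }
        }
        return received;
    }

    public static void main(String[] args) {
        Map<String, Integer> parallelism = new LinkedHashMap<>();
        parallelism.put("TRADE_BOLT", 1);
        parallelism.put("CUSTOMER_BOLT", 2);
        System.out.println(deliver(List.of(1L, 2L, 3L, 4L), parallelism));
        // prints {TRADE_BOLT=[[1, 2, 3, 4]], CUSTOMER_BOLT=[[1, 3], [2, 4]]}
    }
}
```

Both components see all four tuples; only the distribution across tasks differs.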

 

Sending different tuples

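To send different tuples to different downstream bolts, you need the concept of a stream: the sender uses stream A when emitting message a to receiver A', and stream B when emitting message b to receiver B'.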

When wiring up the topology for submission:

builder.setBolt(SequenceTopologyDef.SPLIT_BOLT_NAME, new SplitRecord(), 2)
        .shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);

builder.setBolt(SequenceTopologyDef.TRADE_BOLT_NAME, new PairCount(), 1)
        .shuffleGrouping(SequenceTopologyDef.SPLIT_BOLT_NAME,   // name of the emitting component
                SequenceTopologyDef.TRADE_STREAM_ID);           // subscribe only to this stream of the emitter

builder.setBolt(SequenceTopologyDef.CUSTOMER_BOLT_NAME, new PairCount(), 1)
        .shuffleGrouping(SequenceTopologyDef.SPLIT_BOLT_NAME,   // name of the emitting component
                SequenceTopologyDef.CUSTOMER_STREAM_ID);        // subscribe only to this stream of the emitter
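The routing rule behind these subscriptions can be sketched in plain Java: a bolt subscribes to a (source component, stream id) pair, and a tuple emitted on one stream reaches only that stream's subscribers. In Storm, the one-argument shuffleGrouping(componentId) subscribes to the stream named "default", which is what the same-tuple example relies on. StreamRouter and its methods are hypothetical and only model the lookup, not Storm's implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of stream subscriptions; not the Storm API.
class StreamRouter {
    // Storm's default stream id.
    static final String DEFAULT_STREAM = "default";

    // (source component, stream id) -> bolts subscribed to that stream, in subscription order.
    private final Map<List<String>, List<String>> subscribers = new HashMap<>();

    // Mirrors shuffleGrouping(source, streamId).
    void subscribe(String bolt, String source, String streamId) {
        subscribers.computeIfAbsent(List.of(source, streamId), k -> new ArrayList<>()).add(bolt);
    }

    // Mirrors shuffleGrouping(source): subscribes to the default stream only.
    void subscribe(String bolt, String source) {
        subscribe(bolt, source, DEFAULT_STREAM);
    }

    // Bolts that would receive a tuple emitted by source on streamId.
    List<String> receiversOf(String source, String streamId) {
        return subscribers.getOrDefault(List.of(source, streamId), List.of());
    }

    public static void main(String[] args) {
        StreamRouter router = new StreamRouter();
        router.subscribe("TRADE_BOLT", "SPLIT_BOLT", "TRADE_STREAM");
        router.subscribe("CUSTOMER_BOLT", "SPLIT_BOLT", "CUSTOMER_STREAM");
        System.out.println(router.receiversOf("SPLIT_BOLT", "TRADE_STREAM"));    // [TRADE_BOLT]
        System.out.println(router.receiversOf("SPLIT_BOLT", "CUSTOMER_STREAM")); // [CUSTOMER_BOLT]
        System.out.println(router.receiversOf("SPLIT_BOLT", DEFAULT_STREAM));    // []
    }
}
```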

 

When emitting messages:

public void execute(Tuple tuple, BasicOutputCollector collector) {
    tpsCounter.count();

    Long tupleId = tuple.getLong(0);
    Object obj = tuple.getValue(1);

    if (obj instanceof TradeCustomer) {
        TradeCustomer tradeCustomer = (TradeCustomer) obj;

        Pair trade = tradeCustomer.getTrade();
        Pair customer = tradeCustomer.getCustomer();

        // The first argument selects the output stream; both tuples keep the same ID
        collector.emit(SequenceTopologyDef.TRADE_STREAM_ID,
                new Values(tupleId, trade));

        collector.emit(SequenceTopologyDef.CUSTOMER_STREAM_ID,
                new Values(tupleId, customer));
    } else if (obj != null) {
        LOG.info("Unknown type " + obj.getClass().getName());
    } else {
        LOG.info("Null value");
    }
}
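Stripped of the Storm types, execute() turns one input into two outputs tagged with different stream ids, and both outputs carry the same tupleId so a downstream bolt can pair them again. The plain-Java sketch below models only that step: TradeCustomer here is a hypothetical stand-in with String fields instead of the article's Pair objects, and each emission is rendered as a "streamId|id|payload" string in place of Storm's Values.

```java
import java.util.List;

// Plain-Java model of the split step; not Storm code.
class SplitSimulation {
    static final String TRADE_STREAM_ID = "TRADE_STREAM";
    static final String CUSTOMER_STREAM_ID = "CUSTOMER_STREAM";

    // Hypothetical stand-in for the article's TradeCustomer class.
    static final class TradeCustomer {
        final String trade;
        final String customer;

        TradeCustomer(String trade, String customer) {
            this.trade = trade;
            this.customer = customer;
        }
    }

    // Returns the emissions produced for one input, as "streamId|id|payload".
    static List<String> split(long tupleId, Object obj) {
        if (!(obj instanceof TradeCustomer)) {
            return List.of(); // unknown type or null: nothing is emitted, as in execute()
        }
        TradeCustomer tc = (TradeCustomer) obj;
        // Both halves carry the same tupleId, which is what lets a later bolt rejoin them.
        return List.of(
                TRADE_STREAM_ID + "|" + tupleId + "|" + tc.trade,
                CUSTOMER_STREAM_ID + "|" + tupleId + "|" + tc.customer);
    }

    public static void main(String[] args) {
        System.out.println(split(7L, new TradeCustomer("t7", "c7")));
        // prints [TRADE_STREAM|7|t7, CUSTOMER_STREAM|7|c7]
        System.out.println(split(8L, "not a record")); // prints []
    }
}
```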

 

Declaring the output streams:

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declareStream(SequenceTopologyDef.TRADE_STREAM_ID, new Fields("ID", "TRADE"));
    declarer.declareStream(SequenceTopologyDef.CUSTOMER_STREAM_ID, new Fields("ID", "CUSTOMER"));
}
When receiving messages, a bolt that subscribes to several streams must check which stream each tuple arrived on:

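// pair and tupleId are assumed to be read from the input tuple earlier in execute() (not shown)
if (input.getSourceStreamId().equals(SequenceTopologyDef.CUSTOMER_STREAM_ID)) {
    customer = pair;
    customerTuple = input;

    // Look for the trade half with the same ID; if it has not arrived yet, buffer this tuple
    tradeTuple = tradeMap.get(tupleId);
    if (tradeTuple == null) {
        customerMap.put(tupleId, input);
        return;
    }

    trade = (Pair) tradeTuple.getValue(1);
}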

 

Merging data streams

When building the topology:

In the example below, MergeRecord receives data from both SequenceTopologyDef.TRADE_BOLT_NAME and SequenceTopologyDef.CUSTOMER_BOLT_NAME.

builder.setBolt(SequenceTopologyDef.TRADE_BOLT_NAME, new PairCount(), 1)
        .shuffleGrouping(SequenceTopologyDef.SPLIT_BOLT_NAME,
                SequenceTopologyDef.TRADE_STREAM_ID);

builder.setBolt(SequenceTopologyDef.CUSTOMER_BOLT_NAME, new PairCount(), 1)
        .shuffleGrouping(SequenceTopologyDef.SPLIT_BOLT_NAME,
                SequenceTopologyDef.CUSTOMER_STREAM_ID);

// MergeRecord subscribes to both upstream bolts by chaining one grouping per source
builder.setBolt(SequenceTopologyDef.MERGE_BOLT_NAME, new MergeRecord(), 1)
        .shuffleGrouping(SequenceTopologyDef.TRADE_BOLT_NAME)
        .shuffleGrouping(SequenceTopologyDef.CUSTOMER_BOLT_NAME);
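MergeRecord runs with a parallelism of 1 here, so shuffle grouping is safe: both halves of a record reach the same task. With more than one task, shuffle grouping can send the trade half and the customer half of the same ID to different tasks, and neither task would ever complete the pair. Storm's fields grouping, for example fieldsGrouping(SequenceTopologyDef.TRADE_BOLT_NAME, new Fields("ID")), routes equal field values to the same task; that call assumes the upstream bolts emit a field named "ID", which the article does not show. The plain-Java sketch below illustrates the difference with a simplified hash-modulo rule and a round-robin stand-in for shuffle; neither is Storm's exact implementation, and GroupingSimulation is a hypothetical name.

```java
// Plain-Java sketch of why key-based routing matters for a join; not Storm code.
class GroupingSimulation {

    // Simplified fields grouping: the same ID always maps to the same task.
    static int fieldsTask(long id, int numTasks) {
        return Math.floorMod(Long.hashCode(id), numTasks);
    }

    // Round-robin counter standing in for shuffle grouping (Storm's is randomized).
    private int next = 0;

    int shuffleTask(int numTasks) {
        int task = next % numTasks;
        next++;
        return task;
    }

    public static void main(String[] args) {
        int numTasks = 2;
        long id = 42L;

        // The trade half and the customer half of the same ID arrive one after another.
        GroupingSimulation shuffle = new GroupingSimulation();
        int tradeTask = shuffle.shuffleTask(numTasks);
        int customerTask = shuffle.shuffleTask(numTasks);
        System.out.println("shuffle: " + tradeTask + " vs " + customerTask); // shuffle: 0 vs 1

        // With fields grouping on ID, both halves land on the same task.
        System.out.println("fields: " + fieldsTask(id, numTasks)
                + " vs " + fieldsTask(id, numTasks)); // fields: 0 vs 0
    }
}
```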

 

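Sender

The emitting bolts are ordinary bolts and need no special handling.

Receiver

The receiver only needs to check the source component of each tuple to tell where the data came from: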

// pair and tupleId are assumed to be read from the input tuple earlier in execute() (not shown)
if (input.getSourceComponent().equals(SequenceTopologyDef.CUSTOMER_BOLT_NAME)) {
    customer = pair;
    customerTuple = input;

    // Wait for the trade half with the same ID, buffering this tuple if it is not here yet
    tradeTuple = tradeMap.get(tupleId);
    if (tradeTuple == null) {
        customerMap.put(tupleId, input);
        return;
    }

    trade = (Pair) tradeTuple.getValue(1);

} else if (input.getSourceComponent().equals(SequenceTopologyDef.TRADE_BOLT_NAME)) {
    trade = pair;
    tradeTuple = input;

    // Symmetric case: wait for the customer half with the same ID
    customerTuple = customerMap.get(tupleId);
    if (customerTuple == null) {
        tradeMap.put(tupleId, input);
        return;
    }

    customer = (Pair) customerTuple.getValue(1);
}
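The pairing logic in this receiver is a small in-memory join: whichever half arrives first is parked in a map keyed by tupleId, and the second half completes the pair. The plain-Java sketch below reproduces that pattern outside Storm so it can be exercised with out-of-order input. PairJoiner, the "TRADE"/"CUSTOMER" side labels (standing in for the source component or stream id) and the string result are hypothetical. Unlike the excerpt above, which uses get(), the sketch calls remove() so matched entries do not accumulate; whether the full original clears them later is not shown.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the buffering join; not Storm code.
class PairJoiner {
    private final Map<Long, String> pendingTrades = new HashMap<>();
    private final Map<Long, String> pendingCustomers = new HashMap<>();

    // Feeds one half of a record. Returns "trade+customer" once both halves
    // for this id have arrived, or null while still waiting for the other half.
    String accept(String side, long id, String value) {
        if (side.equals("CUSTOMER")) {
            String trade = pendingTrades.remove(id); // remove so matched entries don't pile up
            if (trade == null) {
                pendingCustomers.put(id, value);
                return null;
            }
            return trade + "+" + value;
        } else if (side.equals("TRADE")) {
            String customer = pendingCustomers.remove(id);
            if (customer == null) {
                pendingTrades.put(id, value);
                return null;
            }
            return value + "+" + customer;
        }
        throw new IllegalArgumentException("unknown side: " + side);
    }

    // Number of halves still waiting for their partner.
    int pending() {
        return pendingTrades.size() + pendingCustomers.size();
    }

    public static void main(String[] args) {
        PairJoiner joiner = new PairJoiner();
        System.out.println(joiner.accept("CUSTOMER", 1L, "c1")); // null (waiting for trade 1)
        System.out.println(joiner.accept("TRADE", 2L, "t2"));    // null (waiting for customer 2)
        System.out.println(joiner.accept("TRADE", 1L, "t1"));    // t1+c1
        System.out.println(joiner.accept("CUSTOMER", 2L, "c2")); // t2+c2
        System.out.println(joiner.pending());                    // 0
    }
}
```

A half whose partner never arrives stays in memory indefinitely, in the sketch as in the original pattern, so a long-running bolt would need some way to expire stale entries.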

 

 

 

 

 

 



Original article: http://www.cnblogs.com/muzhongjiang/p/4117625.html
