码迷,mamicode.com
首页 > 其他好文 > 详细

Storm批处理之ITransactionalSpout普通事务Spout

时间:2015-12-26 19:38:00      阅读:644      评论:0      收藏:0      [点我收藏+]

标签:

Spout类:

技术分享
 1 package transaction1;
 2 
 3 import java.util.HashMap;
 4 import java.util.Map;
 5 import java.util.Random;
 6 
 7 import backtype.storm.task.TopologyContext;
 8 import backtype.storm.topology.OutputFieldsDeclarer;
 9 import backtype.storm.transactional.ITransactionalSpout;
10 import backtype.storm.tuple.Fields;
11 
12 public class MyTxSpout implements ITransactionalSpout<MyMata> {
13 
14     private static final long serialVersionUID = 1L;
15 
16     /**
17      * 数据源
18      */
19     Map<Long, String>  dbMap = null;
20     
21     public MyTxSpout(){
22         
23         dbMap = new HashMap<Long, String>();
24         Random random = new Random();
25 
26         String[] hosts = { "www.taobao.com" };
27         String[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34", "BBYH61456FGHHJ7JL89RG5VV9UYU7",
28                 "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" };
29         
30         String[] time = { "2014-01-07 08:40:50", "2014-01-07 08:40:51", "2014-01-07 08:40:52", "2014-01-07 08:40:53", 
31                 "2014-01-07 09:40:49", "2014-01-07 10:40:49", "2014-01-07 11:40:49", "2014-01-07 12:40:49" };
32         
33         for (long i = 0; i < 100; i++) {
34             dbMap.put(i,hosts[0]+"\t"+session_id[random.nextInt(5)]+"\t"+time[random.nextInt(8)]);
35         }
36     }
37 
38     /**
39      * 为事务性batch发射tuple,coordinator只有一个,
40      * 进入一个事务的processing阶段,会发射一个事务性tuple(transactionAttempt & metadata)到”batch emit”流
41      * 
42      */
43     @Override
44     public backtype.storm.transactional.ITransactionalSpout.Coordinator<MyMata> getCoordinator(
45             Map conf, TopologyContext context) {
46         return new MyCoordinator();
47     }
48     
49     /**
50      * 负责为每个batch实际发射tuple,emitter根据并行度可以有多个实例
51      * 
52      */
53     @Override
54     public backtype.storm.transactional.ITransactionalSpout.Emitter<MyMata> getEmitter(
55             Map conf, TopologyContext context) {
56         return new MyEmitter(dbMap);
57     }
58 
59     
60     @Override
61     public void declareOutputFields(OutputFieldsDeclarer declarer) {
62         declarer.declare(new Fields("tx","log"));
63     }
64 
65     @Override
66     public Map<String, Object> getComponentConfiguration() {
67         return null;
68     }
69 }
MyTxSpout

Coordinator:为事务性batch发射tuple。进入一个事务的processing阶段时,会发射一个事务性tuple(transactionAttempt & metadata)到“batch emit”流。coordinator只有一个实例。

技术分享
 1 package transaction1;
 2 
 3 import java.math.BigInteger;
 4 
 5 import backtype.storm.transactional.ITransactionalSpout;
 6 import backtype.storm.utils.Utils;
 7 
 8 public class MyCoordinator implements ITransactionalSpout.Coordinator<MyMata>{
 9 
10     //batch中tuple的个数
11     private static int BATCH_NUM = 10;
12     
13     /**
14      * 启动一个事务,生产元数据,定义事务开始的位置和数量
15      * @param  txid 事务id,默认从0开始
16      * @param  prevMetadata 上一个元数据
17      * 
18      */
19     @Override
20     public MyMata initializeTransaction(BigInteger txid, MyMata prevMetadata) {
21         
22         long beginPoint = 0;
23         
24         if (prevMetadata == null) {
25             //第一个事务,程序刚开始
26             beginPoint = 0;
27             
28         }else{
29 
30             beginPoint = prevMetadata.getBeginPoint() + prevMetadata.getNum();
31         }
32         
33         MyMata myMata = new MyMata();
34         myMata.setBeginPoint(beginPoint);
35         myMata.setNum(BATCH_NUM); 
36         
37         System.err.println("启动一个事务: "+myMata.toString());
38         
39         return myMata;
40     }
41 
42     
43     /**
44      * 只有返回为true,开启一个事务进入processing阶段,发射一个事务性的tuple到batch emit流,Emitter以广播方式订阅Coordinator的batch emit流
45      */
46     @Override
47     public boolean isReady() {
48         
49         Utils.sleep(2000);
50         return true;
51     }
52 
53     @Override
54     public void close() {
55         
56     }
57 }
MyCoordinator

MyEmitter:负责为每个batch实际发射tuple,emitter根据并行度可以有多个实例

技术分享
package transaction1;

import java.math.BigInteger;
import java.util.Map;

import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.transactional.ITransactionalSpout;
import backtype.storm.transactional.TransactionAttempt;
import backtype.storm.tuple.Values;

/**
 * Emitter of the transactional spout: emits the tuples that belong to one batch, reading them
 * from an in-memory map keyed by record offset.
 */
public class MyEmitter implements ITransactionalSpout.Emitter<MyMata> {

    /** Source records keyed by offset; stays {@code null} when the no-arg constructor is used. */
    Map<Long, String> dbMap = null;

    /** Creates an emitter without a data source; such an emitter emits nothing. */
    public MyEmitter() {
    }

    /**
     * Creates an emitter that reads from the given map.
     *
     * @param dbMap source records keyed by offset
     */
    public MyEmitter(Map<Long, String> dbMap) {
        this.dbMap = dbMap;
    }

    /**
     * Emits one tuple {@code (tx, log)} for every offset in the range described by the
     * coordinator metadata, skipping offsets that have no record.
     *
     * @param tx transaction attempt, emitted as the first field of every tuple
     * @param coordinatorMeta metadata giving the first offset and the number of offsets to read
     * @param collector collector the tuples are emitted to
     */
    @Override
    public void emitBatch(TransactionAttempt tx, MyMata coordinatorMeta,
            BatchOutputCollector collector) {
        if (dbMap == null) {
            // No data source (no-arg constructor): the batch is empty rather than an NPE.
            return;
        }

        final long beginPoint = coordinatorMeta.getBeginPoint();
        final long endPoint = beginPoint + coordinatorMeta.getNum();

        for (long offset = beginPoint; offset < endPoint; offset++) {
            // Single lookup per offset; offsets beyond the stored data simply yield null.
            final String log = dbMap.get(offset);
            if (log != null) {
                collector.emit(new Values(tx, log));
            }
        }
    }

    /**
     * Hook for cleaning up state of earlier transactions; this implementation keeps none.
     *
     * @param txid transaction id passed by the framework (not used here)
     */
    @Override
    public void cleanupBefore(BigInteger txid) {
    }

    /** Nothing to release. */
    @Override
    public void close() {
    }
}
MyEmitter

 

元数据:

技术分享
 1 package transaction1;
 2 
 3 import java.io.Serializable;
 4 
 5 /**
 6  * 定义元数据
 7  */
 8 public class MyMata  implements Serializable{
 9 
10     private static final long serialVersionUID = 1L;
11     
12     private Long beginPoint ;//事务开始位置
13     
14     private int  num; //batch 的tuple个数
15 
16     public Long getBeginPoint() {
17         return beginPoint;
18     }
19 
20     public void setBeginPoint(Long beginPoint) {
21         this.beginPoint = beginPoint;
22     }
23 
24     public int getNum() {
25         return num;
26     }
27 
28     public void setNum(int num) {
29         this.num = num;
30     }
31 
32     @Override
33     public String toString() {
34         return getBeginPoint()+"~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"+getNum();
35     }
36     
37     
38 }
View Code

 

BaseTransactionalBolt:

技术分享
 1 package transaction1;
 2 
 3 import java.util.Map;
 4 
 5 import backtype.storm.coordination.BatchOutputCollector;
 6 import backtype.storm.task.TopologyContext;
 7 import backtype.storm.topology.OutputFieldsDeclarer;
 8 import backtype.storm.topology.base.BaseTransactionalBolt;
 9 import backtype.storm.transactional.TransactionAttempt;
10 import backtype.storm.tuple.Fields;
11 import backtype.storm.tuple.Tuple;
12 import backtype.storm.tuple.Values;
13 
14 public class MyTransactionBolt extends BaseTransactionalBolt{
15 
16     private static final long serialVersionUID = 1L;
17     
18     long count = 0;
19     
20     BatchOutputCollector collector ;
21     TransactionAttempt id;
22     @Override
23     public void prepare(Map conf, TopologyContext context,
24             BatchOutputCollector collector, TransactionAttempt id) {
25         this.collector = collector;
26         this.id = id;
27         System.err.println("MyTransactionBolt   prepare :~~~~~~~~~~ TransactionId :"+id.getTransactionId() + " AttemptId : "+id.getAttemptId());
28     }
29 
30     /**
31      * 执行batch里面处理每一个tuple
32      */
33     @Override
34     public void execute(Tuple tuple) {
35         
36         TransactionAttempt tx = (TransactionAttempt)tuple.getValue(0);
37         System.err.println("MyTransactionBolt  execute ~~~~~~~~~~ TransactionId :"+tx.getTransactionId() + " AttemptId : "+tx.getAttemptId());
38         String log = tuple.getStringByField("log");
39         
40         if (log != null && log.length()> 0) {
41                 count ++;
42         }
43     }
44 
45     /**
46      * 同一个批次处理完一个批次调用
47      */
48     @Override
49     public void finishBatch() {
50         System.err.println("finishBatch "+count );
51         //继续发射
52         collector.emit(new Values(id,count));
53     }
54 
55     @Override
56     public void declareOutputFields(OutputFieldsDeclarer declarer) {
57         declarer.declare(new Fields("tx","count"));
58     }
59 }
MyTransactionBolt


MyCommiter:

技术分享
 1 package transaction1;
 2 
 3 import java.math.BigInteger;
 4 import java.util.HashMap;
 5 import java.util.Map;
 6 
 7 import backtype.storm.coordination.BatchOutputCollector;
 8 import backtype.storm.task.TopologyContext;
 9 import backtype.storm.topology.OutputFieldsDeclarer;
10 import backtype.storm.topology.base.BaseTransactionalBolt;
11 import backtype.storm.transactional.ICommitter;
12 import backtype.storm.transactional.TransactionAttempt;
13 import backtype.storm.tuple.Fields;
14 import backtype.storm.tuple.Tuple;
15 
16 /**
17  * Commiting阶段
18  *  
19  *
20  */
21 public class MyCommiter extends BaseTransactionalBolt implements ICommitter {
22     
23     public static Map<String,DBValue> dbMap = new HashMap<String, MyCommiter.DBValue>();
24 
25     private static final long serialVersionUID = 1L;
26     
27     public static final String GLOBAL_KEY = "GLOBAL_KEY";
28     
29     long sum = 0;
30     
31     TransactionAttempt id;
32     
33     BatchOutputCollector collector;
34     
35     @Override
36     public void prepare(Map conf, TopologyContext context,
37             BatchOutputCollector collector, TransactionAttempt id) {
38         this.id = id;
39         this.collector = collector;
40     }
41 
42     @Override
43     public void execute(Tuple tuple) {
44         
45         TransactionAttempt tx = (TransactionAttempt) tuple.getValue(0);
46         Long count = tuple.getLong(1);
47         sum += count;
48     }
49 
50     @Override
51     public void finishBatch() {
52     
53         //更新数据库
54         DBValue value = dbMap.get("GLOBAL_KEY");
55         DBValue newvalue ;
56         
57         //第一次写入或者写入最新的数据
58         if (value == null || !value.txid.equals(id.getTransactionId())) {
59             
60             newvalue = new DBValue();
61             newvalue.txid = id.getTransactionId();
62     
63             //第一次
64             if (value == null) {
65                 newvalue.count = sum;
66             }else{
67                 newvalue.count =value.count+sum;
68             }
69             dbMap.put(GLOBAL_KEY, newvalue);
70         }else{
71             newvalue = value;
72         }
73         
74         System.out.println("total----------------------------->"+dbMap.get(GLOBAL_KEY).count);
75         
76         
77         //发送结果到下一级   collector.emit(new Values());
78     }
79 
80     @Override
81     public void declareOutputFields(OutputFieldsDeclarer declarer) {
82         declarer.declare(new Fields(""));
83     }
84 
85      public static class DBValue{
86         
87          BigInteger txid;
88          long  count =0;
89     }
90 }
MyCommiter

 

MyToPo:

技术分享
 1 package transaction1;
 2 
 3 import backtype.storm.Config;
 4 import backtype.storm.LocalCluster;
 5 import backtype.storm.StormSubmitter;
 6 import backtype.storm.transactional.TransactionalTopologyBuilder;
 7 
 8 
 9 public class MyToPo { 
10     public static void main(String [] args) throws Exception{
11         
12         TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("ttbId", "txSpoutId", new MyTxSpout(),1);
13         builder.setBolt("txBolt",new MyTransactionBolt(),5).shuffleGrouping("txSpoutId");
14         builder.setCommitterBolt("commit", new MyCommiter(),1).shuffleGrouping("txBolt");
15         
16         //设置参数
17         Config conf = new Config();
18         
19         if (args.length > 0) {
20             //分布式提交
21             StormSubmitter.submitTopology(args[0], conf, builder.buildTopology());
22         }else{
23             //本地模式提交
24             LocalCluster localCluster = new LocalCluster();
25             localCluster.submitTopology("mytopology", conf, builder.buildTopology());
26         }
27     }
28 }
MyToPo

 

 

1 TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("ttbId", "txSpoutId", new MyTxSpout(),1);
2  builder.setBolt("txBolt",new MyTransactionBolt(),5).shuffleGrouping("txSpoutId");
3  builder.setCommitterBolt("commit", new MyCommiter(),1).shuffleGrouping("txBolt");

 

运行结果:

技术分享

 

Storm批处理之ITransactionalSpout普通事务Spout

标签:

原文地址:http://www.cnblogs.com/thinkpad/p/5078676.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!