码迷,mamicode.com
首页 > 其他好文 > 详细

Storm写库,Hbase批入库,定时和批入库,batchSize,TickTime

时间:2016-08-27 19:27:26      阅读:986      评论:0      收藏:0      [点我收藏+]

标签:

转载请注明出处:Import博客园http://www.cnblogs.com/thinkpad

注意:本文批处理只是Storm到Hbase批处理入库操作,并非Storm的API的批处理!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

  1 package com.storm.hbaseTest;
  2 
  3 import java.io.IOException;
  4 import java.util.HashMap;
  5 import java.util.Iterator;
  6 import java.util.List;
  7 import java.util.Map;
  8 
  9 import org.apache.commons.lang.StringUtils;
 10 import org.apache.hadoop.conf.Configuration;
 11 import org.apache.hadoop.hbase.HBaseConfiguration;
 12 import org.apache.hadoop.hbase.client.HConnection;
 13 import org.apache.hadoop.hbase.client.HConnectionManager;
 14 import org.apache.hadoop.hbase.client.HTableInterface;
 15 import org.apache.hadoop.hbase.client.Put;
 16 
 17 import backtype.storm.Config;
 18 import backtype.storm.Constants;
 19 import backtype.storm.task.OutputCollector;
 20 import backtype.storm.task.TopologyContext;
 21 import backtype.storm.topology.BasicOutputCollector;
 22 import backtype.storm.topology.IRichBolt;
 23 import backtype.storm.topology.OutputFieldsDeclarer;
 24 import backtype.storm.topology.base.BaseBasicBolt;
 25 import backtype.storm.tuple.Fields;
 26 import backtype.storm.tuple.Tuple;
 27 
 28 import org.slf4j.Logger;
 29 import org.slf4j.LoggerFactory;
 30 
 31 import com.google.common.collect.Lists;
 32 
 33 /**
 34  * @ClassName: HbaseBout.java
 35  * @Description:自定义批入库,定时批量和大小批量
 36  * @author Import QQ:99123550  博客 [ http://www.cnblogs.com/thinkpad ]
 37  * @date 2016年8月27日 下午4:43:19 
 38  * @version V1.0
 39  */
 40 @SuppressWarnings("all")
 41 public class HbaseBout implements IRichBolt{
 42     
 43       private static final long serialVersionUID = 1L;
 44       private static final Logger LOG = LoggerFactory.getLogger(HbaseBout.class);
 45       
 46       protected OutputCollector collector;
 47       protected HbaseClient hbaseClient;
 48       protected String tableName;
 49       protected String configKey ="hbase.conf";
 50       
 51       //批处理大小
 52       protected int batchSize = 15000;
 53       List<Put> batchMutations;
 54       List<Tuple> tupleBatch;
 55       //tick Time
 56       int flushIntervalSecs = 1;
 57       
 58   /**
 59    * @Description:Storm初始化
 60    * @author Import QQ:99123550  博客 [ http://www.cnblogs.com/thinkpad ]
 61    */
 62     public void prepare(Map map, TopologyContext context,
 63             OutputCollector collector) {
 64         
 65         this.collector = collector;
 66         Configuration hbConfig = HBaseConfiguration.create();
 67         Map<String,String> conf = (Map)map.get(this.configKey);
 68         
 69         //获取Hbase配置
 70         if (conf == null) {
 71           throw new IllegalArgumentException("HBase configuration not found using key ‘" + this.configKey + "‘");
 72         }
 73         
 74         //批大小
 75         if(map.get("batchSize") != null){
 76             this.batchSize = new Integer(map.get("batchSize").toString());
 77         }
 78         //Tick Time
 79         if(map.get("flushIntervalSecs") != null){
 80             this.flushIntervalSecs = Integer.valueOf(map.get("flushIntervalSecs").toString());
 81         }
 82         
 83         //Hbase 配置
 84         for (String key : conf.keySet()) {
 85           hbConfig.set(key, String.valueOf(conf.get(key)));
 86         }
 87         this.hbaseClient = new HbaseClient(hbConfig, this.tableName);
 88     }
 89 
 90   /**
 91    * @Description:每次调用
 92    * @author Import QQ:99123550  博客 [ http://www.cnblogs.com/thinkpad ]
 93    */
 94     @Override
 95     public void execute(Tuple tuple) {
 96         
 97          boolean flush = false;
 98          
 99          try {
100              //Tick
101              if(tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)&& tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)){
102                 LOG.debug("TICK received! current batch status ", Integer.valueOf(this.tupleBatch.size()), Integer.valueOf(this.batchSize)); 
103                 flush = true;
104             }else{
105                 
106                 Put put = new Put(tuple.getStringByField("rowKey").getBytes());
107                 put.add("cf".getBytes(), "name".getBytes(), tuple.getStringByField("name").getBytes());
108                 put.add("cf".getBytes(), "sex".getBytes(), tuple.getStringByField("sex").getBytes());
109                 this.batchMutations.add(put);
110                 this.tupleBatch.add(tuple);
111                 
112                 //当前tuple批大小
113                 if (this.tupleBatch.size() >= this.batchSize) {
114                   flush = true;
115                 }
116             }
117              //持久化操作
118              if ((flush) && (!this.tupleBatch.isEmpty())) {
119                   this.hbaseClient.batchMutate(this.batchMutations);
120                   LOG.debug("acknowledging tuples after batchMutate");
121                   for (Iterator<Tuple> tuples = this.tupleBatch.iterator(); tuples.hasNext(); ) { 
122                       Tuple t = (Tuple)tuples.next();
123                         this.collector.ack(t);
124                   }
125                   this.tupleBatch.clear();
126                   this.batchMutations.clear();
127                }
128         } catch (Exception e) {
129               LOG.debug("inser batch fail");
130               this.collector.reportError(e);
131               for (Tuple t : this.tupleBatch) {
132                   this.collector.fail(t);
133                 }
134               this.tupleBatch.clear();
135               this.batchMutations.clear();
136         }
137     }
138 
139   /**
140    * @Description:字段声明
141    * @author Import QQ:99123550  博客 [ http://www.cnblogs.com/thinkpad ]
142    */
143     public void declareOutputFields(OutputFieldsDeclarer declarer) {
144         
145     }
146     
147     /**
148        * @Description:配置
149        * @author Import QQ:99123550  博客 [ http://www.cnblogs.com/thinkpad ]
150      */
151     @Override
152     public Map<String, Object> getComponentConfiguration() {
153         Config conf = new Config();
154         conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS,flushIntervalSecs);
155         return conf;
156     }
157 
158     
159     /**
160        * @Description:清理方法
161        * @author Import QQ:99123550  博客 [ http://www.cnblogs.com/thinkpad ]
162       */
163     @Override
164     public void cleanup() {
165         
166     }
167 }
168 
169 /**
170  * 
171  * @ClassName: HbaseBout.java
172  * @Description:Hbase操作相关类
173  * @author Import QQ:99123550  博客 [ http://www.cnblogs.com/thinkpad ]
174  * @date 2016年8月27日 下午6:06:34 
175  * @version V1.0
176  */
177 class HbaseClient{
178      
179      private static final Logger LOG = LoggerFactory.getLogger(HbaseClient.class);
180       HConnection hTablePool = null;
181       HTableInterface table = null;
182       
183       /**
184        * @Description:获取连接
185        * @author Import QQ:99123550  博客 [ http://www.cnblogs.com/thinkpad ]
186        * @param configuration:Hbase表配置
187        * @param tableName:表名
188        */
189       public HbaseClient(final Configuration configuration, final String tableName)
190       {
191         try
192         {
193             hTablePool =  HConnectionManager.createConnection(configuration) ;
194             this.table =  hTablePool.getTable(tableName);
195         }
196         catch (Exception e) {
197           throw new RuntimeException("HBase create failed: " + e.getMessage(), e);
198         }
199       }
200 
201       /**
202        * 
203        * @ClassName: HbaseBout.java
204        * @Description:批入库
205        * @author Import QQ:99123550  博客 [ http://www.cnblogs.com/thinkpad ]
206        * @date 2016年8月27日 下午6:05:39 
207        * @version V1.0
208        */
209       public void batchMutate(List<Put> puts) throws Exception {
210           
211           try {
212             this.table.put(puts);
213         } catch (Exception e) {
214               LOG.warn("Error insert batch to HBase.", e);
215               throw e;
216         }
217       }
218 }

                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         

Storm写库,Hbase批入库,定时和批入库,batchSize,TickTime

标签:

原文地址:http://www.cnblogs.com/thinkpad/p/5813535.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!