码迷,mamicode.com
首页 > 其他好文 > 详细

Storm之WordCount

时间:2015-07-31 21:50:23      阅读:141      评论:0      收藏:0      [点我收藏+]

标签:

新建Maven项目

  <dependencies>
   <dependency>
   <groupId>org.apache.storm</groupId>
   <artifactId>storm-core</artifactId>
   <version>0.9.2-incubating</version>
   </dependency>
  </dependencies>

 

JavaCode

  1 package mystorm;
  2 
  3 import java.io.File;
  4 import java.io.IOException;
  5 import java.util.HashMap;
  6 import java.util.List;
  7 import java.util.Map;
  8 import java.util.Map.Entry;
  9 
 10 import org.apache.commons.io.FileUtils;
 11 
 12 import clojure.main;
 13 import clojure.lang.MapEntry;
 14 
 15 import backtype.storm.LocalCluster;
 16 import backtype.storm.StormSubmitter;
 17 import backtype.storm.generated.AlreadyAliveException;
 18 import backtype.storm.generated.InvalidTopologyException;
 19 import backtype.storm.spout.SpoutOutputCollector;
 20 import backtype.storm.task.OutputCollector;
 21 import backtype.storm.task.TopologyContext;
 22 import backtype.storm.topology.OutputFieldsDeclarer;
 23 import backtype.storm.topology.TopologyBuilder;
 24 import backtype.storm.topology.base.BaseRichBolt;
 25 import backtype.storm.topology.base.BaseRichSpout;
 26 import backtype.storm.tuple.Fields;
 27 import backtype.storm.tuple.Tuple;
 28 import backtype.storm.tuple.Values;
 29 
 30 // Local mode (original label). NOTE(review): main() below submits through StormSubmitter, while a LocalCluster variant is shown separately after this listing - confirm which mode is intended.
 31 public class WordCountApp {
 32     
        /**
         * Builds the word-count topology and submits it.
         * Spout "1" (MySpout) feeds bolt "2" (SplitLineBolt), which feeds
         * bolt "3" (WordCountBolt); both links use shuffle grouping.
         *
         * @param args command-line arguments; never read
         * @throws Exception declared so that any failure during submission propagates to the caller
         */
 33     public static void main(String[] args) throws Exception {
 34         final TopologyBuilder topologyBuilder = new TopologyBuilder();
 35 
            // Component ids are the strings "1", "2", "3"; each bolt names its upstream component by id.
 36         topologyBuilder.setSpout("1", new MySpout());
 37         topologyBuilder.setBolt("2", new SplitLineBolt()).shuffleGrouping("1");
 38         topologyBuilder.setBolt("3", new WordCountBolt()).shuffleGrouping("2");
 39         
 40         final HashMap conf = new HashMap(); // empty configuration map handed to submitTopology
 41         
            // NOTE(review): submitTopology is presumably a static method, which would make this instance unnecessary - confirm against the Storm API.
 42         final StormSubmitter stormSubmitter = new StormSubmitter();
            // The topology is submitted under the simple class name of WordCountApp.
 43         stormSubmitter.submitTopology(WordCountApp.class.getSimpleName(), conf, topologyBuilder.createTopology());
 44     }
 45 }
 46 
 47 class MySpout extends BaseRichSpout{
        // Spout that reads the file /root/Downloads/hello and emits each of its lines as a one-value tuple
        // under the declared field name "line".
 48     private static final long serialVersionUID = 1L;
 49 
 50     SpoutOutputCollector collector = null; // assigned in open(); used by nextTuple() to emit
 51 
        // Stores the collector passed in; the other two arguments are not used.
 52     public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) {
 53         this.collector = collector;
 54     }
 55     
 56     // The most important method; it processes the data. It can simply be thought of as running in an endless loop, watching for changes to the file's content.
        // NOTE(review): each call reads the entire file and emits every line, then sleeps 2 seconds;
        // nothing here records which lines were already emitted, so repeated calls emit the same lines again.
 57     public void nextTuple() {
 58         try {
 59             final List<String> readLines = FileUtils.readLines(new File("/root/Downloads/hello"));
 60             for (String line : readLines) {
 61                 // Treat each line as one tuple
 62                 final Values tuple = new Values(line);
 63                 // The collector sends the tuple out, handing it over to the bolts for processing
 64                 collector.emit(tuple);
 65             }
 66             Thread.sleep(2000L);
 67         } catch (Exception e) {
                // Any failure (including a read error or an interrupted sleep) is only printed; the method then returns normally.
 68             e.printStackTrace();
 69         }
 70     }
 71 
 72 
        // Declares a single output field named "line".
 73     public void declareOutputFields(OutputFieldsDeclarer declarer) {
 74         final Fields fields = new Fields("line");
 75         declarer.declare(fields);
 76     }
 77     
 78 }
 79 
 80 class SplitLineBolt extends BaseRichBolt{
        // Bolt that splits each incoming line on tab characters and emits one tuple per piece
        // under the declared field name "word".
 81 
 82     OutputCollector collector = null; // assigned in prepare(); used by execute() to emit
 83     
        // Reads value 0 of the tuple as a string, splits it on "\t", emits every piece as its own
        // one-value tuple, and then sleeps for 2 seconds before returning.
 84     public void execute(Tuple tuple) {
 85         final String line = tuple.getString(0);
 86         final String[] splited = line.split("\t");
 87         for (String word : splited) {
 88             this.collector.emit(new Values(word));
 89         }
 90         try {
 91             Thread.sleep(2000L);
 92         } catch (InterruptedException e) {
                // The interruption is only printed; execution continues normally afterwards.
 93             e.printStackTrace();
 94         }
 95     }
 96 
        // Stores the collector passed in; the other two arguments are not used.
 97     public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
 98         this.collector = collector;
 99     }
100 
        // Declares a single output field named "word".
101     public void declareOutputFields(OutputFieldsDeclarer declarer) {
102         declarer.declare(new Fields("word"));
103     }
104 }
105 
106 
107 class WordCountBolt extends BaseRichBolt{
        // Bolt that keeps an in-memory occurrence count per word and prints all counts in cleanup().
108 
109     OutputCollector collector = null; // assigned in prepare(); not otherwise used in this class
110     
111     Map<String,Integer> map = new HashMap<String,Integer>(); // word -> number of times execute() has seen it
112     
        // Reads value 0 of the tuple as the word and increments its count (a new word starts at 1),
        // then sleeps for 2 seconds. Nothing is emitted from here.
113     public void execute(Tuple tuple) {
114         final String word = tuple.getString(0);
115         final Integer value = map.get(word);
116         if(value==null) {
117             map.put(word, 1);
118         }else {
119             map.put(word, value+1);
120         }
121         
122         try {
123             Thread.sleep(2000L);
124         } catch (InterruptedException e) {
                // The interruption is only printed; execution continues normally afterwards.
125             e.printStackTrace();
126         }
127     }
128     
        // Prints every entry of the count map to System.err as "word:count".
129     @Override
130     public void cleanup() {
131         for (Entry<String, Integer> entry : map.entrySet()) {
132             System.err.println(entry.getKey()+":"+entry.getValue());
133         }
134     }
135 
        // Stores the collector passed in; the other two arguments are not used.
136     public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
137         this.collector = collector;
138     }
139 
        // Declares an output field named "word".
        // NOTE(review): execute() never emits a tuple, so this declaration appears to be unused - confirm.
140     public void declareOutputFields(OutputFieldsDeclarer declarer) {
141         declarer.declare(new Fields("word"));
142     }
143     
144     
145 }

 

本地运行

// Local-run variant: submits the same topology through a LocalCluster instance.
// It relies on the topologyBuilder variable built in WordCountApp.main(), so it is
// presumably meant to replace the StormSubmitter lines there.
final HashMap topologyConf = new HashMap();

final LocalCluster cluster = new LocalCluster();
final String topologyName = WordCountApp.class.getSimpleName();
cluster.submitTopology(topologyName, topologyConf, topologyBuilder.createTopology());

 

Storm之WordCount

标签:

原文地址:http://www.cnblogs.com/thinkpad/p/4693155.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!