标签:
spout:
package com.storm.WordCount;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

/**
 * Demo spout that repeatedly emits a fixed set of sentences on the
 * single output field "word" (each tuple carries one whole sentence).
 * Tuples are emitted without message ids, so {@link #ack(Object)} and
 * {@link #fail(Object)} are never invoked by the framework.
 */
public class MyRandomSentenceSpout extends BaseRichSpout {

    private static final long serialVersionUID = 1L;

    SpoutOutputCollector _collector;
    Random _rand;
    String[] sentences = new String[] {"a b c d", "b c d e f", "a d e f"};

    /** Emits every sentence once per call, then pauses five seconds. */
    @Override
    public void nextTuple() {
        for (String sentence : sentences) {
            _collector.emit(new Values(sentence));
        }
        // Throttle the spout so the console output stays readable.
        Utils.sleep(1000 * 5);
    }

    /**
     * Lifecycle hook called once per task; captures the collector used
     * by {@link #nextTuple()}.
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        _rand = new Random();
    }

    /** No-op: tuples are emitted unanchored, so acks never arrive. */
    @Override
    public void ack(Object id) {
    }

    /** No-op: tuples are emitted unanchored, so failures never arrive. */
    @Override
    public void fail(Object id) {
    }

    /** Declares the single output field consumed by the split bolt. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
splitBolt:
package com.storm.WordCount;

import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.FailedException;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Splits each incoming sentence (field "word") into individual words
 * using the separator supplied at construction time, and re-emits each
 * word on the output field "word".
 */
public class MySplit extends BaseBasicBolt {

    private static final long serialVersionUID = 1L;

    // Separator pattern used by String.split (regex semantics).
    String patton;

    /**
     * @param patton separator regex used to split incoming sentences
     */
    public MySplit(String patton) {
        this.patton = patton;
    }

    /**
     * Splits the sentence and emits one tuple per word. Acking is
     * managed by BaseBasicBolt; no explicit ack call is needed.
     * Throwing FailedException tells the framework to fail the tuple.
     */
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        try {
            String sen = input.getStringByField("word");
            if (sen != null) {
                for (String word : sen.split(patton)) {
                    // emit requires a Values (List) instance
                    collector.emit(new Values(word));
                }
            }
        } catch (Exception e) {
            // Preserve the original cause so the real failure is visible
            // in the worker log instead of being swallowed.
            throw new FailedException("split fail!", e);
        }
    }

    /** No per-task initialization required. */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
    }

    /** Declares the single output field consumed downstream. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    /** No component-specific configuration. */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    /** No resources to release. */
    @Override
    public void cleanup() {
    }
}
wordCountBolt:
package com.storm.WordCount;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Maintains a running per-word count. The topology routes tuples here
 * with a fields-grouping on "word", so all occurrences of the same word
 * reach the same task and the local map holds a correct partial count.
 * Emits ("word", "count") after every update.
 */
public class WordCount extends BaseBasicBolt {

    private static final long serialVersionUID = 1L;

    // word -> occurrences seen by this task so far
    Map<String, Integer> counts = new HashMap<String, Integer>();

    /** Increments the count for the incoming word and emits the new total. */
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // Read by field name for consistency with MySplit (same tuple,
        // positional index 0 and field "word" are equivalent here).
        String word = tuple.getStringByField("word");
        Integer count = counts.get(word);
        if (count == null)
            count = 0;
        count++;
        counts.put(word, count);
        collector.emit(new Values(word, count));
    }

    /** Declares the (word, count) pair consumed by the sum bolt. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
sumBolt:
package com.storm.WordCount;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.FailedException;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

/**
 * Global aggregator: keeps the latest count per word and, on every
 * tuple, prints the total number of word occurrences and the number of
 * distinct words. Must run with parallelism 1 (see the topology) so it
 * sees every (word, count) update.
 */
public class SumBolt implements IBasicBolt {

    private static final long serialVersionUID = 1L;

    // word -> latest count reported by the upstream WordCount tasks
    Map<String, Integer> counts = new HashMap<String, Integer>();

    /**
     * Records the incoming (word, count) pair, then recomputes and
     * prints the overall totals from the accumulated map.
     */
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        try {
            // Total occurrences across all words. Use a primitive long:
            // the original boxed Long forced an autobox on every += in
            // the loop below.
            long words_um = 0L;
            // Number of distinct (non-null) words.
            long word_count = 0L;
            String word = input.getString(0);
            Integer count = input.getInteger(1);
            counts.put(word, count);
            // Sum the latest counts of all words seen so far.
            Iterator<Integer> i = counts.values().iterator();
            while (i.hasNext()) {
                words_um += i.next();
            }
            // Count distinct words, skipping a possible null key.
            Iterator<String> i2 = counts.keySet().iterator();
            while (i2.hasNext()) {
                String oneWord = i2.next();
                if (oneWord != null) {
                    word_count++;
                }
            }
            System.err.println(" 总数 = " + words_um + " 去重总数 = " + word_count);
        } catch (Exception e) {
            // Message corrected (was copy-pasted "split fail!" from the
            // split bolt) and the cause is preserved.
            throw new FailedException("sum fail!", e);
        }
    }

    /** Terminal bolt: emits nothing downstream. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    /** No component-specific configuration. */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    /** No per-task initialization required. */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
    }

    /** No resources to release. */
    @Override
    public void cleanup() {
    }
}
Topo:
package com.storm.WordCount; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; public class WordCountTopology { public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new MyRandomSentenceSpout(), 1); //分割 builder.setBolt("split", new MySplit(" "), 8).shuffleGrouping("spout"); builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word")); //全局汇总必须开单线程 builder.setBolt("sum", new SumBolt(), 1).shuffleGrouping("count"); Config conf = new Config(); conf.setDebug(true); if (args != null && args.length > 0) { conf.setNumWorkers(3); StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } else { conf.setMaxTaskParallelism(3); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("word-count", conf, builder.createTopology()); // Thread.sleep(10000); // // cluster.shutdown(); } } }
标签:
原文地址:http://www.cnblogs.com/thinkpad/p/5075568.html