(一)一个例子
本示例使用 Storm 运行经典的 WordCount 程序，拓扑如下：
sentence-spout—>split-bolt—>count-bolt—>report-bolt
四个组件分别负责生成句子、将句子拆分为单词、统计单词出现次数以及输出统计结果。
完整代码请见 https://github.com/jinhong-lu/stormdemo
以下是关键代码的分析。
1、创建spout
public class SentenceSpout extends BaseRichSpout { private SpoutOutputCollector collector; private int index = 0; private String[] sentences = { "when i was young i'd listen to the radio", "waiting for my favorite songs", "when they played i'd sing along", "it make me smile", "those were such happy times and not so long ago", "how i wondered where they'd gone", "but they're back again just like a long lost friend", "all the songs i love so well", "every shalala every wo'wo", "still shines.", "every shing-a-ling-a-ling", "that they're starting", "to sing so fine"}; public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("sentence")); } public void nextTuple() { this.collector.emit(new Values(sentences[index])); index++; if (index >= sentences.length) { index = 0; } try { Thread.sleep(1); } catch (InterruptedException e) { //e.printStackTrace(); } } }
public class SplitSentenceBolt extends BaseRichBolt{ private OutputCollector collector; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } public void execute(Tuple input) { String sentence = input.getStringByField("sentence"); String[] words = sentence.split(" "); for(String word : words){ this.collector.emit(new Values(word)); //System.out.println(word); } } }
public class WordCountBolt extends BaseRichBolt{ private OutputCollector collector; private Map<String,Long> counts = null; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; this.counts = new HashMap<String, Long>(); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word","count")); } public void execute(Tuple input) { String word = input.getStringByField("word"); Long count = this.counts.get(word); if(count == null){ count = 0L; } count++; this.counts.put(word, count); this.collector.emit(new Values(word,count)); //System.out.println(count); } }
/**
 * Terminal bolt that remembers the latest count seen for each word and prints
 * the final tally when the bolt is cleaned up.
 *
 * <p>NOTE: Storm only guarantees that {@code cleanup()} runs when a topology is
 * killed in local mode; on a real cluster workers may be killed without it, so
 * the report may never be printed there.
 */
public class ReportBolt extends BaseRichBolt {
    /** Latest count received per word; created in {@link #prepare}. */
    private Map<String, Long> counts;

    /**
     * Starts with an empty tally. This bolt emits nothing, so the collector is not kept.
     *
     * @param stormConf topology configuration (unused)
     * @param context   task context (unused)
     * @param collector output collector (unused)
     */
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.counts = new HashMap<String, Long>();
    }

    /** Declares no output fields: this is the end of the topology. */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    /**
     * Records the (word, count) pair, overwriting any earlier count for the word.
     *
     * @param input tuple carrying "word" (String) and "count" (Long) fields
     */
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Long count = input.getLongByField("count");
        counts.put(word, count);
    }

    /**
     * Prints every word and its final count, ordered alphabetically by word so
     * the report is deterministic and easy to scan, then delegates to the
     * superclass.
     */
    public void cleanup() {
        System.out.println("Final output");
        Map<String, Long> sorted = new java.util.TreeMap<String, Long>(counts);
        for (Entry<String, Long> entry : sorted.entrySet()) {
            System.out.println(entry.getKey() + " : " + entry.getValue());
        }
        super.cleanup();
    }
}
/**
 * Wires and launches the word-count topology:
 * sentence-spout -> split-bolt -> count-bolt -> report-bolt.
 *
 * <p>With no arguments it runs in an in-process {@code LocalCluster} for a
 * fixed time and then shuts down. With one argument it submits the topology
 * to a real cluster, using {@code args[0]} as the topology name.
 */
public class WordCountTopology {
    private static final String SENTENCE_SPOUT_ID = "sentence-spout";
    private static final String SPLIT_BOLT_ID = "split-bolt";
    private static final String COUNT_BOLT_ID = "count-bolt";
    private static final String REPORT_BOLT_ID = "report-bolt";
    private static final String TOPOLOGY_NAME = "word-count-topology";
    /** How long the local-mode run is allowed to process before being killed. */
    private static final long LOCAL_RUN_MILLIS = 10000L;

    /**
     * Builds the topology and runs it locally or submits it remotely.
     *
     * @param args empty for local mode; otherwise {@code args[0]} is the
     *             topology name used for cluster submission
     * @throws IllegalStateException    if a topology with that name is already running
     * @throws IllegalArgumentException if Storm rejects the topology as invalid
     */
    public static void main(String[] args) {
        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt splitBolt = new SplitSentenceBolt();
        WordCountBolt countBolt = new WordCountBolt();
        ReportBolt reportBolt = new ReportBolt();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SENTENCE_SPOUT_ID, spout);
        builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);
        // Route by word so each word's running count lives in a single task.
        builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
        // A single report task sees every (word, count) update.
        builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);

        Config conf = new Config();

        if (args.length == 0) {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology());
            boolean interrupted = false;
            try {
                Thread.sleep(LOCAL_RUN_MILLIS);
            } catch (InterruptedException e) {
                // Remember the interrupt but still tear the cluster down cleanly first.
                interrupted = true;
            }
            cluster.killTopology(TOPOLOGY_NAME);
            cluster.shutdown();
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        } else {
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (AlreadyAliveException e) {
                // Fail loudly (non-zero exit) instead of printing and exiting successfully.
                throw new IllegalStateException("Topology already running: " + args[0], e);
            } catch (InvalidTopologyException e) {
                throw new IllegalArgumentException("Invalid topology: " + args[0], e);
            }
        }
    }
}
原文地址:http://blog.csdn.net/jinhong_lu/article/details/46531787