码迷,mamicode.com
首页 > 其他好文 > 详细

Storm WordCount Topology详解

时间:2015-06-19 14:57:14      阅读:186      评论:0      收藏:0      [点我收藏+]

标签:


 1 package org.apache.storm.storm_core;
 2 
 3 import java.util.Map;
 4 
 5 import backtype.storm.task.OutputCollector;
 6 import backtype.storm.task.TopologyContext;
 7 import backtype.storm.topology.OutputFieldsDeclarer;
 8 import backtype.storm.topology.base.BaseRichBolt;
 9 import backtype.storm.tuple.Fields;
10 import backtype.storm.tuple.Tuple;
11 import backtype.storm.tuple.Values;
12 
13 public class SplitSentenceBolt extends BaseRichBolt {
14     /**
15      * 
16      */
17     private static final long serialVersionUID = -2107029392155190729L;
18     private OutputCollector collector;// 用来向其他Spout发射tuple的发射器
19 
20     /*
21      * (non-Javadoc) prepare方法类似于open方法,prepare在bolt初始化时被调用
22      */
23     public void prepare(Map stormConf, TopologyContext context,
24             OutputCollector collector) {
25         // TODO Auto-generated method stub
26         this.collector = collector;// 发射器初始化
27 
28     }
29 
30     public void execute(Tuple input) {
31         // TODO Auto-generated method stub
32         // 接收从SentenceSpout的发射器发射过来的tuple,因为SentenceSpout中声明的tuple字段为sentence,故getStringByField方法的参数为sentence
33         String sentence = input.getStringByField("sentence");// 该tuple是一个包含
34                                                                 // 键为sentence
35                                                                 // 值为字符串
36                                                                 // 的列表List<Map<sentence,String>>
37         String[] words = sentence.split(" ");// 将字符串分解成一个个的单词
38         for (String word : words)
39             this.collector.emit(new Values(word));// 将每个单词构造成tuple并发送给下一个Spout
40     }
41 
42     public void declareOutputFields(OutputFieldsDeclarer declarer) {
43         // TODO Auto-generated method stub
44         declarer.declare(new Fields("word"));// 定义SplitSentenceBolt发送的tuple的字段("键值")为 word
45     }
46 }

 

 1 package org.apache.storm.storm_core;
 2 
 3 import java.util.Map;
 4 
 5 import backtype.storm.spout.SpoutOutputCollector;
 6 import backtype.storm.task.TopologyContext;
 7 import backtype.storm.topology.OutputFieldsDeclarer;
 8 import backtype.storm.topology.base.BaseRichSpout;
 9 import backtype.storm.tuple.Fields;
10 import backtype.storm.tuple.Values;
11 import backtype.storm.utils.Utils;
12 
13 public class SentenceSpout extends BaseRichSpout {
14     /**
15      * 
16      */
17     private static final long serialVersionUID = 3444934973982660864L;
18     private SpoutOutputCollector collector;// 用来向其他Spout发射tuple
19     private String[] sentences = { "my dog has fleas", "i like cold beverages",
20             "the dog ate my homework", "don‘t have a cow man",
21             "i don‘t think i like fleas" };
22 
23     private int index = 0;
24 
25     /*
26      * open() 方法在所有的Spout组件初始化时被调用
27      * 
28      * @param Map conf storm 配置信息
29      * 
30      * @context TopologyContext topology 组件信息
31      */
32     public void open(@SuppressWarnings("rawtypes") Map conf,
33             TopologyContext context, SpoutOutputCollector collector) {
34         // TODO Auto-generated method stub
35         this.collector = collector;
36     }
37 
38     /*
39      * Values.java extends ArrayList Storm 调用该方法向输出的collector发射tuple
40      */
41     public void nextTuple() {
42         // TODO Auto-generated method stub
43         // 以字符串数组sentences 中的每个字符串 作为参数 构造tuple
44         this.collector.emit(new Values(sentences[index]));// 通过emit方法将构造好的tuple发送出去
45         index++;
46         if (index >= sentences.length) {
47             index = 0;
48         }
49         Utils.sleep(100);
50     }
51 
52     /*
53      * SentenceSpout 发送的tuple它是一个包含键值对的List,该方法声明了List中包含的键值对的键为 sentence
54      */
55     public void declareOutputFields(OutputFieldsDeclarer declarer) {
56         // TODO Auto-generated method stub
57         declarer.declare(new Fields("sentence"));// 标记SentenceSpout发送的tuple的键为
58                                                     // sentence
59     }
60 }

 

 

 1 package org.apache.storm.storm_core;
 2 
 3 import backtype.storm.Config;
 4 import backtype.storm.LocalCluster;
 5 import backtype.storm.topology.TopologyBuilder;
 6 import backtype.storm.tuple.Fields;
 7 import backtype.storm.utils.Utils;
 8 
 9 public class WordCountTopology {
10     private static final String SENTENCE_SPOUT_ID = "sentence-spout";
11     private static final String SPLIT_BOLT_ID = "split-bolt";
12     private static final String COUNT_BOLT_ID = "count-bolt";
13     private static final String REPORT_BOLT_ID = "report-bolt";
14     private static final String TOPOLOGY_NAME = "word-count-topology";
15     
16     public static void main(String[] args) throws Exception{
17         SentenceSpout spout = new SentenceSpout();
18         SplitSentenceBolt splitBolt = new SplitSentenceBolt();
19         WordCountBolt countBolt = new WordCountBolt();
20         ReportBolt reportBolt = new ReportBolt();
21         
22         TopologyBuilder builder = new TopologyBuilder();
23         builder.setSpout(SENTENCE_SPOUT_ID, spout);
24         builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);
25         builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
26         builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);
27         
28         Config config = new Config();
29         LocalCluster cluster = new LocalCluster();
30         
31         cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
32         Utils.sleep(1000);
33         cluster.killTopology(TOPOLOGY_NAME);
34         cluster.shutdown();
35         
36     }
37 }

 

 

package org.apache.storm.storm_core;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class ReportBolt extends BaseRichBolt {

    private static final long serialVersionUID = 4921144902730095910L;

    // No OutputCollector field: this terminal bolt never emits tuples.
    /** Latest count seen for each word, keyed by word. */
    private HashMap<String, Long> counts = null;

    /**
     * Initializes the word-count map. The collector is ignored because this
     * bolt sits at the end of the topology.
     */
    public void prepare(@SuppressWarnings("rawtypes") Map stormConf,
            TopologyContext context, OutputCollector collector) {
        this.counts = new HashMap<String, Long>();
    }

    /**
     * Records the running count for a word; later tuples for the same word
     * simply overwrite the earlier value.
     */
    public void execute(Tuple input) {
        this.counts.put(input.getStringByField("word"),
                input.getLongByField("count"));
    }

    /** Declares nothing: this bolt produces no output stream. */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Intentionally empty — terminal bolt.
    }

    /**
     * Prints the final counts, sorted by word. NOTE: on a real Storm cluster
     * cleanup() is not guaranteed to run; it is reliable only in local mode.
     */
    public void cleanup() {
        System.out.println("------ print counts ------");
        List<String> sortedWords = new ArrayList<String>(counts.keySet());
        Collections.sort(sortedWords);
        for (String word : sortedWords) {
            System.out.println(word + " : " + this.counts.get(word));
        }
        System.out.println("--------bye----------");
    }
}
package storm.starter;

import java.util.HashMap;
import java.util.Map;
 
import storm.starter.RandomSentenceSpout;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * This topology demonstrates Storm's stream groupings and multilang
 * capabilities.
 */
public class WordCountTopology {
    public static class SplitSentence extends BaseBasicBolt {
        
        public void execute(Tuple input, BasicOutputCollector collector) {
            try {
                String msg = input.getString(0);
                System.out.println(msg + "-------------------");
                if (msg != null) {
                    String[] s = msg.split(" ");
                    for (String string : s) {
                        collector.emit(new Values(string));
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
 
       
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }
 
    public static class WordCount extends BaseBasicBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();
 
       
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null)
                count = 0;
            count++;
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }
 
      
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }
 
    public static void main(String[] args) throws Exception {
 
        TopologyBuilder builder = new TopologyBuilder();
 
        builder.setSpout("spout", new RandomSentenceSpout(), 5);
 
        builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
        builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split",new Fields("word"));
 
        Config conf = new Config();
        conf.setDebug(true);
 
        if (args != null && args.length > 0) {
            /*设置该topology在storm集群中要抢占的资源slot数,一个slot对应这supervisor节点上的以个worker进程
            如果你分配的spot数超过了你的物理节点所拥有的worker数目的话,有可能提交不成功,加入你的集群上面已经有了
            一些topology而现在还剩下2个worker资源,如果你在代码里分配4个给你的topology的话,那么这个topology可以提交
            但是提交以后你会发现并没有运行。 而当你kill掉一些topology后释放了一些slot后你的这个topology就会恢复正常运行。
           */
            conf.setNumWorkers(3);
 
            StormSubmitter.submitTopology(args[0], conf,
                    builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            //指定为本地模式运行
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());
 
            Thread.sleep(10000);
 
            cluster.shutdown();
        }
    }
}

 

Storm WordCount Topology详解

标签:

原文地址:http://www.cnblogs.com/hapjin/p/4588413.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!