码迷,mamicode.com
首页 > 其他好文 > 详细

Storm 基础 -- spout与bolt设置多重grouping

时间:2016-05-12 21:33:54      阅读:2430      评论:0      收藏:0      [点我收藏+]

标签:

Topology的代码如下:

TopologyBuilder builder = new TopologyBuilder();
//WordReaderSpout会从文件中读取数据,数据用shuffle的方式发送给bolt进行处理
//当文件读取完成后,会发送一个global消息
builder.setSpout("word-reader",new WordReaderSpout());
builder.setBolt("word-normalizer", new WordNormalizerBolt())
        .shuffleGrouping("word-reader")
        .globalGrouping("word-reader", "FINISH");

builder.setBolt("word-counter", new WordCounterBolt(),1)
     .fieldsGrouping("word-normalizer", new Fields("word"))
     .globalGrouping("word-normalizer", "CLOSE");

以globalGrouping为例:
globalGrouping(“word-reader”, “FINISH”); 两个参数的含义
第一个参数: “word-reader” 为componet id, 这个值 与我们代码的 中的word-reader一致。

builder.setSpout("word-reader",new WordReaderSpout());

第二个参数: “FINISH”为stream id
这个值是在WordReaderSpout中定义的

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("line"));
    declarer.declareStream("FINISH", new Fields("FINISH Reade file"));
}

下面以我们熟知的HashMap为例,解释这两个参数
1) 我们需要定义一个word-reader 的HashMap对象

Map<String, Object> word-reader = new HashMap<String, Object>();

2) 我们调用put两个对象进去,这两个对象对应的Key值分别为default与FINISH

word-reader.put("default", new Object());
word-reader.put("FINISH", new Object());

3) 有其它的对象需要获取我们put进去的两个对象,首先就需要先获取word-reader的Handler。

WordNormalizerBolt  word-normalizer = new WordNormalizerBolt();
word-normalizer.setMap(word-reader);

对应到Storm的代码中,就如下:
1) 首先我们需要创建一个word-reader的对象, —这是bolt代码

builder.setSpout("word-reader",new WordReaderSpout());

2) 我们需要put两个对象,这里与Map可能有点儿差别,因为需要先声明。 — 这是spout的代码

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("line")); //declare()相当于declarer.declareStream(“defalut”, new Fields("line"));
        declarer.declareStream("FINISH", new Fields("FINISH Reade file"));
    }

而put对象则是在nextTuple()中进行的

    public void nextTuple() {
                 ... ... 
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            // Read all lines
            while ((str = reader.readLine()) != null) {
                               //put一个key为"default"的对象
                this.collector.emit(new Values(str), str);
            }
        } catch (Exception e) {
                      ... ...
        }
         //put一个key为"FINISH"的对象
        this.collector.emit("FINISH", new Values("Finish"));
    }

3) bolt获取Map对象 —- 在TopologyBuilder代码中实现

builder.setBolt("word-normalizer", new WordNormalizerBolt())
        .shuffleGrouping("word-reader")
        .globalGrouping("word-reader", "FINISH");

最后附上源码:http://download.csdn.net/detail/eyoulc123/9514466

Storm 基础 -- spout与bolt设置多重grouping

标签:

原文地址:http://blog.csdn.net/eyoulc123/article/details/51353200

(1)
(1)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!