标签:
-----------------------------WordSpout
package com.hzw.storm.Spout;
import java.util.Map;
import java.util.Random;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
/**
 * Spout that endlessly emits one randomly chosen sentence per call on a single
 * output field named "sentence".
 */
public class WordSpout extends BaseRichSpout {
    // NOTE(review): "don‘t" contains U+2018 (left single quotation mark) rather than an
    // ASCII apostrophe; left unchanged because it is emitted data.
    private static final String[] SENTENCES = new String[] { "I have a dream", "my dream is to be a data analyst",
            "you can do what you are dreaming", "don‘t give up your dreams" };
    private static final Random RANDOM = new Random();

    private SpoutOutputCollector collector;

    /**
     * Initialization hook, executed once per spout task; keeps the collector used to emit.
     *
     * @param conf    topology configuration (unused)
     * @param context task context (unused)
     * @param c       collector through which tuples are emitted
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector c) {
        this.collector = c;
    }

    /**
     * Called repeatedly by Storm; each call emits exactly one randomly chosen sentence.
     */
    @Override
    public void nextTuple() {
        // Bound by the array length instead of a hard-coded 4 so editing the sentence
        // list can never index out of range or leave entries unreachable.
        String sentence = SENTENCES[RANDOM.nextInt(SENTENCES.length)];
        // No message id is passed to emit(), so per Storm's reliability model these
        // tuples are not tracked for ack/fail or replayed.
        this.collector.emit(new Values(sentence));
    }

    /**
     * Declares the single output field "sentence" carried by every emitted tuple.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
}
-----------------------------SplitSentenceBolt
package com.hzw.storm.bolt;
import java.util.Map;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
/**
 * Bolt that splits each incoming sentence into words and emits one tuple per word
 * on the output field "word".
 */
public class SplitSentenceBolt implements IBasicBolt {
    /**
     * One-time initialization hook; this bolt needs no setup.
     */
    @Override
    public void prepare(Map conf, TopologyContext context) {
        // Nothing to initialize.
    }

    /**
     * Splits the sentence in field 0 of {@code tuple} into words and emits one
     * single-field tuple per word.
     *
     * <p>Runs of whitespace are treated as one separator, and empty tokens are skipped.
     * Empty tokens come from leading spaces, repeated spaces or an empty sentence, and
     * would otherwise be emitted as "words". A null sentence emits nothing.
     *
     * @param tuple     input tuple whose first field is the sentence
     * @param collector collector used to emit the words
     */
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String sentence = tuple.getString(0);
        if (sentence == null) {
            return;
        }
        for (String word : sentence.trim().split("\\s+")) {
            // "".split(...) yields a single empty token, so guard against it here.
            if (!word.isEmpty()) {
                collector.emit(new Values(word));
            }
        }
    }

    /**
     * Shutdown hook; there are no resources to release.
     */
    @Override
    public void cleanup() {
    }

    /**
     * Declares the single output field "word".
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    /**
     * @return null, meaning this bolt has no component-specific configuration
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
-----------------------------WordCountBolt
package com.hzw.storm.bolt;
import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
/**
 * Bolt that keeps a running per-word total in memory and emits (word, count)
 * every time it sees a word.
 *
 * <p>Each task has its own map. Totals are only global per word if every
 * occurrence of a word is routed to the same task, e.g. via a fields grouping
 * on "word".
 */
public class WordCountBolt implements IBasicBolt {
    /** Running total for each word seen by this bolt instance. */
    private Map<String, Integer> counts = new HashMap<String, Integer>();

    /**
     * One-time initialization hook; the count map is already created with the instance.
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
    }

    /**
     * Increments the total for the word in field 0 and emits the word with its new total.
     *
     * @param tuple     input tuple whose first field is the word
     * @param collector collector used to emit (word, count)
     */
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        final String word = tuple.getString(0);
        final Integer previous = counts.get(word);
        final int updated = (previous == null) ? 1 : previous.intValue() + 1;
        counts.put(word, updated);
        collector.emit(new Values(word, updated));
    }

    /**
     * Shutdown hook; there are no resources to release.
     */
    @Override
    public void cleanup() {
    }

    /**
     * Declares the two output fields "word" and "count".
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }

    /**
     * @return null, meaning this bolt has no component-specific configuration
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
-----------------------------MyTopology
package com.hzw.storm.topology;
import com.hzw.storm.Spout.WordSpout;
import com.hzw.storm.bolt.SplitSentenceBolt;
import com.hzw.storm.bolt.WordCountBolt;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
/**
 * Entry point that wires WordSpout -> SplitSentenceBolt -> WordCountBolt and runs it.
 */
public class MyTopology {
    /** Name under which the topology runs in local mode. */
    private static final String LOCAL_TOPOLOGY_NAME = "word-count";
    /** How long (milliseconds) the local-mode run is kept alive before teardown. */
    private static final long LOCAL_RUN_MILLIS = 10000L;

    /**
     * Builds the word-count topology and actually runs it.
     *
     * <p>Previously the topology was built and then discarded, so nothing ran.
     *
     * @param args if a first argument is given, the topology is submitted to the
     *             cluster under that name via StormSubmitter; otherwise it runs in an
     *             in-process LocalCluster for LOCAL_RUN_MILLIS and is then shut down
     * @throws IllegalStateException if submitting or running the topology fails
     */
    public static void main(String[] args) {
        TopologyBuilder builder = buildTopology();
        Config conf = new Config();
        try {
            if (args != null && args.length > 0) {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } else {
                runLocally(conf, builder);
            }
        } catch (Exception e) {
            // main is the process boundary: surface any Storm failure with its cause attached.
            throw new IllegalStateException("Failed to run topology", e);
        }
    }

    /** Wires spout -> split bolt -> count bolt with the original parallelism hints. */
    private static TopologyBuilder buildTopology() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("WordSpout", new WordSpout(), 2);
        builder.setBolt("SplitSentenceBolt", new SplitSentenceBolt(), 10).shuffleGrouping("WordSpout");
        // Fields grouping on "word" sends every occurrence of a given word to the same
        // WordCountBolt task, so each task's per-word running count is complete.
        builder.setBolt("WordCountBolt", new WordCountBolt(), 20)
                .fieldsGrouping("SplitSentenceBolt", new Fields("word"));
        return builder;
    }

    /** Runs the topology in an in-process LocalCluster for a fixed time, then tears it down. */
    private static void runLocally(Config conf, TopologyBuilder builder) throws Exception {
        LocalCluster cluster = new LocalCluster();
        try {
            cluster.submitTopology(LOCAL_TOPOLOGY_NAME, conf, builder.createTopology());
            Utils.sleep(LOCAL_RUN_MILLIS);
            cluster.killTopology(LOCAL_TOPOLOGY_NAME);
        } finally {
            // Always release the local cluster, even if submit or kill fails.
            cluster.shutdown();
        }
    }
}
这样,整个拓扑图就写好了。
关于拓扑图的具体含义,请参考文章:http://www.csdn.net/article/2012-12-24/2813117-storm-realtime-big-data-analysis
先摘录其中一小段放在这里备忘:
Stream Groupings:
Stream Grouping定义了一个流在Bolt的各个任务之间如何切分。Storm提供了以下6种Stream Grouping类型:
1. 随机分组(Shuffle grouping):随机分发tuple到Bolt的任务,保证每个任务获得相等数量的tuple。
2. 字段分组(Fields grouping):根据指定字段分割数据流,并分组。例如,根据“user-id”字段,相同“user-id”的元组总是分发到同一个任务,不同“user-id”的元组可能分发到不同的任务。
3. 全部分组(All grouping):tuple被复制到bolt的所有任务。这种类型需要谨慎使用。
4. 全局分组(Global grouping):全部流都分配到bolt的同一个任务。明确地说,是分配给ID最小的那个task。
5. 无分组(None grouping):你不需要关心流是如何分组的。目前,无分组等效于随机分组。但最终,Storm会把使用无分组的Bolt放到它所订阅的Bolt或Spout所在的同一线程中执行(如果可能)。
6. 直接分组(Direct grouping):这是一种特殊的分组类型,由tuple的生产者决定该tuple由消费者(Bolt)的哪个task接收。
当然,还可以实现CustomStreamGrouping接口来定制自己需要的分组方式。
标签:
原文地址:http://my.oschina.net/qiangzigege/blog/516035