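In Storm, a Topology is analogous to a MapReduce Job in Hadoop. It is the object that orchestrates and holds a set of computation components (Spouts and Bolts), much as a Hadoop MapReduce Job contains a set of Map Tasks and Reduce Tasks. These components can be wired together as a DAG, with the direction of data flow between them controlled by the Stream Groupings you choose, and the result is an object with more complex computation logic: the Topology. Once a Topology is started it does not stop on its own. It runs indefinitely until it is terminated by manual intervention (explicitly running bin/storm kill) or by an unexpected failure (such as the machines going down or the entire Storm cluster crashing).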
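In Storm, the Spout is where a Topology's messages originate, and it should be a component that produces messages continuously. For example, it can be a socket server that listens for connections from external clients and accepts the messages they send, a consumer of a message queue (MQ), or a service that receives the messages sent by a Flume Agent's Sink. The messages a Spout produces are abstracted in Storm as Tuples. The computation components of a Topology are connected to one another through Tuples built as needed, and the Tuples flowing between them form a stream.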
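
A Tuple is essentially an ordered list of values paired with the field names its emitter declared, so downstream code can read a value by position (as `input.getString(0)` does later in this article) or by field name (as `input.getIntegerByField("count")` does). The plain-Java sketch below models only that pairing. `SimpleTuple` is a hypothetical class for illustration, not Storm's `Tuple` interface, which also carries metadata such as the source component and stream ID.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a Storm tuple: declared field names plus positional values.
final class SimpleTuple {
    private final List<String> fields;
    private final List<Object> values;

    SimpleTuple(List<String> fields, List<Object> values) {
        if (fields.size() != values.size()) {
            throw new IllegalArgumentException("fields and values must have the same length");
        }
        this.fields = Collections.unmodifiableList(fields);
        this.values = Collections.unmodifiableList(values);
    }

    // Positional access, in the spirit of Tuple.getString(0)
    Object getValue(int index) {
        return values.get(index);
    }

    // Name-based access, in the spirit of Tuple.getValueByField("word")
    Object getValueByField(String field) {
        int index = fields.indexOf(field);
        if (index < 0) {
            throw new IllegalArgumentException("unknown field: " + field);
        }
        return values.get(index);
    }

    public static void main(String[] args) {
        // A splitter-style bolt that declared ("word", "count") would emit (word, 1)
        SimpleTuple t = new SimpleTuple(Arrays.asList("word", "count"), Arrays.<Object>asList("storm", 1));
        System.out.println(t.getValue(0) + " -> " + t.getValueByField("count"));
    }
}
```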
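A Stream Grouping defines how the streams between Storm's computation components (Spouts and Bolts) are connected, partitioned, and distributed. Storm defines the following 7 distribution strategies: Shuffle Grouping (random), Fields Grouping (by field), All Grouping (broadcast), Global Grouping (to a single task), None Grouping (no preference), Direct Grouping (chosen by the emitter), and Local or Shuffle Grouping (local first, otherwise random). The exact semantics of each are described in the official Storm documentation and are straightforward.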
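
The groupings differ in which downstream task or tasks receive a given tuple. The following plain-Java model sketches four of them under simplifying assumptions: `GroupingModel` is hypothetical, shuffle is modeled as a uniform random pick (Storm's shuffle grouping additionally keeps the load evenly balanced), and fields grouping is modeled as a hash of the grouping values modulo the task count, which approximates but is not identical to Storm's internal hash. The property it does reproduce is the one fields grouping guarantees: equal grouping values always map to the same task.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical, simplified model of how a grouping picks target task indices for one tuple.
final class GroupingModel {
    private static final Random RANDOM = new Random(42);

    // Shuffle grouping: one task chosen at random (real Storm also balances counts evenly)
    static List<Integer> shuffle(int numTasks) {
        return Arrays.asList(RANDOM.nextInt(numTasks));
    }

    // Fields grouping: a hash of the grouping values decides the task,
    // so equal values always land on the same task (hash is a stand-in, not Storm's)
    static List<Integer> fields(List<Object> groupingValues, int numTasks) {
        return Arrays.asList(Math.floorMod(groupingValues.hashCode(), numTasks));
    }

    // All grouping: every task receives a copy of the tuple
    static List<Integer> all(int numTasks) {
        List<Integer> targets = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            targets.add(i);
        }
        return targets;
    }

    // Global grouping: the whole stream goes to one task (Storm uses the lowest task ID;
    // here index 0, assuming task indices are ordered by ID)
    static List<Integer> global() {
        return Arrays.asList(0);
    }

    public static void main(String[] args) {
        int numTasks = 4;
        System.out.println("shuffle: " + shuffle(numTasks));
        System.out.println("fields(storm): " + fields(Arrays.<Object>asList("storm"), numTasks));
        System.out.println("fields(storm) again: " + fields(Arrays.<Object>asList("storm"), numTasks));
        System.out.println("all: " + all(numTasks));
        System.out.println("global: " + global());
    }
}
```

None Grouping currently behaves like Shuffle Grouping according to the Storm documentation, and Direct Grouping leaves the choice of task to the emitter, so neither needs a separate rule in this model.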

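// Imports for the enclosing WordCountTopology class. Assumes Storm 0.9.x/0.10.x
// (backtype.storm packages; Storm 1.0+ renamed them to org.apache.storm),
// plus commons-logging and Guava on the classpath.
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import com.google.common.collect.Maps;

// The classes and main() below are nested inside: public class WordCountTopology { ... }

public static class ProduceRecordSpout extends BaseRichSpout {

    private static final long serialVersionUID = 1L;
    private static final Log LOG = LogFactory.getLog(ProduceRecordSpout.class);
    private SpoutOutputCollector collector;
    private Random random;
    private String[] records;

    public ProduceRecordSpout(String[] records) {
        this.records = records;
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        random = new Random();
    }

    @Override
    public void nextTuple() {
        Utils.sleep(500); // throttle the demo spout; the interval is an arbitrary choice
        // Emit a randomly chosen record; the values list also serves as the message ID
        String record = records[random.nextInt(records.length)];
        List<Object> values = new Values(record);
        collector.emit(values, values);
        LOG.info("Record emitted: record=" + record);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("record"));
    }
}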
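
`collector.emit(values, values)` passes the tuple's own values as the message ID, which asks Storm to track the tuple tree and later call the spout's `ack(Object msgId)` or `fail(Object msgId)`. `BaseRichSpout` implements both as no-ops, so this spout does not replay failed records. The plain-Java sketch below shows the bookkeeping a replaying spout would typically keep. `PendingMessages` is a hypothetical helper, and it uses an increasing `long` ID because two emissions of the same record would produce equal `Values` lists and collide as map keys.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical bookkeeping for a spout that replays failed records.
final class PendingMessages {
    private final Map<Long, String> pending = new LinkedHashMap<>();
    private final Deque<String> retryQueue = new ArrayDeque<>();
    private long nextId = 0;

    // Called where nextTuple() emits: remember the record under a unique ID
    long register(String record) {
        long id = nextId++;
        pending.put(id, record);
        return id;
    }

    // Mirrors ISpout.ack(msgId): the tuple tree completed, so forget the record
    void ack(long id) {
        pending.remove(id);
    }

    // Mirrors ISpout.fail(msgId): queue the record so nextTuple() can re-emit it
    void fail(long id) {
        String record = pending.remove(id);
        if (record != null) {
            retryQueue.addLast(record);
        }
    }

    // nextTuple() would prefer a queued retry over a fresh record; null if none
    String nextRetry() {
        return retryQueue.pollFirst();
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingMessages p = new PendingMessages();
        long first = p.register("The core abstraction in Storm is the stream");
        long second = p.register("The core abstraction in Storm is the stream");
        p.ack(first);
        p.fail(second);
        System.out.println("pending=" + p.pendingCount() + ", retry=" + p.nextRetry());
    }
}
```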

public static class WordSplitterBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;
    private static final Log LOG = LogFactory.getLog(WordSplitterBolt.class);
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String record = input.getString(0);
        if (record != null && !record.trim().isEmpty()) {
            // Split on runs of whitespace and emit (word, 1), anchored to the input tuple
            for (String word : record.trim().split("\\s+")) {
                collector.emit(input, new Values(word, 1));
                LOG.info("Emitted: word=" + word);
            }
        }
        // Ack the input so the spout's tuple tree can complete
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

public static class WordCounterBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;
    private static final Log LOG = LogFactory.getLog(WordCounterBolt.class);
    private OutputCollector collector;
    private final Map<String, AtomicInteger> counterMap = Maps.newHashMap();

    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getString(0);
        int count = input.getIntegerByField("count");
        AtomicInteger ai = counterMap.get(word);
        if (ai == null) {
            // First occurrence of this word in this task
            ai = new AtomicInteger(0);
            counterMap.put(word, ai);
        }
        ai.addAndGet(count);
        collector.ack(input);
        LOG.info("DEBUG: word=" + word + ", count=" + ai.get());
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: emits nothing, so no output fields are declared
    }

    @Override
    public void cleanup() {
        // Storm only guarantees cleanup() in local mode; cluster workers may be killed without it
        LOG.info("Word count results:");
        for (Entry<String, AtomicInteger> entry : counterMap.entrySet()) {
            LOG.info("\tword=" + entry.getKey() + ", count=" + entry.getValue().get());
        }
    }
}

public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
    // Topology: spout -> splitter (shuffle grouping) -> counter (fields grouping on "word")
    TopologyBuilder builder = new TopologyBuilder();
    String[] records = new String[] {
            "A Storm cluster is superficially similar to a Hadoop cluster",
            "All coordination between Nimbus and the Supervisors is done through a Zookeeper cluster",
            "The core abstraction in Storm is the stream"
    };
    builder.setSpout("spout-producer", new ProduceRecordSpout(records), 1);
    builder.setBolt("bolt-splitter", new WordSplitterBolt(), 2)
            .shuffleGrouping("spout-producer");
    // Every occurrence of a word is routed to the same counter task
    builder.setBolt("bolt-counter", new WordCounterBolt(), 1)
            .fieldsGrouping("bolt-splitter", new Fields("word"));

    Config conf = new Config();
    String name = WordCountTopology.class.getSimpleName();
    if (args != null && args.length > 0) {
        // Cluster mode: args[0] is the Nimbus host
        String nimbus = args[0];
        conf.put(Config.NIMBUS_HOST, nimbus);
        conf.setNumWorkers(2);
        StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
    } else {
        // Local mode: run in-process for a while, then shut down (60 s is an arbitrary choice)
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(name, conf, builder.createTopology());
        Thread.sleep(60 * 1000);
        cluster.shutdown();
    }
}
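
Why `bolt-counter` subscribes with `fieldsGrouping("bolt-splitter", new Fields("word"))`: each counter task keeps its own in-memory map, so every occurrence of a word has to reach the same task for that word's count to be complete. The sketch below replays the three sample records once through the same split rule and a hash-partitioned count step in plain Java, without Storm. `LocalWordCountSimulation` is a hypothetical name, two counter tasks are assumed (the topology above runs one) so the partitioning is visible, and the word-to-task hash is a simplified stand-in for Storm's.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java replay of WordCountTopology's data flow: split -> fields-grouped count.
final class LocalWordCountSimulation {

    // Same splitting rule as WordSplitterBolt: break a record on runs of whitespace
    static List<String> split(String record) {
        List<String> words = new ArrayList<>();
        if (record != null && !record.trim().isEmpty()) {
            for (String word : record.trim().split("\\s+")) {
                words.add(word);
            }
        }
        return words;
    }

    // One count map per simulated counter task; the word's hash picks its task,
    // mimicking the guarantee of a fields grouping on "word"
    static List<Map<String, Integer>> count(String[] records, int numCounterTasks) {
        List<Map<String, Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < numCounterTasks; i++) {
            tasks.add(new HashMap<>());
        }
        for (String record : records) {
            for (String word : split(record)) {
                int task = Math.floorMod(word.hashCode(), numCounterTasks);
                tasks.get(task).merge(word, 1, Integer::sum);
            }
        }
        return tasks;
    }

    public static void main(String[] args) {
        String[] records = {
            "A Storm cluster is superficially similar to a Hadoop cluster",
            "All coordination between Nimbus and the Supervisors is done through a Zookeeper cluster",
            "The core abstraction in Storm is the stream"
        };
        List<Map<String, Integer>> tasks = count(records, 2);
        for (int i = 0; i < tasks.size(); i++) {
            System.out.println("counter task " + i + ": " + tasks.get(i));
        }
    }
}
```

Whatever the number of counter tasks, each word appears in exactly one task's map. With a shuffle grouping instead, a word's occurrences would be spread over several maps and no single task would hold its full count.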

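- There are three kinds of data: numeric strings (NUM), alphabetic strings (STR), and special-symbol strings (SIG).
- Each ProduceRecordSpout instance produces one of the three kinds of data above.
- All data are strings containing spaces, and the records emitted by the three kinds of ProduceRecordSpout must all be processed by the same logic: split each string on spaces.
- A word-distribution component, DistributeWordByTypeBolt, receives all words (together with their type) and routes each type of word to one designated storage component.
- SaveDataBolt stores the processed words. Since each word type needs its own storage logic, three SaveDataBolt components are set up, one per type.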
interface Type {
    String NUMBER = "NUMBER";
    String STRING = "STRING";
    String SIGN = "SIGN";
}
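
// Imports for the enclosing MultiStreamsWordDistributionTopology class. Assumes Storm
// 0.9.x/0.10.x (backtype.storm packages) and commons-logging on the classpath.
import java.util.List;
import java.util.Map;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.generated.Grouping;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

// Type, main() and the component classes are nested inside:
// public class MultiStreamsWordDistributionTopology { ... }

public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {

    TopologyBuilder builder = new TopologyBuilder();

    // Three spouts, one per data type
    builder.setSpout("spout-number", new ProduceRecordSpout(Type.NUMBER, new String[] {"111 222 333", "80966 31"}), 1);
    builder.setSpout("spout-string", new ProduceRecordSpout(Type.STRING, new String[] {"abc ddd fasko", "hello the word"}), 1);
    builder.setSpout("spout-sign", new ProduceRecordSpout(Type.SIGN, new String[] {"++ -*% *** @@", "{+-} ^#######"}), 1);

    // A single splitter bolt consumes all three spouts
    builder.setBolt("bolt-splitter", new SplitRecordBolt(), 2)
            .shuffleGrouping("spout-number")
            .shuffleGrouping("spout-string")
            .shuffleGrouping("spout-sign");

    // Route by "type": all words of one type reach the same distributor task
    builder.setBolt("bolt-distributor", new DistributeWordByTypeBolt(), 6)
            .fieldsGrouping("bolt-splitter", new Fields("type"));

    // Each saver subscribes to exactly one named stream of the distributor
    builder.setBolt("bolt-number-saver", new SaveDataBolt(Type.NUMBER), 3)
            .shuffleGrouping("bolt-distributor", "stream-number-saver");
    builder.setBolt("bolt-string-saver", new SaveDataBolt(Type.STRING), 3)
            .shuffleGrouping("bolt-distributor", "stream-string-saver");
    builder.setBolt("bolt-sign-saver", new SaveDataBolt(Type.SIGN), 3)
            .shuffleGrouping("bolt-distributor", "stream-sign-saver");

    Config conf = new Config();
    String name = MultiStreamsWordDistributionTopology.class.getSimpleName();
    if (args != null && args.length > 0) {
        // Cluster mode: args[0] is the Nimbus host
        String nimbus = args[0];
        conf.put(Config.NIMBUS_HOST, nimbus);
        conf.setNumWorkers(3);
        StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
    } else {
        // Local mode: run for one hour, then shut down
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(name, conf, builder.createTopology());
        Thread.sleep(60 * 60 * 1000);
        cluster.shutdown();
    }
}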
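
A consequence of `fieldsGrouping("bolt-splitter", new Fields("type"))` with a parallelism hint of 6: there are only three distinct `type` values, so at most three of the six `bolt-distributor` tasks can ever receive tuples, and fewer if two types hash to the same task. The sketch below checks this with a simplified hash-modulo model rather than Storm's exact hash function; the class `TypeRoutingCheck` is hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical check: how many distributor tasks can a fields grouping on "type" use?
final class TypeRoutingCheck {

    // Simplified fields-grouping target: hash of the grouping values modulo the task count
    static int targetTask(String type, int numTasks) {
        List<Object> groupingValues = Arrays.<Object>asList(type);
        return Math.floorMod(groupingValues.hashCode(), numTasks);
    }

    // Distinct task indices that receive at least one type
    static Set<Integer> usedTasks(List<String> types, int numTasks) {
        Set<Integer> used = new HashSet<>();
        for (String type : types) {
            used.add(targetTask(type, numTasks));
        }
        return used;
    }

    public static void main(String[] args) {
        List<String> types = Arrays.asList("NUMBER", "STRING", "SIGN");
        Set<Integer> used = usedTasks(types, 6);
        System.out.println("busy distributor tasks: " + used + " (of 6)");
    }
}
```

Setting the distributor's parallelism to the number of types, or grouping on a finer-grained key, would avoid idle executors; this is a tuning observation, not something the original topology depends on.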

public static class ProduceRecordSpout extends BaseRichSpout {

    private static final long serialVersionUID = 1L;
    private static final Log LOG = LogFactory.getLog(ProduceRecordSpout.class);
    private SpoutOutputCollector collector;
    private Random rand;
    private String[] recordLines;
    private String type;

    public ProduceRecordSpout(String type, String[] lines) {
        this.type = type;
        this.recordLines = lines;
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        rand = new Random();
    }

    @Override
    public void nextTuple() {
        Utils.sleep(500); // throttle the demo spout; the interval is an arbitrary choice
        // Emit (type, record); the values list also serves as the message ID
        String record = recordLines[rand.nextInt(recordLines.length)];
        List<Object> values = new Values(type, record);
        collector.emit(values, values);
        LOG.info("Record emitted: type=" + type + ", record=" + record);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("type", "record"));
    }
}

public static class SplitRecordBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;
    private static final Log LOG = LogFactory.getLog(SplitRecordBolt.class);
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String type = input.getString(0);
        String line = input.getString(1);
        if (line != null && !line.trim().isEmpty()) {
            // Same splitting logic for every type; the type travels along with each word
            for (String word : line.trim().split("\\s+")) {
                collector.emit(input, new Values(type, word));
                LOG.info("Word emitted: type=" + type + ", word=" + word);
            }
        }
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("type", "word"));
    }
}
- The DistributeWordByTypeBolt component
public static class DistributeWordByTypeBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;
    private static final Log LOG = LogFactory.getLog(DistributeWordByTypeBolt.class);
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
        // Log the upstream streams and groupings this bolt subscribes to
        Map<GlobalStreamId, Grouping> sources = context.getThisSources();
        LOG.info("sources==> " + sources);
    }

    @Override
    public void execute(Tuple input) {
        String type = input.getString(0);
        String word = input.getString(1);
        // Pick the output stream by word type
        if (Type.NUMBER.equals(type)) {
            emit("stream-number-saver", type, input, word);
        } else if (Type.STRING.equals(type)) {
            emit("stream-string-saver", type, input, word);
        } else if (Type.SIGN.equals(type)) {
            emit("stream-sign-saver", type, input, word);
        } else {
            // Unknown type: send to the discard stream
            emit("stream-discarder", type, input, word);
        }
        collector.ack(input);
    }

    private void emit(String streamId, String type, Tuple input, String word) {
        collector.emit(streamId, input, new Values(type, word));
        LOG.info("Distribution, typed word emitted: type=" + type + ", word=" + word);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("stream-number-saver", new Fields("type", "word"));
        declarer.declareStream("stream-string-saver", new Fields("type", "word"));
        declarer.declareStream("stream-sign-saver", new Fields("type", "word"));
        declarer.declareStream("stream-discarder", new Fields("type", "word"));
    }
}
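
`DistributeWordByTypeBolt` declares four named streams, but only three bolts subscribe to them (via `stream-number-saver`, `stream-string-saver`, and `stream-sign-saver`). Nothing subscribes to `stream-discarder`, so tuples emitted there are not delivered to any bolt. The plain-Java model below illustrates that subscribe-by-stream-ID behavior. `StreamRouter` and its methods are hypothetical, and the model ignores tasks, groupings, and acking.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of one component's named output streams and their subscribers.
final class StreamRouter {
    private final Map<String, List<String>> subscribers = new HashMap<>();
    private final Map<String, List<String>> delivered = new HashMap<>();

    // Like declarer.declareStream(streamId, ...): the stream exists, initially with no consumers
    void declareStream(String streamId) {
        subscribers.putIfAbsent(streamId, new ArrayList<>());
    }

    // Like builder.setBolt(...).shuffleGrouping(source, streamId)
    void subscribe(String bolt, String streamId) {
        if (!subscribers.containsKey(streamId)) {
            throw new IllegalArgumentException("undeclared stream: " + streamId);
        }
        subscribers.get(streamId).add(bolt);
    }

    // Like collector.emit(streamId, anchor, values): only that stream's subscribers see the tuple
    void emit(String streamId, String word) {
        for (String bolt : subscribers.getOrDefault(streamId, new ArrayList<>())) {
            delivered.computeIfAbsent(bolt, k -> new ArrayList<>()).add(word);
        }
    }

    List<String> deliveredTo(String bolt) {
        return delivered.getOrDefault(bolt, new ArrayList<>());
    }

    // Same routing rule as DistributeWordByTypeBolt.execute()
    static String streamFor(String type) {
        switch (type) {
            case "NUMBER": return "stream-number-saver";
            case "STRING": return "stream-string-saver";
            case "SIGN":   return "stream-sign-saver";
            default:       return "stream-discarder";
        }
    }

    public static void main(String[] args) {
        StreamRouter router = new StreamRouter();
        for (String s : new String[] {"stream-number-saver", "stream-string-saver", "stream-sign-saver", "stream-discarder"}) {
            router.declareStream(s);
        }
        router.subscribe("bolt-number-saver", "stream-number-saver");
        router.subscribe("bolt-string-saver", "stream-string-saver");
        router.subscribe("bolt-sign-saver", "stream-sign-saver");
        router.emit(streamFor("NUMBER"), "111");
        router.emit(streamFor("SIGN"), "@@");
        router.emit(streamFor("UNKNOWN"), "???"); // lands on stream-discarder, reaches nobody
        System.out.println(router.deliveredTo("bolt-number-saver") + " " + router.deliveredTo("bolt-sign-saver"));
    }
}
```

Storm checks subscriptions when the topology is submitted and rejects a subscription to a stream the source never declared with an InvalidTopologyException; the model mimics that by throwing as soon as `subscribe` is called.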

public static class SaveDataBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;
    private static final Log LOG = LogFactory.getLog(SaveDataBolt.class);
    private OutputCollector collector;
    private String type;

    public SaveDataBolt(String type) {
        this.type = type;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        // Demo only: log the tuple instead of persisting it with type-specific logic
        LOG.info("[" + type + "] " +
                "SourceComponent=" + input.getSourceComponent() +
                ", SourceStreamId=" + input.getSourceStreamId() +
                ", type=" + input.getString(0) +
                ", value=" + input.getString(1));
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: no output streams
    }
}
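The core computational abstractions in Storm are the Spout, the Bolt, and the Stream Grouping. Higher-level features such as Trident and DRPC fall into two groups. Some are built on these basic components and grouping strategies, hiding the low-level distribution and processing logic behind higher-level programming abstractions so that developers no longer have to deal with the complex underlying mechanisms. Others are derived mechanisms added to make Storm's computation services easier to use, such as batched transactional processing and RPC.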
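This article is published under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 license. You are welcome to repost, use, and republish it, but you must keep the author attribution 时延军 (including the link: ), must not use it for commercial purposes, and must release any work modified from it under the same license. If you have any questions, please contact me.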