流分组
在设计一个topology的时候,你需要做的最重要的事情是定义数据在组件之间怎样交换(流怎样被bolts消费)。流分组指定了每个bolt消费哪些流和这些流被怎样消费。
一个结点可以发射不止一条数据流。流分组允许我们选择接收哪些流。
正如我们在第二章看到的,当topology被定义的时候流分组就被设置好了:
...
builder.setBolt("word-normalizer", new WordNormalizer())
.shuffleGrouping("word-reader"); ...
在上述代码块中,在topology builder上设置了一个bolt,然后源被设置为shuffle分组。流分组通常使用源组件的ID作为参数,并且也会选择性的使用其他参数,取决于流分组的种类。
每个InputDeclare可以有不止一个源,同时每个源可以用不同的流分组来分组。
Shuffle分组
Shuffle分组是最常用的分组方式。它使用一个参数(源组件),源组件会发射元组到一个随机
选择的bolt并确保每个消费者会收到等数量的元组。
Shuffle分组对于做原子操作例如数学操作是很有用的。然而,如果操作不能被随机分布,就像第二章中的你需要计数单词的示例,你应用考虑使用其他的分组。 Fields分组
Fields分组允许你基于元组的一个或多个域来控制元组怎样被发送到bolts。它确保一个联合域中给定的值集合总是会被送到相同的bolt。回到单词计数的示例,如果你通过单词域分组流,word-normalizer bolt总是会将元组和给定的单词一起发送到相同的word-counter bolt实例中。
...
builder.setBolt("word-counter", new WordCounter(),2)
.fieldsGrouping("word-normalizer", new Fields("word")); ...
fields分组中的所有的域必须在源组件中被声明。
All分组
All分组发送每个元组的一份单独拷贝到接收bolt的所有实例上。这种分组被用来向bolts发送信号。例如,如果你需要刷新缓存,你可以发送一个刷新缓存信号到所有的bolts。在单词计数的示例中,你可以通过使用all分组来增加清空计数器缓存的功能(见Topologies示例)。
public void execute(Tuple input) {
String str = null;
try{
if(input.getSourceStreamId().equals("signals")){
str = input.getStringByField("action"); if("refreshCache".equals(str))
counters.clear();
}
}catch (IllegalArgumentException e) {
//Do nothing } ...
}
我们增加了一个条件判断来检查流的源。Storm给了我们声明命名的流的可能性(如果你不发送元组到一个命名的流,则流的名字是”default”);它是一个非常好的方式来确定元组的源,正如这个例子中我们需要确定信号一样。
在topology定义中,你会增加另一个流到单词计数bolt来将元组从signals-spout 流发送到这个bolt的所有实例。
builder.setBolt("word-counter", new WordCounter(),2)
.fieldsGrouping("word-normalizer", new Fields("word")) .allGrouping("signals-spout","signals")
Signals-spout的实现可以在git库中找到。 自定义分组
你可以通过实现backtype.storm.grouping.CustomStreamGrouping接口来实现你的自定义流分组。这给了你决定每个元组将被哪个(些)bolt收到的权力。
我们修改单词计数示例来对元组进行分组,这样的话相同字母开头的单词将被相同的bolt接收。
public class ModuleGrouping implements CustomStreamGrouping, Serializable{
int numTasks = 0; @Override
public List<Integer> chooseTasks(List<Object> values) {
List<Integer> boltIds = new ArrayList(); if(values.size()>0){
String str = values.get(0).toString(); if(str.isEmpty())
boltIds.add(0); else
boltIds.add(str.charAt(0) % numTasks);
}
return boltIds; }
@Override
public void prepare(TopologyContext context, Fields outFields,
List<Integer> targetTasks) {
numTasks = targetTasks.size(); }
}
你可以看到一个CustomStreamGrouping的简单实现,在这里我们使用任务的数量来对单词的第一个字符的整型值取模,由此来决定哪个bolt将接收这个元组。 要使用示例中分分组,按照下列方式修改word-normalizer分组。
builder.setBolt("word-normalizer", new WordNormalizer())
.customGrouping("word-reader", new ModuleGrouping()); Direct分组
这是一个由源决定哪个组件将接收元组的分组。与前一个示例类似,源将基于单词的第一个字母决定哪个bolt接收元组。为使用direct分组,在WordNormalizer中,使用emitDirect方法代替emit方法。
public void execute(Tuple input) {
...
for(String word : words){
if(!word.isEmpty()){
...
collector.emitDirect(getWordCountIndex(word),new Values(word)); } }
// Acknowledge the tuple collector.ack(input); }
public Integer getWordCountIndex(String word) {
word = word.trim().toUpperCase(); if(word.isEmpty())
return 0; else
return word.charAt(0) % numCounterTasks;
}
在prepare方法中算出目标任务的数量:
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) { this.collector = collector;
this.numCounterTasks = context.getComponentTasks("word-counter"); }
在topology的定义中,指定流将被直接分组:
builder.setBolt("word-counter", new WordCounter(),2)
.directGrouping("word-normalizer");
Global分组
Global分组将所有源实例产生的元组发送到一个单独的目标实例(特别地,ID最低的任务)中。 None分组
在写这本著作的时候(storm版本0.7.1),使用这种分组与使用22页的”Shuffle分组”是一样的。换言之,当用这个分组时,流怎样分组是无所谓的。
更多精彩内容请关注:http://bbs.superwu.cn
原文地址:http://crxy2013.blog.51cto.com/9922445/1655023