public class MyFunction extends BaseFunction { @Override public void execute(TridentTuple tuple, TridentCollector collector) { for ( int i = 0; i < tuple.getInteger(0); i++) { collector.emit( new Values(i)); } } }
java mystream.each(new Fields("b"), new MyFunction(), new Fields("d")));
public class MyFilter extends BaseFilter { public boolean isKeep(TridentTuple tuple) { return tuple.getInteger(0) == 1 && tuple.getInteger(1) == 2; } }
java mystream.each(new Fields("b", "a"), new MyFilter());
java mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"));
Partition 1: [“a”, 3] [“c”, 8]
Partition 1: [11]
public interface CombinerAggregator <T> extends Serializable { T init(TridentTuple tuple); T combine(T val1, T val2); T zero(); }
public class CombinerCount implements CombinerAggregator<Integer>{ @Override public Integer init(TridentTuple tuple) { return 1; } @Override public Integer combine(Integer val1, Integer val2) { return val1 + val2; } @Override public Integer zero() { return 0; } }
public interface ReducerAggregator <T> extends Serializable { T init(); T reduce(T curr, TridentTuple tuple); }
public class ReducerCount implements ReducerAggregator<Long>{ @Override public Long init() { return 0L; } @Override public Long reduce(Long curr, TridentTuple tuple) { return curr + 1; } }
最后一个是Aggregator接口,它是最通用的聚合器,它的形式如下:
public interface Aggregator<T> extends Operation { T init(Object batchId, TridentCollector collector); void aggregate(T val, TridentTuple tuple, TridentCollector collector); void complete(T val, TridentCollector collector); }
下面举例说明:
public class CountAgg extends BaseAggregator<CountState>{ static class CountState { long count = 0; } @Override public CountState init(Object batchId, TridentCollector collector) { return new CountState(); } @Override public void aggregate(CountState val, TridentTuple tuple, TridentCollector collector) { val. count+=1; } @Override public void complete(CountState val, TridentCollector collector) { collector.emit( new Values(val. count)); } }
有时候我们需要同时执行多个聚合器,这在Trident中被称作chaining,使用方法如下:
java mystream.chainedAgg() .partitionAggregate(new Count(), new Fields("count")) .partitionAggregate(new Fields("b"), new Sum(), new Fields("sum")) .chainEnd();
这点代码会在每个Partition上运行count和sum函数,最终输出一个tuple:[“count”, “sum”]
projection:投影操作
投影操作作用是仅保留Stream指定字段的数据,比如有一个Stream包含如下字段: [“a”, “b”, “c”, “d”]
运行如下代码:
java mystream.project(new Fields("b", "d"))
则输出的流仅包含 [“b”, “d”]字段。
2.2 Repartitioning重定向操作
重定向操作是如何在各个任务间对tuples进行分区。分区的数量也有可能改变重定向的结果。重定向需要网络传输,下面介绍下重定向函数:
Trident有aggregate和 persistentAggregate方法来做聚合操作。aggregate是独立的运行在Stream的每个Batch上的,而persistentAggregate则是运行在Stream的所有Batch上并把运算结果存储在state source中。 运行aggregate方法做全局聚合。当你用到 ReducerAggregator或Aggregator时,Stream首先被重定向到一个分区中,然后其中的聚合函数便在这个分区上运行。当你用到CombinerAggregator时,Trident会首先在每个分区上做局部聚合,然后把局部聚合后的结果重定向到一个分区,因此使用CombinerAggregator会更高效,可能的话我们需要优先考虑使用它。 下面举个例子来说明如何用aggregate进行全局计数:java mystream.aggregate(new Count(), new Fields("count"));和paritionAggregate一样,aggregators的聚合也可以串联起来,但是如果你把一个 CombinerAggregator和一个非CombinerAggregator串联在一起,Trident是无法完成局部聚合优化的。
2.4 grouped streams
GroupBy操作是根据特定的字段对流进行重定向的,还有,在一个分区内部,每个相同字段的tuple也会被Group到一起,下面这幅图描述了这个场景:
如果你在grouped Stream上面运行aggregators,聚合操作会运行在每个Group中而不是整个Batch。persistentAggregate也能运行在GroupedSteam上,不过结果会被保存在MapState中,其中的key便是分组的字段。
当然,aggregators在GroupedStreams上也可以串联。
2.5 Merge和Joins:
api的最后一部分便是如何把各种流汇聚到一起。最简单的方式就是把这些流汇聚成一个流。我们可以这么做:java topology.merge(stream1, stream2, stream3);另一种合并流的方式就是join。一个标准的join就像是一个sql,必须有标准的输入,因此,join只针对符合条件的Stream。join应用在来自Spout的每一个小Batch中。join时候的tuple会包含:
1. join的字段,如Stream1中的key和Stream2中的x
2. 所有非join的字段,根据传入join方法的顺序,a和b分别代表steam1的val1和val2,c代表Stream2的val1
当join的是来源于不同Spout的stream时,这些Spout在发射数据时需要同步,一个Batch所包含的tuple会来自各个Spout。
Storm专题二:Storm Trident API 使用详解
原文地址:http://blog.csdn.net/suifeng3051/article/details/38752631