(一) Theoretical background
More theory will be filled in later; for now, refer to the books and official documentation.
1、What is Trident?
Trident is a high-level abstraction for doing realtime computing on top of Storm. It allows you to seamlessly intermix high throughput (millions of messages per second), stateful stream processing with low latency distributed querying. If you're familiar with high level batch processing tools like Pig or Cascading, the concepts of Trident will be very familiar – Trident has joins, aggregations, grouping, functions, and filters. In addition to these, Trident adds primitives for doing stateful, incremental processing on top of any database or persistence store. Trident has consistent, exactly-once semantics, so it is easy to reason about Trident topologies.
Simply put, Trident is a higher-level abstraction on top of Storm. Compared with raw Storm, it mainly offers three benefits:
(1) A higher level of abstraction: common operations such as count and sum are packaged as built-in methods that can be called directly, with no need to implement them yourself.
(2) Stream-processing primitives such as groupBy (see the sketch below).
(3) Transaction support, guaranteeing that every message is processed exactly once.
2、Trident processes messages batch by batch, i.e. it handles multiple tuples at a time rather than one tuple at a time.
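To get a feel for what these primitives look like, here is a minimal sketch of a custom filter, written against the same storm.trident API as the example in the next section. ShortWordFilter is a hypothetical name made up for illustration, not part of any official example:

import storm.trident.operation.BaseFilter;
import storm.trident.tuple.TridentTuple;

// Hypothetical filter: keeps only tuples whose first field is a word
// longer than three characters.
public class ShortWordFilter extends BaseFilter {
    @Override
    public boolean isKeep(TridentTuple tuple) {
        return tuple.getString(0).length() > 3;
    }
}

It would be attached to a stream with something like stream.each(new Fields("word"), new ShortWordFilter()); built-ins such as Count and Sum plug into the pipeline in the same style.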
(二) Walking through the official example
package org.ljh.tridentdemo;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Sum;
import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.MemoryMapState;
import storm.trident.tuple.TridentTuple;

public class TridentWordCount {

    public static class Split extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            String sentence = tuple.getString(0);
            for (String word : sentence.split(" ")) {
                collector.emit(new Values(word));
            }
        }
    }

    public static StormTopology buildTopology(LocalDRPC drpc) {
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
                new Values("the cow jumped over the moon"),
                new Values("the man went to the store and bought some candy"),
                new Values("four score and seven years ago"),
                new Values("how many apples can you eat"),
                new Values("to be or not to be the person"));
        spout.setCycle(true);

        // Create the topology object.
        TridentTopology topology = new TridentTopology();

        // This flow counts words; the results are kept in the wordCounts state.
        TridentState wordCounts = topology.newStream("spout1", spout)
                .parallelismHint(16)
                .each(new Fields("sentence"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                .persistentAggregate(new MemoryMapState.Factory(), new Count(),
                        new Fields("count"))
                .parallelismHint(16);

        // This flow serves DRPC queries against the counts computed above.
        topology.newDRPCStream("words", drpc)
                .each(new Fields("args"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
                .each(new Fields("count"), new FilterNull())
                .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

        return topology.build();
    }

    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setMaxSpoutPending(20);
        if (args.length == 0) {
            LocalDRPC drpc = new LocalDRPC();
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
            for (int i = 0; i < 100; i++) {
                System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));
                Thread.sleep(1000);
            }
        } else {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(null));
        }
    }
}
Let's look at the pieces one by one. First, the spout: a FixedBatchSpout emits the five fixed sentences in batches of up to three tuples, and setCycle(true) makes it replay them indefinitely:

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
        new Values("the cow jumped over the moon"),
        new Values("the man went to the store and bought some candy"),
        new Values("four score and seven years ago"),
        new Values("how many apples can you eat"),
        new Values("to be or not to be the person"));
spout.setCycle(true);
Next, the counting flow: each sentence is split into words, the words are grouped, and a running count per word is persisted (here in an in-memory map state). The returned TridentState is the queryable word-count table:

TridentState wordCounts = topology.newStream("spout1", spout)
        .parallelismHint(16)
        .each(new Fields("sentence"), new Split(), new Fields("word"))
        .groupBy(new Fields("word"))
        .persistentAggregate(new MemoryMapState.Factory(), new Count(),
                new Fields("count"))
        .parallelismHint(16);
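As a side note (my addition, not part of the official example): the TridentState returned by persistentAggregate also exposes the stream of freshly updated values via newValuesStream(), which can be eyeballed during local testing with the built-in Debug filter. The "word"/"count" field names below are an assumption carried over from the grouping above:

// Assumes the wordCounts state built above and
// import storm.trident.operation.builtin.Debug;
wordCounts.newValuesStream()
        .each(new Fields("word", "count"), new Debug()); // prints each updated (word, count) pair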
Then the DRPC query flow: the query string arrives in the "args" field, is split into words, each word's count is looked up in wordCounts with MapGet, nulls (words never seen) are filtered out, and the remaining counts are summed:

topology.newDRPCStream("words", drpc)
        .each(new Fields("args"), new Split(), new Fields("word"))
        .groupBy(new Fields("word"))
        .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
        .each(new Fields("count"), new FilterNull())
        .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
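On a real cluster there is no LocalDRPC handle; the "words" function would instead be queried over the network with a DRPC client. A hedged sketch follows: the host name is a placeholder, 3772 is Storm's default DRPC port, and the two-argument constructor matches the older backtype.storm API used in this post (later releases changed the constructor):

import backtype.storm.utils.DRPCClient;

public class WordCountQuery {
    public static void main(String[] args) throws Exception {
        // "drpc-server-host" is a placeholder for a real DRPC server address.
        DRPCClient client = new DRPCClient("drpc-server-host", 3772);
        // "words" must match the name passed to newDRPCStream in the topology.
        System.out.println(client.execute("words", "cat dog the man"));
    }
}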
The Split function used by both flows simply tokenizes its input on spaces and emits one tuple per word:

public static class Split extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        String sentence = tuple.getString(0);
        for (String word : sentence.split(" ")) {
            collector.emit(new Values(word));
        }
    }
}
Finally, main(): with no command-line arguments the topology runs in a LocalCluster and is queried through LocalDRPC once a second; given a topology name, it is submitted to a real cluster with three workers:

public static void main(String[] args) throws Exception {
    Config conf = new Config();
    conf.setMaxSpoutPending(20);
    if (args.length == 0) {
        LocalDRPC drpc = new LocalDRPC();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
        for (int i = 0; i < 100; i++) {
            System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));
            Thread.sleep(1000);
        }
    } else {
        conf.setNumWorkers(3);
        StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(null));
    }
}
Original article: http://blog.csdn.net/jinhong_lu/article/details/46696549