标签:style blog color io ar java sp 数据 div
创建maven项目,在pom.xml中加入以下配置:
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <type>jar</type> <version>0.9.3-rc1</version> </dependency>
创建SimpleSpout类用于获取数据流:
1 package com.hirain.storm.helloworld; 2 3 import java.util.Map; 4 import java.util.Random; 5 6 import backtype.storm.spout.SpoutOutputCollector; 7 import backtype.storm.task.TopologyContext; 8 import backtype.storm.topology.OutputFieldsDeclarer; 9 import backtype.storm.topology.base.BaseRichSpout; 10 import backtype.storm.tuple.Fields; 11 import backtype.storm.tuple.Values; 12 13 public class SimpleSpout extends BaseRichSpout{ 14 15 /** 16 * 17 */ 18 private static final long serialVersionUID = 1L; 19 20 //用来发射数据的工具类 21 private SpoutOutputCollector collector; 22 23 private static String[] info = new String[]{ 24 "comaple\t,12424,44w46,654,12424,44w46,654,", 25 "lisi\t,435435,6537,12424,44w46,654,", 26 "lipeng\t,45735,6757,12424,44w46,654,", 27 "hujintao\t,45735,6757,12424,44w46,654,", 28 "jiangmin\t,23545,6457,2455,7576,qr44453", 29 "beijing\t,435435,6537,12424,44w46,654,", 30 "xiaoming\t,46654,8579,w3675,85877,077998,", 31 "xiaozhang\t,9789,788,97978,656,345235,09889,", 32 "ceo\t,46654,8579,w3675,85877,077998,", 33 "cto\t,46654,8579,w3675,85877,077998,", 34 "zhansan\t,46654,8579,w3675,85877,077998,"}; 35 36 Random random=new Random(); 37 38 39 /** 40 * 在SpoutTracker类中被调用,每调用一次就可以向storm集群中发射一条数据(一个tuple元组),该方法会被不停的调用 41 */ 42 public void nextTuple() { 43 try { 44 String msg = info[random.nextInt(11)]; 45 // 调用发射方法 46 collector.emit(new Values(msg)); 47 // 模拟等待100ms 48 Thread.sleep(100); 49 } catch (InterruptedException e) { 50 e.printStackTrace(); 51 } 52 } 53 /** 54 * 初始化collector 55 */ 56 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { 57 this.collector = collector; 58 59 } 60 61 62 /** 63 * 定义字段id,该id在简单模式下没有用处,但在按照字段分组的模式下有很大的用处。 64 * 该declarer变量有很大作用,我们还可以调用declarer.declareStream();来定义stramId,该id可以用来定义更加复杂的流拓扑结构 65 */ 66 public void declareOutputFields(OutputFieldsDeclarer declarer) { 67 declarer.declare(new Fields("source")); //collector.emit(new Values(msg));参数要对应 68 } 69 70 }
创建SimpleBolt类,用于处理数据:
1 package com.hirain.storm.helloworld; 2 3 import backtype.storm.topology.BasicOutputCollector; 4 import backtype.storm.topology.OutputFieldsDeclarer; 5 import backtype.storm.topology.base.BaseBasicBolt; 6 import backtype.storm.tuple.Fields; 7 import backtype.storm.tuple.Tuple; 8 import backtype.storm.tuple.Values; 9 10 11 12 public class SimpleBolt extends BaseBasicBolt { 13 14 /** 15 * 16 */ 17 private static final long serialVersionUID = 1L; 18 19 public void execute(Tuple input,BasicOutputCollector collector) { 20 try { 21 String msg = input.getString(0); 22 if (msg != null){ 23 //System.out.println("msg="+msg); 24 collector.emit(new Values(msg + "msg is processed!")); 25 } 26 27 } catch (Exception e) { 28 e.printStackTrace(); 29 } 30 31 } 32 33 public void declareOutputFields( 34 OutputFieldsDeclarer declarer) { 35 declarer.declare(new Fields("info")); 36 37 } 38 39 }
创建main方法配置storm的topology并启动本地模式运行:
1 package com.hirain.storm.helloworld; 2 3 import backtype.storm.Config; 4 import backtype.storm.LocalCluster; 5 import backtype.storm.StormSubmitter; 6 import backtype.storm.topology.TopologyBuilder; 7 8 public class SimpleTopology { 9 10 11 public static void main(String[] args) { 12 try { 13 // 实例化TopologyBuilder类。 14 TopologyBuilder topologyBuilder = new TopologyBuilder(); 15 // 设置喷发节点并分配并发数,该并发数将会控制该对象在集群中的线程数。 16 topologyBuilder.setSpout("SimpleSpout", new SimpleSpout(), 1); 17 // 设置数据处理节点并分配并发数。指定该节点接收喷发节点的策略为随机方式。 18 topologyBuilder.setBolt("SimpleBolt", new SimpleBolt(), 3).shuffleGrouping("SimpleSpout"); 19 Config config = new Config(); 20 config.setDebug(true); 21 if (args != null && args.length > 0) { 22 config.setNumWorkers(1); 23 StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology()); 24 } else { 25 // 这里是本地模式下运行的启动代码。 26 config.setMaxTaskParallelism(1); 27 LocalCluster cluster = new LocalCluster(); 28 cluster.submitTopology("simple", config, topologyBuilder.createTopology()); 29 } 30 31 } catch (Exception e) { 32 e.printStackTrace(); 33 } 34 } 35 }
以上为storm的简单的helloworld,仅供参考
标签:style blog color io ar java sp 数据 div
原文地址:http://www.cnblogs.com/zhangyukun/p/4031066.html