码迷,mamicode.com
首页 > 其他好文 > 详细

storm-kafka

时间:2015-12-30 22:06:58      阅读:396      评论:0      收藏:0      [点我收藏+]

标签:

 1 package kafka.demo;
 2 
 3 import java.util.ArrayList;
 4 import java.util.List;
 5 
 6 import storm.kafka.KafkaSpout;
 7 import storm.kafka.SpoutConfig;
 8 import storm.kafka.StringScheme;
 9 import storm.kafka.ZkHosts;
10 import backtype.storm.Config;
11 import backtype.storm.LocalCluster;
12 import backtype.storm.StormSubmitter;
13 import backtype.storm.generated.AlreadyAliveException;
14 import backtype.storm.generated.InvalidTopologyException;
15 import backtype.storm.spout.SchemeAsMultiScheme;
16 import backtype.storm.topology.TopologyBuilder;
17 
18 public class MyKafkaSpout {
19 
20     public static void main(String[] args) {
21 
22         String topic = "track";
23         ZkHosts zkHosts = new ZkHosts("192.168.183.135:2181");
24 
25         SpoutConfig spoutConfig = new SpoutConfig(zkHosts, topic, 
26                 "/MyTrack",//偏移量offset的根目录
27                 "track");//对应一个应用
28 
29         List<String> zkServer = new ArrayList<String>();
30 
31         for (String host : zkHosts.brokerZkStr.split(",")) {
32             zkServer.add(host.split(":")[0]);
33         }
34         spoutConfig.zkServers = zkServer;
35         spoutConfig.zkPort = 2181;
36         spoutConfig.forceFromStart = true; // 从头开始消费
37         spoutConfig.socketTimeoutMs = 60 * 1000;
38         spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());// 定义输出为String
39 
40         TopologyBuilder builder = new TopologyBuilder();
41         builder.setSpout("spout", new KafkaSpout(spoutConfig),1);
42         builder.setBolt("bolt", new MyKafkaBolt()).shuffleGrouping("spout");
43         
44         
45         Config conf = new Config ();
46         conf.setDebug(false) ;
47         
48         if (args.length > 0) {
49             try {
50                 StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
51             } catch (AlreadyAliveException e) {
52                 e.printStackTrace();
53             } catch (InvalidTopologyException e) {
54                 e.printStackTrace();
55             }
56         }else {
57             LocalCluster localCluster = new LocalCluster();
58             localCluster.submitTopology("mytopology", conf, builder.createTopology());
59         }
60 
61     }
62 }

 

 1 package kafka.demo;
 2 
 3 import java.util.Map;
 4 
 5 import backtype.storm.task.TopologyContext;
 6 import backtype.storm.topology.BasicOutputCollector;
 7 import backtype.storm.topology.IBasicBolt;
 8 import backtype.storm.topology.OutputFieldsDeclarer;
 9 import backtype.storm.tuple.Tuple;
10 
11 public class MyKafkaBolt implements IBasicBolt{
12 
13     private static final long serialVersionUID = 1L;
14 
15     @Override
16     public void declareOutputFields(OutputFieldsDeclarer declarer) {
17         
18     }
19 
20     @Override
21     public Map<String, Object> getComponentConfiguration() {
22         return null;
23     }
24 
25     @SuppressWarnings("rawtypes")
26     @Override
27     public void prepare(Map stormConf, TopologyContext context) {
28         
29     }
30 
31     @Override
32     public void execute(Tuple input, BasicOutputCollector collector) {
33         
34         String kafkaMag = input.getString(0);
35         System.out.println("kafkaMag :"+kafkaMag);
36     }
37 
38     @Override
39     public void cleanup() {
40         
41     }
42 
43 }

 

storm-kafka

标签:

原文地址:http://www.cnblogs.com/thinkpad/p/5090070.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!