标签:
1 package kafka.demo; 2 3 import java.util.ArrayList; 4 import java.util.List; 5 6 import storm.kafka.KafkaSpout; 7 import storm.kafka.SpoutConfig; 8 import storm.kafka.StringScheme; 9 import storm.kafka.ZkHosts; 10 import backtype.storm.Config; 11 import backtype.storm.LocalCluster; 12 import backtype.storm.StormSubmitter; 13 import backtype.storm.generated.AlreadyAliveException; 14 import backtype.storm.generated.InvalidTopologyException; 15 import backtype.storm.spout.SchemeAsMultiScheme; 16 import backtype.storm.topology.TopologyBuilder; 17 18 public class MyKafkaSpout { 19 20 public static void main(String[] args) { 21 22 String topic = "track"; 23 ZkHosts zkHosts = new ZkHosts("192.168.183.135:2181"); 24 25 SpoutConfig spoutConfig = new SpoutConfig(zkHosts, topic, 26 "/MyTrack",//偏移量offset的根目录 27 "track");//对应一个应用 28 29 List<String> zkServer = new ArrayList<String>(); 30 31 for (String host : zkHosts.brokerZkStr.split(",")) { 32 zkServer.add(host.split(":")[0]); 33 } 34 spoutConfig.zkServers = zkServer; 35 spoutConfig.zkPort = 2181; 36 spoutConfig.forceFromStart = true; // 从头开始消费 37 spoutConfig.socketTimeoutMs = 60 * 1000; 38 spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());// 定义输出为String 39 40 TopologyBuilder builder = new TopologyBuilder(); 41 builder.setSpout("spout", new KafkaSpout(spoutConfig),1); 42 builder.setBolt("bolt", new MyKafkaBolt()).shuffleGrouping("spout"); 43 44 45 Config conf = new Config (); 46 conf.setDebug(false) ; 47 48 if (args.length > 0) { 49 try { 50 StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); 51 } catch (AlreadyAliveException e) { 52 e.printStackTrace(); 53 } catch (InvalidTopologyException e) { 54 e.printStackTrace(); 55 } 56 }else { 57 LocalCluster localCluster = new LocalCluster(); 58 localCluster.submitTopology("mytopology", conf, builder.createTopology()); 59 } 60 61 } 62 }
1 package kafka.demo; 2 3 import java.util.Map; 4 5 import backtype.storm.task.TopologyContext; 6 import backtype.storm.topology.BasicOutputCollector; 7 import backtype.storm.topology.IBasicBolt; 8 import backtype.storm.topology.OutputFieldsDeclarer; 9 import backtype.storm.tuple.Tuple; 10 11 public class MyKafkaBolt implements IBasicBolt{ 12 13 private static final long serialVersionUID = 1L; 14 15 @Override 16 public void declareOutputFields(OutputFieldsDeclarer declarer) { 17 18 } 19 20 @Override 21 public Map<String, Object> getComponentConfiguration() { 22 return null; 23 } 24 25 @SuppressWarnings("rawtypes") 26 @Override 27 public void prepare(Map stormConf, TopologyContext context) { 28 29 } 30 31 @Override 32 public void execute(Tuple input, BasicOutputCollector collector) { 33 34 String kafkaMag = input.getString(0); 35 System.out.println("kafkaMag :"+kafkaMag); 36 } 37 38 @Override 39 public void cleanup() { 40 41 } 42 43 }
标签:
原文地址:http://www.cnblogs.com/thinkpad/p/5090070.html