标签:storm-kafka
使用storm-kafka模块读取kafka中的数据,按照以下两步进行构建(我使用的版本是0.9.3)ZkHosts支持两种创建方式, public ZkHosts(String brokerZkStr, String brokerZkPath) //使用默认brokerZkPath:"/brokers" public ZkHosts(String brokerZkStr)
public KafkaConfig(BrokerHosts hosts, String topic) //clientId如果不想每次随机生成的话,就自己设置一个 public KafkaConfig(BrokerHosts hosts, String topic, String clientId)
//这个地方其实就是kafka配置文件里边的zookeeper.connect这个参数,可以去那里拿过来。
String brokerZkStr = "10.100.90.201:2181/kafka_online_sample";
String brokerZkPath = "/brokers";
ZkHosts zkHosts = new ZkHosts(brokerZkStr, brokerZkPath);
String topic = "mars-wap";
//以下:将offset汇报到哪个zk集群,相应配置
// String offsetZkServers = "10.199.203.169";
String offsetZkServers = "10.100.90.201";
String offsetZkPort = "2181";
List<String> zkServersList = new ArrayList<String>();
zkServersList.add(offsetZkServers);
//汇报offset信息的root路径
String offsetZkRoot = "/stormExample";
//存储该spout id的消费offset信息,譬如以topoName来命名
String offsetZkId = "storm-example";
SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, topic, offsetZkRoot, offsetZkId);
kafkaConfig.zkRoot = offsetZkRoot;
kafkaConfig.zkPort = Integer.parseInt(offsetZkPort);
kafkaConfig.zkServers = zkServersList;
kafkaConfig.id = offsetZkId;
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout spout = new KafkaSpout(kafkaConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", spout, 1);
builder.setBolt("bolt", new Bolt(), 1).shuffleGrouping("spout");
Config config = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", config, builder.createTopology());
// cluster submit.
// try {
// StormSubmitter.submitTopology("storm-kafka-example",config,builder.createTopology());
// } catch (AlreadyAliveException e) {
// e.printStackTrace();
// } catch (InvalidTopologyException e) {
// e.printStackTrace();
// }
标签:storm-kafka
原文地址:http://blog.csdn.net/tonylee0329/article/details/43016385