// 3、创建topology TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5);//设置executor数量为5 builder.setBolt("filter-bolt", new FilterBolt(), 3).shuffleGrouping( "kafka-reader");//设置executor数量为3 builder.setBolt("log-splitter", new LogSplitterBolt(), 3) .shuffleGrouping("filter-bolt");//设置executor数量为5 builder.setBolt("hdfs-bolt", hdfsBolt, 2).shuffleGrouping( "log-splitter");//设置executor数量为2 // 4、启动topology Config conf = new Config(); conf.put(Config.NIMBUS_HOST, nimbusHost); conf.setNumWorkers(3); //设置worker数量 StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());
// Original article: http://blog.csdn.net/jinhong_lu/article/details/46531867