标签:master
mkdir -p /home/hadoop/zookeeper/da ta
cd /home/hadoop/zookeeper/data
dataDir=/home/hadoop/zookeeper/data server.1=10.10.113.41:2888:3888 server.2=10.10.113.42:2888:3888 server.3=10.10.113.43:2888:3888
#zookeeper export ZOOKEEPER==/home/hadoop/zookeeper PATH=$PATH:$ZOOKEEPER/bin
zkServer.sh start
PATH=$PATH:/home/hadoop/storm
storm.zookeeper.servers: #zk地址 - "10.10.113.41" - "10.10.113.42" - "10.10.113.43" nimbus.host: "10.10.113.41" #master 节点地址 supervisor.slots.ports: #worker端口 我这里 所有的节点配置文件都是一样的,因为master 节点不参与计算所以这块配置上也可以。 - 6700 - 6701 - 6702 - 6703 storm.local.dir: "/home/hadoop/storm/data" #数据存放地址
nohup bin/storm nimbus >/dev/null 2>&1 & #启动主节点 nohup bin/storm ui >/dev/null 2>&1 & #启动stormUI nohup bin/storm logviewer >/dev/null 2>&1 & #启动logviewer 功能
nohup bin/storm supervisor >/dev/null 2>&1 & nohup bin/storm logviewer >/dev/null 2>&1 &
ui.port=8089
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>0.9.3</version> </dependency>
packagecn.oraclers.storm; importbacktype.storm.Config; importbacktype.storm.StormSubmitter; importbacktype.storm.generated.AlreadyAliveException; importbacktype.storm.generated.InvalidTopologyException; importbacktype.storm.spout.SpoutOutputCollector; importbacktype.storm.task.OutputCollector; importbacktype.storm.task.TopologyContext; importbacktype.storm.topology.OutputFieldsDeclarer; importbacktype.storm.topology.TopologyBuilder; importbacktype.storm.topology.base.BaseRichBolt; importbacktype.storm.topology.base.BaseRichSpout; importbacktype.storm.tuple.Fields; importbacktype.storm.tuple.Tuple; importbacktype.storm.tuple.Values; importbacktype.storm.utils.Utils; importjava.util.HashMap; importjava.util.Map; importjava.util.Random; publicclassWordCount{ publicstaticclassSpoutSourceextendsBaseRichSpout{ Mapmap; TopologyContexttopologyContext; SpoutOutputCollectorspoutOutputCollector; Randomrandom; @Override publicvoidopen(Mapmap,TopologyContexttopologyContext,SpoutOutputCollectorspoutOutputCollector){ map=map; topologyContext=topologyContext; spoutOutputCollector=spoutOutputCollector; random=random; } String[]sentences=newString[]{"thecowjumpedoverthemoon","anappleadaykeepsthedoctoraway", "fourscoreandsevenyearsago","snowwhiteandthesevendwarfs","iamattwowithnature"}; @Override publicvoidnextTuple(){ Utils.sleep(1000); for(Stringsentence:sentences){ spoutOutputCollector.emit(newValues(sentence)); } } @Override publicvoiddeclareOutputFields(OutputFieldsDeclareroutputFieldsDeclarer){ outputFieldsDeclarer.declare(newFields("sentence")); } } publicstaticclassSplitBoltSourceextendsBaseRichBolt{ Mapmap; TopologyContexttopologyContext; OutputCollectoroutputCollector; @Override publicvoidprepare(Mapmap,TopologyContexttopologyContext,OutputCollectoroutputCollector){ map=map; topologyContext=topologyContext; outputCollector=outputCollector; } @Override publicvoidexecute(Tupletuple){ Stringsentence=tuple.getStringByField("sentence"); String[]words=sentence.split(""); for(Stringword:words){ this.outputCollector.emit(newValues(word)); } } @Override publicvoiddeclareOutputFields(OutputFieldsDeclareroutputFieldsDeclarer){ outputFieldsDeclarer.declare(newFields("word")); } } publicstaticclassSumBoltSourceextendsBaseRichBolt{ Mapmap; TopologyContexttopologyContext; OutputCollectoroutputCollector; @Override publicvoidprepare(Mapmap,TopologyContexttopologyContext,OutputCollectoroutputCollector){ this.map=map; this.topologyContext=topologyContext; this.outputCollector=outputCollector; } Map<String,Integer>mapCount=newHashMap<String,Integer>(); @Override publicvoidexecute(Tupletuple){ Stringword=tuple.getStringByField("word"); Integercount=mapCount.get(word); if(count==null){ count=0; } count++; mapCount.put(word,count); outputCollector.emit(newValues(word,count)); } @Override publicvoiddeclareOutputFields(OutputFieldsDeclareroutputFieldsDeclarer){ outputFieldsDeclarer.declare(newFields("word","count")); } } publicstaticvoidmain(String[]args)throwsAlreadyAliveException,InvalidTopologyException{ TopologyBuilderbuilder=newTopologyBuilder(); builder.setSpout("data_source",newSpoutSource()); builder.setBolt("bolt_split",newSplitBoltSource()).shuffleGrouping("data_source"); builder.setBolt("bolt_sum",newSplitBoltSource()).fieldsGrouping("bolt_split",newFields("word")); try{ ConfigstormConf=newConfig(); stormConf.setDebug(true); StormSubmitter.submitTopology("Clustertopology",stormConf,builder.createTopology()); }catch(AlreadyAliveExceptione){ e.printStackTrace(); }catch(InvalidTopologyExceptione){ e.printStackTrace(); } } }
./storm jar storm jar sd-1.0-SNAPSHOT.jar cn.oraclers.storm.WordCount
本文出自 “8155900” 博客,请务必保留此出处http://8165900.blog.51cto.com/8155900/1721542
标签:master
原文地址:http://8165900.blog.51cto.com/8155900/1721542