码迷,mamicode.com
首页 > 其他好文 > 详细

storm集群搭建

时间:2015-12-10 17:15:06      阅读:225      评论:0      收藏:0      [点我收藏+]

标签:master


mkdir -p /home/hadoop/zookeeper/da
ta


cd  /home/hadoop/zookeeper/data



dataDir=/home/hadoop/zookeeper/data
server.1=10.10.113.41:2888:3888
server.2=10.10.113.42:2888:3888
server.3=10.10.113.43:2888:3888
#zookeeper
export ZOOKEEPER==/home/hadoop/zookeeper
PATH=$PATH:$ZOOKEEPER/bin


zkServer.sh start
PATH=$PATH:/home/hadoop/storm


 

storm.zookeeper.servers:  
#zk地址
   
 - "10.10.113.41"
   
 - "10.10.113.42"
   
 - "10.10.113.43"
 
nimbus.host: "10.10.113.41"  
#master 节点地址
supervisor.slots.ports:   
#worker端口 我这里 所有的节点配置文件都是一样的,因为master 节点不参与计算所以这块配置上也可以。
    - 6700
    - 6701
    - 6702
    - 6703
 storm.local.dir: "/home/hadoop/storm/data" #数据存放地址


nohup bin/storm nimbus >/dev/null
2>&1 &     #启动主节点
nohup bin/storm ui >/dev/null
2>&1 &               #启动stormUI
nohup bin/storm logviewer >/dev/null
2>&1 & #启动logviewer 功能
nohup bin/storm supervisor >/dev/null 2>&1 &
nohup bin/storm logviewer >/dev/null 2>&1 &
ui.port=8089


<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.3</version>
</dependency>



packagecn.oraclers.storm;
importbacktype.storm.Config;
importbacktype.storm.StormSubmitter;
importbacktype.storm.generated.AlreadyAliveException;
importbacktype.storm.generated.InvalidTopologyException;
importbacktype.storm.spout.SpoutOutputCollector;
importbacktype.storm.task.OutputCollector;
importbacktype.storm.task.TopologyContext;
importbacktype.storm.topology.OutputFieldsDeclarer;
importbacktype.storm.topology.TopologyBuilder;
importbacktype.storm.topology.base.BaseRichBolt;
importbacktype.storm.topology.base.BaseRichSpout;
importbacktype.storm.tuple.Fields;
importbacktype.storm.tuple.Tuple;
importbacktype.storm.tuple.Values;
importbacktype.storm.utils.Utils;
 
importjava.util.HashMap;
importjava.util.Map;
importjava.util.Random;
 
publicclassWordCount{
publicstaticclassSpoutSourceextendsBaseRichSpout{
 
Mapmap;
TopologyContexttopologyContext;
SpoutOutputCollectorspoutOutputCollector;
Randomrandom;
@Override
publicvoidopen(Mapmap,TopologyContexttopologyContext,SpoutOutputCollectorspoutOutputCollector){
map=map;
topologyContext=topologyContext;
spoutOutputCollector=spoutOutputCollector;
random=random;
}
 
String[]sentences=newString[]{"thecowjumpedoverthemoon","anappleadaykeepsthedoctoraway",
"fourscoreandsevenyearsago","snowwhiteandthesevendwarfs","iamattwowithnature"};
@Override
publicvoidnextTuple(){
Utils.sleep(1000);
for(Stringsentence:sentences){
spoutOutputCollector.emit(newValues(sentence));
}
}
 
@Override
publicvoiddeclareOutputFields(OutputFieldsDeclareroutputFieldsDeclarer){
outputFieldsDeclarer.declare(newFields("sentence"));
}
 
}
 
publicstaticclassSplitBoltSourceextendsBaseRichBolt{
 
Mapmap;
TopologyContexttopologyContext;
OutputCollectoroutputCollector;
@Override
publicvoidprepare(Mapmap,TopologyContexttopologyContext,OutputCollectoroutputCollector){
map=map;
topologyContext=topologyContext;
outputCollector=outputCollector;
}
 
@Override
publicvoidexecute(Tupletuple){
Stringsentence=tuple.getStringByField("sentence");
String[]words=sentence.split("");
for(Stringword:words){
this.outputCollector.emit(newValues(word));
}
}
 
@Override
publicvoiddeclareOutputFields(OutputFieldsDeclareroutputFieldsDeclarer){
outputFieldsDeclarer.declare(newFields("word"));
}
}
 
 
publicstaticclassSumBoltSourceextendsBaseRichBolt{
Mapmap;
TopologyContexttopologyContext;
OutputCollectoroutputCollector;
 
@Override
publicvoidprepare(Mapmap,TopologyContexttopologyContext,OutputCollectoroutputCollector){
this.map=map;
this.topologyContext=topologyContext;
this.outputCollector=outputCollector;
}
 
Map<String,Integer>mapCount=newHashMap<String,Integer>();
 
@Override
publicvoidexecute(Tupletuple){
Stringword=tuple.getStringByField("word");
Integercount=mapCount.get(word);
if(count==null){
count=0;
}
count++;
mapCount.put(word,count);
 
outputCollector.emit(newValues(word,count));
 
 
 
}
 
@Override
publicvoiddeclareOutputFields(OutputFieldsDeclareroutputFieldsDeclarer){
outputFieldsDeclarer.declare(newFields("word","count"));
}
}
 
publicstaticvoidmain(String[]args)throwsAlreadyAliveException,InvalidTopologyException{
TopologyBuilderbuilder=newTopologyBuilder();
builder.setSpout("data_source",newSpoutSource());
builder.setBolt("bolt_split",newSplitBoltSource()).shuffleGrouping("data_source");
builder.setBolt("bolt_sum",newSplitBoltSource()).fieldsGrouping("bolt_split",newFields("word"));
 
 
try{
ConfigstormConf=newConfig();
stormConf.setDebug(true);
StormSubmitter.submitTopology("Clustertopology",stormConf,builder.createTopology());
}catch(AlreadyAliveExceptione){
e.printStackTrace();
}catch(InvalidTopologyExceptione){
e.printStackTrace();
}
}
 
}



./storm jar storm jar sd-1.0-SNAPSHOT.jar cn.oraclers.storm.WordCount

本文出自 “8155900” 博客,请务必保留此出处http://8165900.blog.51cto.com/8155900/1721542

storm集群搭建

标签:master

原文地址:http://8165900.blog.51cto.com/8155900/1721542

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!