标签:一起 apach util void length ati extends bsp xtend
一,环境搭建
Eclipse项目的创建和jar包的导入。
二,代码编写
1,spout组件的代码编写,用来发射源数据。
package com;
import java.util.Map;
import java.util.Random;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
/**
 * Spout that acts as the data source of the topology.
 *
 * <p>Despite its name, it emits the same fixed sentence on every call to
 * {@link #nextTuple()}, then pauses for one second. Each emitted tuple has a
 * single field named {@code "spout"}.
 */
public class RandomSentenceSpout extends BaseRichSpout {
    // Must be spelled exactly "serialVersionUID", otherwise Java serialization
    // ignores it and falls back to a computed default.
    private static final long serialVersionUID = 1L;

    /** The only sentence this spout emits. */
    private static final String SENTENCE =
            "hello zhangsan nice to meet you zhangsan hello lisi welcome to bj";

    /** Pause after each emit, in milliseconds. */
    private static final long EMIT_INTERVAL_MS = 1000;

    /** Collects the tuples emitted by this spout; assigned in {@link #open}. */
    private SpoutOutputCollector collector;

    /**
     * Emits the fixed sentence as a one-field tuple and then sleeps for
     * {@link #EMIT_INTERVAL_MS} milliseconds.
     */
    @Override
    public void nextTuple() {
        collector.emit(new Values(SENTENCE));
        try {
            Thread.sleep(EMIT_INTERVAL_MS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can still
            // observe the interruption instead of it being silently lost.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Initialization hook; stores the collector for later use by
     * {@link #nextTuple()}.
     *
     * @param conf      topology configuration (unused here)
     * @param context   topology context (unused here)
     * @param collector collector used to emit tuples from this spout
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Declares the name of the single output field, {@code "spout"}.
     *
     * @param declarer receives the output field declaration
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("spout"));
    }
}
2,bolt组件的代码编写,用来切割字段。
package com;
import java.util.Map;
import java.util.Random;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
/**
 * Spout that emits the same fixed sentence on every call to
 * {@link #nextTuple()}, then pauses for one second. Each emitted tuple has a
 * single field named {@code "spout"}.
 *
 * <p>NOTE(review): this listing sits under the heading for the bolt that
 * splits sentences into words, and the topology refers to a {@code WordBolt}
 * class, yet the code shown here is a spout named RandomSentenceSpout.
 * It looks like the wrong listing was pasted here; confirm against the
 * original project and replace it with the real splitting bolt.
 */
public class RandomSentenceSpout extends BaseRichSpout {
    // Must be spelled exactly "serialVersionUID", otherwise Java serialization
    // ignores it and falls back to a computed default.
    private static final long serialVersionUID = 1L;

    /** The only sentence this spout emits. */
    private static final String SENTENCE =
            "hello zhangsan nice to meet you zhangsan hello lisi welcome to bj";

    /** Pause after each emit, in milliseconds. */
    private static final long EMIT_INTERVAL_MS = 1000;

    /** Collects the tuples emitted by this spout; assigned in {@link #open}. */
    private SpoutOutputCollector collector;

    /**
     * Emits the fixed sentence as a one-field tuple and then sleeps for
     * {@link #EMIT_INTERVAL_MS} milliseconds.
     */
    @Override
    public void nextTuple() {
        collector.emit(new Values(SENTENCE));
        try {
            Thread.sleep(EMIT_INTERVAL_MS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can still
            // observe the interruption instead of it being silently lost.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Initialization hook; stores the collector for later use by
     * {@link #nextTuple()}.
     *
     * @param conf      topology configuration (unused here)
     * @param context   topology context (unused here)
     * @param collector collector used to emit tuples from this spout
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Declares the name of the single output field, {@code "spout"}.
     *
     * @param declarer receives the output field declaration
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("spout"));
    }
}
3,bolt组件的代码编写,用来统计字段的数量。
package com;
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
/**
 * Bolt that keeps a running count of every word it receives and prints the
 * full count table to standard output after each tuple.
 *
 * <p>It reads the {@code "word"} field of each input tuple and declares no
 * output fields of its own.
 */
public class WordCount extends BaseRichBolt {
    // Must be a primitive long named exactly "serialVersionUID"; a boxed Long
    // or a misspelled name is ignored by Java serialization.
    private static final long serialVersionUID = 1L;

    /** Collector handed over in {@link #prepare}; used to ack input tuples. */
    private OutputCollector collector;

    /** Running count per word seen by this bolt instance. */
    Map<String, Integer> map = new HashMap<String, Integer>();

    /**
     * Increments the count for the word carried by the tuple, prints the
     * current counts, and acknowledges the tuple.
     *
     * @param value input tuple; its {@code "word"} field is read as a string
     */
    @Override
    public void execute(Tuple value) {
        String data = value.getStringByField("word");
        // Single lookup: an absent word starts at 1, otherwise add one.
        Integer previous = map.get(data);
        map.put(data, previous == null ? 1 : previous + 1);
        System.out.println(map);
        // BaseRichBolt does not ack automatically; ack explicitly so tuples
        // are not left pending.
        collector.ack(value);
    }

    /**
     * Initialization hook; stores the collector for use in {@link #execute}.
     *
     * @param arg0      topology configuration (unused here)
     * @param arg1      topology context (unused here)
     * @param collector collector associated with this bolt
     */
    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Declares no output fields; this bolt does not emit tuples.
     *
     * @param d output field declarer (unused)
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer d) {
        // Intentionally empty: nothing is emitted downstream.
    }
}
4,编写提交类
package com;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
public class mian {
public static void main(String[] args) {
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("spout", new RandomSentenceSpout());
topologyBuilder.setBolt("wordBolt", new WordBolt()).shuffleGrouping("spout");
topologyBuilder.setBolt("wordint", new WordCount()).fieldsGrouping("wordBolt", new Fields("word"));
Config config = new Config();
if(args==null||args.length==0){
//集群模式
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("wordCount",config ,topologyBuilder.createTopology());
}else{
//单机模式
config.setNumWorkers(1);
try {
StormSubmitter.submitTopology(args[0],config,topologyBuilder.createTopology());
} catch (AlreadyAliveException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InvalidTopologyException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (AuthorizationException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
5,打成jar包,上传到服务器运行。注意只打包项目自身的class,不要把项目中引用的jar包一起打入,否则在集群上运行时会报错。
标签:一起 apach util void length ati extends bsp xtend
原文地址:https://www.cnblogs.com/songweideboke/p/9901083.html