标签:ace auto one object lin uil for 虚拟 create
1.产生虚拟日志
package les7.readFileTopo; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.util.Random; public class GetData { /** * @param args */ public static void main(String[] args) { File logFile = new File("track.log"); Random random = new Random(); String[] hosts = { "movie information" }; String[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U12", "XXYH6YCGFJYERTT834R52FDXV9U34", "BBYH61456FGHHJ7JL89RG5VV9UYU7", "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" }; String[] time = { "2019-03-07 08:40:50", "2019-03-07 08:40:51", "2019-03-07 08:40:52", "2019-03-07 08:40:53", "2019-03-07 09:40:49", "2019-03-07 10:40:49", "2019-03-07 11:40:49", "2019-03-07 12:40:49" }; StringBuffer sbBuffer = new StringBuffer() ; for (int i = 0; i < 5000; i++) { sbBuffer.append(hosts[0]+"\t"+session_id[random.nextInt(5)]+"\t"+time[random.nextInt(8)]+"\n"); } if(! logFile.exists()) { try { logFile.createNewFile(); } catch (IOException e) { System.out.println("Create logFile fail !"); } } byte[] b = (sbBuffer.toString()).getBytes(); FileOutputStream fs; try { fs = new FileOutputStream(logFile); fs.write(b); fs.close(); } catch (Exception e) { e.printStackTrace(); } } }
2.spout自定义数据流入拓扑逻辑
package les7.readFileTopo; import java.io.BufferedReader; import java.io.FileInputStream; import java.io.InputStreamReader; import java.util.Map; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichSpout; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; public class ReadFileSpout implements IRichSpout{ /** * */ private static final long serialVersionUID = 1L; FileInputStream fis; InputStreamReader isr; BufferedReader br; SpoutOutputCollector collector = null; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { // 初始化方法 try { this.collector = collector; this.fis = new FileInputStream("track.log"); this.isr = new InputStreamReader(fis, "UTF-8"); this.br = new BufferedReader(isr); } catch (Exception e) { e.printStackTrace(); } } @Override public void close() { // TODO 关闭Topo } @Override public void activate() { // TODO 激活Topo } @Override public void deactivate() { // TODO 停用Topo } String str = null; String[] str01=null; @Override public void nextTuple() { // TODO 核心方法,死循环,获取外部Touple,emit到下一级组件 try { while ((str = this.br.readLine()) != null) { // // 过滤动作 // str01=str.split("\t"); collector.emit(new Values(str)); // Thread.sleep(3); // //to do } } catch (Exception e) { // TODO: handle exception } } @Override public void ack(Object msgId) { // TODO 如果开启Acker,成功执行Tuple后会回调该4方法,告知Storm框架该Tuple已经被成功执行。 } @Override public void fail(Object msgId) { // TODO 如果开启Acker,当失败执行Tuple后会回调该方法,告知Storm框架该Tuple已经被执行失败。 // 以便我们手工编码实现失败重发,并控制重发次数。 } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO 定义输出的列名 declarer.declare(new Fields("log")); } @Override public Map<String, Object> getComponentConfiguration() { // TODO 可以在代码里设置一下属性。该方法基本是废弃不用的。 return null; } }
3.bolt处理逻辑
package les7.readFileTopo; import java.util.Map; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; public class FileBolt implements IRichBolt { /** * */ private static final long serialVersionUID = 1L; OutputCollector collector = null; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { // TODO 初始化函数 this.collector = collector; } Integer num = 0; String[] words; @Override public void execute(Tuple tuple) { // TODO 死循环,核心方法,处理业务逻辑 String value =tuple.getString(0); //分词 String[] words = value.split("\t"); //输出 for(String w:words){ collector.emit(new Values(w,1)); } } @Override public void cleanup() { // TODO 销毁方法,基本不用4 } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO 定义输出列名 declarer.declare(new Fields("word","count")); } @Override public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } }
4.bolt输出逻辑
package les7.readFileTopo; import java.util.HashMap; import java.util.Map; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import static org.apache.htrace.Tracer.LOG; public class PrintBolt implements IRichBolt { /** * */ private static final long serialVersionUID = 1L; private Map<String, Integer> result = new HashMap<>(); private OutputCollector collector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { // TODO Auto-generated method stub this.collector = collector; } @Override public void execute(Tuple tuple) { String word = tuple.getStringByField("word"); int count = tuple.getIntegerByField("count"); //求和 if(result.containsKey(word)){ //如果已经存在,累加 int total = result.get(word); result.put(word, total+count); }else{ //这是一个新单词 result.put(word, count); } //输出到屏幕 System.out.println("统计的结果是:" + result); //输出给下一个组件 单词 总频率 this.collector.emit(new Values(word,result.get(word))); } @Override public void cleanup() { // TODO Auto-generated method stub } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub } @Override public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } }
5.书写拓扑逻辑代码
package les7.readFileTopo; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.topology.TopologyBuilder; //展示信息数据 public class FileCountTopo { public static void main(String[] args) { // TODO Auto-generated method stub TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new ReadFileSpout(),1) ; builder.setBolt("b1", new FileBolt(),2).shuffleGrouping("spout"); builder.setBolt("PrintBolt", new PrintBolt(),1).shuffleGrouping("b1"); Config conf = new Config(); conf.setDebug(true); if (args.length > 0) { try { //提交到集群 StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } catch (Exception e) { e.printStackTrace(); } }else { //本地模式提交 LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("mytopology", conf, builder.createTopology()); } } }
标签:ace auto one object lin uil for 虚拟 create
原文地址:https://www.cnblogs.com/tangsonghuai/p/11169032.html