标签:
SPOUT:
package base; import java.util.Map; import java.util.Queue; import java.util.Random; import java.util.concurrent.ConcurrentLinkedQueue; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; public class SourceSpout implements IRichSpout{ /** * 数据源Spout */ private static final long serialVersionUID = 1L; Queue<String> queue = new ConcurrentLinkedQueue<String>(); SpoutOutputCollector collector = null; String str = null; @Override public void nextTuple() { if (queue.size() >= 0) { collector.emit(new Values(queue.poll())); } try { Thread.sleep(500) ; } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { try { this.collector = collector; Random random = new Random(); String[] hosts = { "www.taobao.com" }; String[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34", "BBYH61456FGHHJ7JL89RG5VV9UYU7", "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" }; String[] time = { "2014-01-07 08:40:50", "2014-01-07 08:40:51", "2014-01-07 08:40:52", "2014-01-07 08:40:53", "2014-01-07 09:40:49", "2014-01-07 10:40:49", "2014-01-07 11:40:49", "2014-01-07 12:40:49" }; for (int i = 0; i < 100; i++) { queue.add(hosts[0]+"\t"+session_id[random.nextInt(5)]+"\t"+time[random.nextInt(8)]); } } catch (Exception e) { e.printStackTrace(); } } @Override public void close() { // TODO Auto-generated method stub } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub declarer.declare(new Fields("log")); } @Override public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } @Override public void ack(Object msgId) { // TODO Auto-generated method stub 
System.out.println("spout ack:"+msgId.toString()); } @Override public void activate() { // TODO Auto-generated method stub } @Override public void deactivate() { // TODO Auto-generated method stub } @Override public void fail(Object msgId) { // TODO Auto-generated method stub System.out.println("spout fail:"+msgId.toString()); } }
BOLT:
格式化数据源
package com.storm.user_visit; import java.util.Map; import tools.DateFmt; import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.IBasicBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class FmtLogBolt implements IBasicBolt{ private static final long serialVersionUID = 1L; String logString = null; @Override public void execute(Tuple input, BasicOutputCollector collector) { logString = input.getString(0); if (logString !=null && logString.length() > 0) { String[] split = logString.split("\t"); collector.emit(new Values(DateFmt.getCountDate(split[2], DateFmt.date_short),split[1]));//日期 sesion_id } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("date","session_id")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } @Override public void prepare(Map stormConf, TopologyContext context) { } @Override public void cleanup() { } }
统计每个用户(session)每天的访问深度
package com.storm.user_visit; import java.util.HashMap; import java.util.Map; import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.IBasicBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class DeepVisitBolt implements IBasicBolt { private static final long serialVersionUID = 1L; Map<String,Long> counts = new HashMap<String, Long>(); String session_id = null; String datestr = null; long pv =0; @Override public void execute(Tuple input, BasicOutputCollector collector) { String datestr = input.getString(0); String session_id = input.getString(1); Long count = counts.get(datestr+"_"+session_id); if(count == null){ count = 0L; } count++; counts.put(datestr+"_"+session_id, count); collector.emit(new Values(datestr+"_"+session_id, count)); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("date_session","count")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } @Override public void prepare(Map stormConf, TopologyContext context) { } @Override public void cleanup() { } }
使用ZooKeeper临时节点(EPHEMERAL)实现的分布式锁,只有持锁的task输出结果;每5秒输出一次,同时统计PV和UV
package com.storm.user_visit; import java.net.InetAddress; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; import tools.DateFmt; import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.FailedException; import backtype.storm.topology.IBasicBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; public class UVSumBolt implements IBasicBolt { private static final long serialVersionUID = 1L; Map<String, Integer> counts = new HashMap<String, Integer>(); String cur_date = null; long beginTime = System.currentTimeMillis(); long endTime = 0; @Override public void execute(Tuple input, BasicOutputCollector collector) { try { endTime = System.currentTimeMillis(); long PV=0L;//总数个数 long UV =0L;//去重总数个数 String date_sessionid = input.getString(0); Integer count = input.getInteger(1); //判断数据是否是当天的数据,如果时间是今天的数据或者是明天的 if (! 
date_sessionid.startsWith(cur_date) && DateFmt.parseDate(date_sessionid.split("_")[0]).after(DateFmt.parseDate(cur_date))) { //第二天的数据,跨天,有新的数据了 cur_date = date_sessionid.split("_")[0]; counts.clear(); } //可能会出现旧的数据 counts.put(date_sessionid, count); if (endTime - beginTime >= 5*1000 ) { //获取word去重的个数,遍历counts的keySet,取count Iterator<String> i2 = counts.keySet().iterator(); while (i2.hasNext()) { String key =i2.next(); if(key != null){ //只计算今天的数据 if (key.startsWith(cur_date)) { UV ++; PV += counts.get(key); } } } //判断是否相等,保证只有一个task能匹配 if (lockData.equals(new String(keeper.getData(zk_path, false, null)))) { System.err.println(" PV = "+PV +" UV = "+UV); } } beginTime = System.currentTimeMillis(); } catch (Exception e) { throw new FailedException("split fail!"); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } @Override public Map<String, Object> getComponentConfiguration() { return null; } public static final String zk_path = "/lock/storm/pv"; ZooKeeper keeper = null; String lockData = null; @Override public void prepare(Map stormConf, TopologyContext context) { cur_date = DateFmt.getCountDate("2014-01-07", DateFmt.date_short); try { keeper =new ZooKeeper("hadoop:2181", 3000, new Watcher() { @Override public void process(WatchedEvent event) { System.err.println("even = "+event.getType()); } }); //判断zookeeper是否连接上,如果没有连接成功一直等待,保证Zookeeper能连接上 while (keeper.getState() != ZooKeeper.States.CONNECTED) { System.out.println("connect failed"); Thread.sleep(1000); } InetAddress address = InetAddress.getLocalHost(); lockData = address.getHostAddress()+":"+context.getThisTaskId(); //其他线程发现该目录已经存在,就保证了唯一 if (keeper.exists(zk_path, false) == null) { //创建临时目录 keeper.create(zk_path, lockData.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); } } catch (Exception e) { try { keeper.close(); } catch (InterruptedException e1) { e1.printStackTrace(); } } } @Override public void cleanup() { } }
TOPOLOGY:
package com.storm.user_visit; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; import base.SourceSpout; public class UVToPo { public static void main(String [] args) throws Exception{ TopologyBuilder builder =new TopologyBuilder(); builder.setSpout("spout", new SourceSpout(),1); builder.setBolt("Fmtbolt", new FmtLogBolt(),4).shuffleGrouping("spout"); builder.setBolt("deepbolt", new DeepVisitBolt(),4).fieldsGrouping("Fmtbolt", new Fields("date","session_id")); builder.setBolt("UvSum", new UVSumBolt(),1).shuffleGrouping("deepbolt"); //设置参数 Config conf = new Config(); if (args.length > 0) { //分布式提交 StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); }else{ //本地模式提交 LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("mytopology", conf, builder.createTopology()); } } }
标签:
原文地址:http://www.cnblogs.com/thinkpad/p/5077344.html