码迷,mamicode.com
首页 > Web开发 > 详细

Storm之网站UV,PV统计

时间:2015-12-26 01:16:15      阅读:245      评论:0      收藏:0      [点我收藏+]

标签:

SPOUT:

 

package base;

import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

/**
 * Data-source spout that feeds the topology with synthetic access-log lines.
 *
 * <p>{@link #open} pre-fills an in-memory queue with {@value #LINE_COUNT}
 * tab-separated lines of the form {@code host \t session_id \t timestamp}.
 * {@link #nextTuple} then drains that queue one line per call on the single
 * output field {@code "log"}. Tuples are emitted without a message id.
 */
public class SourceSpout implements IRichSpout{

    private static final long serialVersionUID = 1L;

    /** Number of synthetic log lines generated by {@link #open}. */
    private static final int LINE_COUNT = 100;

    /** Pause after every {@link #nextTuple} call, in milliseconds. */
    private static final long EMIT_PAUSE_MS = 500L;

    Queue<String> queue = new ConcurrentLinkedQueue<String>();

    SpoutOutputCollector collector = null;

    /**
     * Emits the next queued log line, if any, then pauses for
     * {@value #EMIT_PAUSE_MS} ms.
     *
     * <p>Nothing is emitted once the queue is empty: {@code poll()} returns
     * {@code null} in that case and a null log line must not be sent
     * downstream.
     */
    @Override
    public void nextTuple() {
        String line = queue.poll();
        if (line != null) {
            collector.emit(new Values(line));
        }
        try {
            Thread.sleep(EMIT_PAUSE_MS);
        } catch (InterruptedException e) {
            // Restore the flag so the calling executor can observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Stores the collector and fills the queue with randomly combined
     * host / session id / timestamp lines.
     *
     * @param conf      topology configuration (unused)
     * @param context   topology context (unused)
     * @param collector collector used by {@link #nextTuple} to emit tuples
     */
    @Override
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        this.collector = collector;

        Random random = new Random();
        String[] hosts = { "www.taobao.com" };
        String[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34", "BBYH61456FGHHJ7JL89RG5VV9UYU7",
                "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" };
        String[] time = { "2014-01-07 08:40:50", "2014-01-07 08:40:51", "2014-01-07 08:40:52", "2014-01-07 08:40:53",
                "2014-01-07 09:40:49", "2014-01-07 10:40:49", "2014-01-07 11:40:49", "2014-01-07 12:40:49" };

        for (int i = 0; i < LINE_COUNT; i++) {
            // Bounds follow the array lengths so the sample data can be edited safely.
            queue.add(hosts[0] + "\t"
                    + session_id[random.nextInt(session_id.length)] + "\t"
                    + time[random.nextInt(time.length)]);
        }
    }

    /** No resources to release. */
    @Override
    public void close() {
    }

    /** Declares the single output field {@code "log"}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("log"));
    }

    /** @return {@code null}; this spout sets no component-specific configuration */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    /** Prints the acknowledged message id to standard output. */
    @Override
    public void ack(Object msgId) {
        // String concatenation tolerates a null id, unlike msgId.toString().
        System.out.println("spout ack:" + msgId);
    }

    /** No-op. */
    @Override
    public void activate() {
    }

    /** No-op. */
    @Override
    public void deactivate() {
    }

    /** Prints the failed message id to standard output. */
    @Override
    public void fail(Object msgId) {
        System.out.println("spout fail:" + msgId);
    }

}

 

BOLT:

格式化数据源

package com.storm.user_visit;

import java.util.Map;

import tools.DateFmt;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Bolt that turns a raw tab-separated log line into a
 * {@code (date, session_id)} tuple.
 *
 * <p>The input line is read from tuple position 0 and split on tab
 * characters; column 1 is taken as the session id and column 2 as the
 * timestamp.
 */
public class FmtLogBolt implements IBasicBolt{

    private static final long serialVersionUID = 1L;

    /** Minimum number of tab-separated columns a line must have (indexes 1 and 2 are read). */
    private static final int MIN_COLUMNS = 3;

    /**
     * Parses one log line and emits {@code (date, session_id)}.
     *
     * <p>Null, empty, or too-short lines are skipped rather than letting an
     * {@link ArrayIndexOutOfBoundsException} escape from the bolt.
     *
     * @param input     tuple whose first value is the raw log line
     * @param collector collector the formatted tuple is emitted on
     */
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // Per-tuple state is a local: nothing needs to survive between calls.
        String logLine = input.getString(0);
        if (logLine == null || logLine.length() == 0) {
            return;
        }

        String[] columns = logLine.split("\t");
        if (columns.length < MIN_COLUMNS) {
            return;
        }

        // date, session_id
        // NOTE(review): DateFmt is defined elsewhere; presumably getCountDate reduces the
        // timestamp to the DateFmt.date_short pattern - confirm against tools.DateFmt.
        collector.emit(new Values(DateFmt.getCountDate(columns[2], DateFmt.date_short), columns[1]));
    }

    /** Declares the output fields {@code "date"} and {@code "session_id"}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("date","session_id"));
    }

    /** @return {@code null}; this bolt sets no component-specific configuration */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    /** No initialisation required. */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
    }

    /** No resources to release. */
    @Override
    public void cleanup() {
    }

}

 

 

统计每个用户的深度

package com.storm.user_visit;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Bolt that keeps a running tuple count per {@code date_session} key.
 *
 * <p>For every incoming {@code (date, session_id)} tuple the counter stored
 * under {@code date + "_" + session_id} is incremented and the pair
 * {@code (date_session, count)} is emitted, with the count as a {@code Long}.
 */
public class DeepVisitBolt implements IBasicBolt {

    private static final long serialVersionUID = 1L;

    /** Running counters keyed by {@code date_session}. */
    Map<String,Long> counts = new HashMap<String, Long>();

    // The three fields below are not referenced by any method of this class;
    // execute() works on its own local values.
    String session_id = null;
    String datestr = null;
    long pv =0;

    /**
     * Increments the counter for the tuple's date/session pair and emits the
     * new value.
     *
     * @param input     tuple carrying the date at position 0 and the session id at position 1
     * @param collector collector the {@code (date_session, count)} tuple is emitted on
     */
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        final String counterKey = input.getString(0) + "_" + input.getString(1);

        // A key seen for the first time starts at one.
        final Long seenSoFar = counts.get(counterKey);
        final long updated = (seenSoFar == null) ? 1L : seenSoFar + 1L;

        counts.put(counterKey, updated);
        collector.emit(new Values(counterKey, updated));
    }

    /** Declares the output fields {@code "date_session"} and {@code "count"}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("date_session","count"));
    }

    /** @return {@code null}; this bolt sets no component-specific configuration */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    /** No initialisation required. */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
    }

    /** No resources to release. */
    @Override
    public void cleanup() {
    }
}

 

 

使用ZooKeeper临时节点作为分布式锁(保证只有一个task输出结果),每5秒输出一次,同时统计PV和UV

package com.storm.user_visit;

import java.net.InetAddress;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;

import tools.DateFmt;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.FailedException;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class UVSumBolt implements IBasicBolt {

    private static final long serialVersionUID = 1L;
    
    Map<String, Integer> counts = new HashMap<String, Integer>();

    
    String cur_date = null;
    long beginTime = System.currentTimeMillis();
    
    long endTime  = 0;
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        
        try {
            
            endTime = System.currentTimeMillis();
            
            long PV=0L;//总数个数
            long UV =0L;//去重总数个数
            
            String date_sessionid = input.getString(0);
            Integer count = input.getInteger(1);
            
            //判断数据是否是当天的数据,如果时间是今天的数据或者是明天的
            if (! date_sessionid.startsWith(cur_date) && DateFmt.parseDate(date_sessionid.split("_")[0]).after(DateFmt.parseDate(cur_date))) {
                //第二天的数据,跨天,有新的数据了
                cur_date = date_sessionid.split("_")[0];
                counts.clear();
            }
            
            //可能会出现旧的数据
            counts.put(date_sessionid, count);
            
             if (endTime - beginTime >= 5*1000 ) {
                 
                //获取word去重的个数,遍历counts的keySet,取count    
                Iterator<String> i2 = counts.keySet().iterator();
                while (i2.hasNext()) {
                    
                    String key =i2.next();
                    
                    if(key != null){
                        
                        //只计算今天的数据
                        if (key.startsWith(cur_date)) {
                            UV ++;
                            PV += counts.get(key);
                        }
                    }
                }
            
                //判断是否相等,保证只有一个task能匹配
                 if (lockData.equals(new String(keeper.getData(zk_path, false, null)))) {
                     
                     System.err.println("  PV  =  "+PV +" UV   = "+UV);
                 }
             }
             beginTime = System.currentTimeMillis();
             
             
            
        } catch (Exception e) {
            throw new FailedException("split fail!");
        }
    }
    
    
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    
    public static final String zk_path = "/lock/storm/pv";
    ZooKeeper keeper  = null;
    String lockData = null;
    
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        cur_date = DateFmt.getCountDate("2014-01-07", DateFmt.date_short);
        
        try {
                keeper =new ZooKeeper("hadoop:2181", 3000, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        System.err.println("even = "+event.getType());
                    }
                });
            
             //判断zookeeper是否连接上,如果没有连接成功一直等待,保证Zookeeper能连接上
            while (keeper.getState() != ZooKeeper.States.CONNECTED) {
                System.out.println("connect failed");
                Thread.sleep(1000);
            }
            
            InetAddress address = InetAddress.getLocalHost();
            lockData = address.getHostAddress()+":"+context.getThisTaskId();
            
            //其他线程发现该目录已经存在,就保证了唯一
            if (keeper.exists(zk_path, false) == null) {
                //创建临时目录
                keeper.create(zk_path, lockData.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            }
            
        } catch (Exception e) {
            
            try {
                keeper.close();
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        }
        
        
    }
    @Override
    public void cleanup() {
        
    }

}

 

 

 

ToPo:

package com.storm.user_visit;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import base.SourceSpout;

/**
 * Entry point that wires the PV/UV topology together and submits it.
 *
 * <p>With at least one command-line argument the topology is submitted
 * through {@code StormSubmitter} under the name {@code args[0]}; without
 * arguments it runs in a {@code LocalCluster} as {@code "mytopology"}.
 */
public class UVToPo {

    /**
     * Builds and submits the topology.
     *
     * @param args optional; {@code args[0]} is the topology name for a cluster submission
     * @throws Exception if the submission fails
     */
    public static void main(String [] args) throws Exception{
        TopologyBuilder wiring = assemble();

        // Default configuration, no overrides.
        Config stormConf = new Config();

        if (args.length == 0) {
            // Local mode.
            new LocalCluster().submitTopology("mytopology", stormConf, wiring.createTopology());
            return;
        }

        // Cluster mode.
        StormSubmitter.submitTopology(args[0], stormConf, wiring.createTopology());
    }

    /** Declares the spout and the three bolts together with their groupings. */
    private static TopologyBuilder assemble() {
        TopologyBuilder wiring = new TopologyBuilder();

        wiring.setSpout("spout", new SourceSpout(), 1);

        wiring.setBolt("Fmtbolt", new FmtLogBolt(), 4)
                .shuffleGrouping("spout");

        // Tuples with the same (date, session_id) always reach the same counting task.
        wiring.setBolt("deepbolt", new DeepVisitBolt(), 4)
                .fieldsGrouping("Fmtbolt", new Fields("date","session_id"));

        wiring.setBolt("UvSum", new UVSumBolt(), 1)
                .shuffleGrouping("deepbolt");

        return wiring;
    }
}

 

Storm之网站UV,PV统计

标签:

原文地址:http://www.cnblogs.com/thinkpad/p/5077344.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!