
Storm: Website PV Counting, Using a ZooKeeper Lock to Control Which Thread Reports

Posted: 2015-12-25 19:21:22


PV (page views): count(session_id), i.e. the number of access-log records that carry a session_id.
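As a concrete reading of count(session_id), here is a minimal sketch that counts the log lines whose session_id field is present. It assumes the tab-separated host / session_id / time format that SourceSpout generates further down; the class and method names (PvCount, countPv) are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

public class PvCount {

    // Count log lines whose second tab-separated field (session_id) is present and non-empty.
    // Assumed log format (from SourceSpout): host \t session_id \t time
    public static long countPv(List<String> logs) {
        long pv = 0;
        for (String line : logs) {
            if (line == null) {
                continue;                     // skip missing records
            }
            String[] parts = line.split("\t");
            if (parts.length > 1 && !parts[1].isEmpty()) {
                pv++;                         // each record with a session_id is one page view
            }
        }
        return pv;
    }

    public static void main(String[] args) {
        List<String> logs = Arrays.asList(
                "www.taobao.com\tABYH6Y4V4SCVXTG6DPB4VH9U123\t2014-01-07 08:40:50",
                "www.taobao.com\tABYH6Y4V4SCVXTG6DPB4VH9U123\t2014-01-07 08:40:51",
                "www.taobao.com\tCYYH6Y2345GHI899OFG4V9U567\t2014-01-07 09:40:49",
                null);
        System.out.println("pv = " + countPv(logs)); // prints "pv = 3"
    }
}
```

Note that repeated visits by the same session are counted every time; that is what distinguishes PV from a distinct-session count.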

 

When multiple threads are involved, pay attention to thread safety.

I. PV Counting

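Analysis

Does the following work?

1. Define a static long pv and guard the increment with synchronized

synchronized and Lock work within a single JVM but not across multiple JVMs. A Storm topology's bolt tasks usually run in several worker processes, each a separate JVM, so this approach does not work.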
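Within one JVM the synchronized counter is indeed correct, which is exactly why it is tempting. A minimal sketch (the class name StaticPvCounter is hypothetical): several threads share one static counter and the total comes out exact. The limitation is in the comment on pv: every worker JVM loads its own copy of the class, so each has its own counter and its own class-level monitor, and neither is shared across the cluster.

```java
import java.util.ArrayList;
import java.util.List;

public class StaticPvCounter {

    // One copy per JVM (per class loader); a Storm worker in another JVM has its own copy
    private static long pv = 0;

    // synchronized on StaticPvCounter.class: mutual exclusion only among threads of this JVM
    public static synchronized void increment() {
        pv++;
    }

    public static synchronized long get() {
        return pv;
    }

    public static synchronized void reset() {
        pv = 0;
    }

    // Start `threads` threads that each call increment() `perThread` times; return the total
    public static long run(int threads, int perThread) {
        reset();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread w = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    increment();
                }
            });
            workers.add(w);
            w.start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return get();
    }

    public static void main(String[] args) {
        // 4 threads x 10000 increments in one JVM: no lost updates
        System.out.println(run(4, 10000)); // prints 40000
    }
}
```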

 

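Two approaches that do work:

1. Under shuffleGrouping: pv * number of Executors

2. bolt1 computes partial sums in parallel; bolt2 computes the global sum in a single thread

Thread safety here means: the result of multi-threaded processing is the same as the result of single-threaded processing.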

 

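Aggregation approaches:

1. Under shuffleGrouping: pv (the single-task result) * number of Executors

By default each Executor runs one task. If the number of tasks is set greater than 1, the formula should be:

pv (the single-task result) * number of tasks

Tasks in the same Executor share the same thread ID but have different taskIds.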
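The formula relies on shuffleGrouping spreading tuples evenly, so each task sees roughly total / tasks tuples. The minimal sketch below models shuffle as plain round-robin, which is an assumption (the real grouper is not a strict rotation); the names ShuffleEstimate, distribute and estimate are hypothetical. It shows both that the estimate is exact when the total divides evenly and where the small error comes from when it does not.

```java
public class ShuffleEstimate {

    // Model shuffleGrouping as round-robin over `tasks` tasks (a simplification)
    // and return how many of `total` tuples each task receives.
    public static long[] distribute(long total, int tasks) {
        long[] perTask = new long[tasks];
        for (long i = 0; i < total; i++) {
            perTask[(int) (i % tasks)]++;
        }
        return perTask;
    }

    // Formula from the text: global PV ~= pv seen by one task * number of tasks
    public static long estimate(long localPv, int tasks) {
        return localPv * tasks;
    }

    public static void main(String[] args) {
        int tasks = 4;
        long[] perTask = distribute(10, tasks);        // {3, 3, 2, 2}
        for (int t = 0; t < tasks; t++) {
            System.out.println("task " + t + ": local=" + perTask[t]
                    + " estimate=" + estimate(perTask[t], tasks));
        }
        // tasks 0 and 1 estimate 12, tasks 2 and 3 estimate 8; the true PV is 10
    }
}
```

With millions of page views the per-task imbalance is a tiny fraction of the total, which is why the error is acceptable in most scenarios.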

 

Pros: simple, little computation

Cons: slightly inaccurate, but acceptable in the vast majority of scenarios

 

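Optimization:

In the PVBolt example every task prints its own total, although only one task actually needs to.

A ZooKeeper lock ensures that only one task outputs the total, and only once every 5 seconds.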
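The "one reporter, every 5 s" logic can be tried without a ZooKeeper server. In this minimal sketch a ConcurrentHashMap is a hypothetical in-memory stand-in for ZooKeeper: putIfAbsent plays the role of create(path, data, ..., CreateMode.EPHEMERAL), which succeeds for exactly one caller, but the stand-in has no sessions, so nothing is released when a task dies. SingleReporter, FAKE_ZK and shouldReport are illustrative names; the clock is passed in so the 5-second window is testable.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class SingleReporter {

    // Hypothetical stand-in for ZooKeeper: path -> node data (no sessions, no expiry)
    public static final ConcurrentMap<String, String> FAKE_ZK = new ConcurrentHashMap<>();
    public static final String LOCK_PATH = "/lock/storm/pv";
    public static final long INTERVAL_MS = 5_000;

    private final String lockData;   // unique per task, e.g. "ip:taskId"
    private long beginTime;

    public SingleReporter(String lockData, long now) {
        this.lockData = lockData;
        this.beginTime = now;
        FAKE_ZK.putIfAbsent(LOCK_PATH, lockData);     // only the first task "creates the node"
    }

    // Called per tuple; true if this task should print the total now
    public boolean shouldReport(long now) {
        if (now - beginTime < INTERVAL_MS) {
            return false;                              // less than 5 s since the last window
        }
        beginTime = now;                               // start the next 5 s window
        return lockData.equals(FAKE_ZK.get(LOCK_PATH)); // only the lock holder reports
    }

    public static void main(String[] args) {
        FAKE_ZK.clear();
        SingleReporter[] tasks = new SingleReporter[4];
        for (int t = 0; t < tasks.length; t++) {
            tasks[t] = new SingleReporter("127.0.0.1:" + t, 0L);
        }
        int reporters = 0;
        for (SingleReporter r : tasks) {
            if (r.shouldReport(5_000L)) {
                reporters++;
            }
        }
        System.out.println("tasks reporting at t=5s: " + reporters); // prints 1
    }
}
```

In the real bolt the node is ephemeral, so if the holder's session ends the node disappears; with the original code, though, no other task re-creates it, because the create attempt only happens in prepare().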

 

 

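2. bolt1 computes partial sums in parallel; bolt2 computes the global sum in a single thread

Pros: (1) exact; (2) with fieldsGrouping the intermediate values are meaningful too, e.g. the PV of a single user (visit depth, itself a useful metric)

Cons: somewhat more computation, plus one extra bolt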
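A minimal single-process sketch of approach 2, assuming bolt1 emits (session_id, running count) after every tuple and bolt2 keeps only the latest value per session. Plain lists stand in for Storm streams, and hashing session_id onto a task index stands in for fieldsGrouping; TwoStagePv, stage1 and stage2 are hypothetical names. Because every record of a session goes to the same task, each session's count lives in exactly one place, so bolt2 can overwrite instead of add (no double counting), and its per-session map is the visit-depth metric.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TwoStagePv {

    // Stage 1: stand-in for bolt1 (parallel, fieldsGrouping on session_id).
    // Each record bumps its session's local count and "emits" (sessionId, runningCount).
    public static List<Map.Entry<String, Long>> stage1(List<String> sessionIds, int tasks) {
        List<Map<String, Long>> local = new ArrayList<>();
        for (int t = 0; t < tasks; t++) {
            local.add(new HashMap<>());
        }
        List<Map.Entry<String, Long>> emitted = new ArrayList<>();
        for (String sid : sessionIds) {
            int task = Math.floorMod(sid.hashCode(), tasks);   // same session -> same task
            long count = local.get(task).merge(sid, 1L, Long::sum);
            emitted.add(new AbstractMap.SimpleImmutableEntry<>(sid, count));
        }
        return emitted;
    }

    // Stage 2: stand-in for single-threaded bolt2. Overwrite with the latest running
    // count per session, then sum; perSession ends up holding each session's PV.
    public static long stage2(List<Map.Entry<String, Long>> emitted, Map<String, Long> perSession) {
        for (Map.Entry<String, Long> e : emitted) {
            perSession.put(e.getKey(), e.getValue());
        }
        long total = 0;
        for (long c : perSession.values()) {
            total += c;
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> sessions = Arrays.asList("A", "B", "A", "C", "A", "B");
        Map<String, Long> perSession = new HashMap<>();
        long pv = stage2(stage1(sessions, 4), perSession);
        System.out.println("pv = " + pv);                  // prints "pv = 6"
        System.out.println("visit depth = " + perSession); // per-session PV, A has 3
    }
}
```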

Spout:

package base;

import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class SourceSpout implements IRichSpout{

    /**
     * Data-source spout
     */
    private static final long serialVersionUID = 1L;

    Queue<String> queue = new ConcurrentLinkedQueue<String>();
    
    SpoutOutputCollector collector = null;
    
    
    String str = null;

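    @Override
    public void nextTuple() {
        // queue.size() >= 0 is always true, and poll() returns null once the queue is
        // drained; emit only real log lines
        String log = queue.poll();
        if (log != null) {
            collector.emit(new Values(log));
        }
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }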
    
    @Override
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        try {
            this.collector = collector;
            
            Random random = new Random();
            String[] hosts = { "www.taobao.com" };
            String[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34", "BBYH61456FGHHJ7JL89RG5VV9UYU7",
                    "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" };
            String[] time = { "2014-01-07 08:40:50", "2014-01-07 08:40:51", "2014-01-07 08:40:52", "2014-01-07 08:40:53", 
                    "2014-01-07 09:40:49", "2014-01-07 10:40:49", "2014-01-07 11:40:49", "2014-01-07 12:40:49" };
            
            for (int i = 0; i < 100; i++) {
                queue.add(hosts[0]+"\t"+session_id[random.nextInt(5)]+"\t"+time[random.nextInt(8)]);
            }
            
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    @Override
    public void close() {
        // TODO Auto-generated method stub
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub
        declarer.declare(new Fields("log"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }
    @Override
    public void ack(Object msgId) {
        // TODO Auto-generated method stub
        System.out.println("spout ack:"+msgId.toString());
    }

    @Override
    public void activate() {
        // TODO Auto-generated method stub
        
    }



    @Override
    public void deactivate() {
        // TODO Auto-generated method stub
        
    }

    @Override
    public void fail(Object msgId) {
        // TODO Auto-generated method stub
        System.out.println("spout fail:"+msgId.toString());
    }

}

 

 

Bolt:

package com.storm.visits;

import java.net.InetAddress;
import java.util.Map;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

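/**
 * Under shuffleGrouping: total pv = pv (single-task result) * number of Executors.
 * By default an Executor runs one task; if the number of tasks is set greater than 1,
 * the formula becomes: pv (single-task result) * number of tasks.
 *
 * One Executor can run several tasks.
 * Tasks in the same Executor share the same thread ID but have different taskIds.
 *
 * A ZooKeeper lock ensures that only one task outputs the total, once every 5 s.
 */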
public class PVBolt  implements IRichBolt{

    private static final long serialVersionUID = 1L;
    private OutputCollector collector;
    
    public static final String zk_path = "/lock/storm/pv";
    ZooKeeper keeper  = null;
    String lockData = null;

    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
        
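        try {
            keeper = new ZooKeeper("hadoop:2181", 3000, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.err.println("event : " + event.getType());
                }
            });

            // Block until the ZooKeeper session is connected, so the lock calls below can succeed
            while (keeper.getState() != ZooKeeper.States.CONNECTED) {
                Thread.sleep(1000);
            }

            // Lock value that identifies this task: host IP + task ID
            InetAddress address = InetAddress.getLocalHost();
            lockData = address.getHostAddress() + ":" + context.getThisTaskId();

            // Create the ephemeral node directly instead of exists() followed by create():
            // only the first task succeeds, the others get NodeExistsException, so the
            // holder is unique. The node disappears when the holder's session ends.
            // The parent path /lock/storm must already exist; create() does not create parents.
            try {
                keeper.create(zk_path, lockData.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            } catch (org.apache.zookeeper.KeeperException.NodeExistsException ignored) {
                // another task already holds the lock
            }

        } catch (Exception e) {
            e.printStackTrace();
            try {
                // keeper is still null if the ZooKeeper constructor itself failed
                if (keeper != null) {
                    keeper.close();
                }
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        }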
    }
    
    
    
    String logString =null;
    String session_id = null;
    long pv =0;
    
    long beginTime = System.currentTimeMillis();
    long endTime  = 0;
    @Override
    public void execute(Tuple input) {
        
        try {
            logString  = input.getString(0);
            endTime = System.currentTimeMillis();

            if(logString != null){
                session_id = logString.split("\t")[1];
                 if(session_id != null){
                     pv++;
                 }
            }
            
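             // ZooKeeper lock: only one task prints the total, and only once every 5 s
             if (endTime - beginTime >= 5*1000 ) {

                 // getData() returns byte[]; convert it to a String before comparing,
                 // otherwise equals() is always false. Only the lock holder's lockData matches.
                 if (lockData.equals(new String(keeper.getData(zk_path, false, null)))) {

                     // shuffleGrouping: total pv ~= pv of this task * number of tasks
                     // (4 = PVBolt parallelism set in PvToPo; keep the two in sync)
                     System.err.println(Thread.currentThread().getName()+ " pv = "+pv*4);
                 }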
                 beginTime = System.currentTimeMillis();
             }
             
             collector.ack(input);
            
        } catch (Exception e) {
            collector.fail(input);
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(""));
    }
    
    @Override
    public void cleanup() {}

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

 

 

Topology:

package com.storm.visits;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import base.SourceSpout;
/**
 * 
 * TopologyBuilder
 *
 */
public class PvToPo {
		
	public static void main(String [] args) throws Exception{
		
		TopologyBuilder builder =new TopologyBuilder();
		
		// Tuples emitted from the message queue are not duplicated
		builder.setSpout("spout", new SourceSpout(),1);
		// Parallelism 4: PVBolt multiplies its local pv by 4, so keep these in sync
		builder.setBolt("bolt", new PVBolt(),4).shuffleGrouping("spout");
		
		// Configuration
		Map conf = new HashMap();
		
		if (args.length > 0) {
			// Submit to the cluster
			StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
		}else{
			// Submit in local mode
			LocalCluster localCluster = new LocalCluster();
			localCluster.submitTopology("mytopology", conf, builder.createTopology());
		}
		}
		
		
	}
}

  

Kill the job:

storm kill PvTopo

 

Submit the topology (main class com.storm.visits.PvToPo, topology name PvTopo):

storm jar ./starter.jar com.storm.visits.PvToPo PvTopo

 


Original article: http://www.cnblogs.com/thinkpad/p/5076658.html
