一.准备工作
1.1 搭建hadoop分布式系统,博主是用3台虚拟机搭建的一个简易hadoop分布式系统。
linux 5.5 64位 hadoop2.6.0
192.168.19.201 h1 (master)
192.168.19.202 h2 (slaver1)
192.168.19.203 h3 (slaver2)
1.2 准备网站访问IP文件
由于是实验,一个简单的txt文件即可
如:vim a.txt
10.0.0.1
10.0.0.2
10.0.0.3
10.0.0.2
10.0.0.5
10.0.0.1
10.0.0.5
10.0.0.1
将数据放到hdfs中
hadoop fs -put a.txt /user
1.3 准备JAVA编译器,导入需要的hadoopJAR包,不赘述
二.mepreduce
2.1 首先需要理解IP,PV,UV的关系
简单来说,PV就是网站点击率,相同IP点击10次,计算数为10;
UV可以理解为访问客户,同样的IP一天内无论登陆多少次,进计算1次。
2.2 java代码
package com.mapreduce.pvuv;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class IpUv {
	
	
	public static class Map1 extends Mapper<LongWritable, Text, Text, Text>{
		public static Text line=new Text();
		
		@Override
		public void map(LongWritable longWritable,Text text,Context context) throws IOException, InterruptedException{     //Mapper中的Map函数实现
			line=text;
			context.write(line, new Text("1"));
			
		}
		
	}
	
	public static class Reduce1 extends Reducer<Text, Text, Text, Text>{
		@Override
		public void reduce(Text text,Iterable<Text> values,Context context) throws IOException, InterruptedException{
			context.write(text,new Text("1"));
		}
	}
		
		
		
	public static class Map2 extends Mapper<LongWritable, Text, Text, Text>{
			public static Text line=new Text();
			
			@Override
			public void map(LongWritable longWritable,Text text,Context context) throws IOException, InterruptedException{     //Mapper中的Map函数实现
				line=text;
				context.write(new Text("uv"), new Text("1"));
				
			}
			
		}
	
	public static class Reduce2 extends Reducer<Text, Text, Text, Text>{
		@Override
		public void reduce(Text text,Iterable<Text> values,Context context) throws IOException, InterruptedException{
			long sum=0;
			for(Text val:values){
				sum++;
			}
			context.write(text,new Text(String.valueOf(sum)));
			
		}
	}
	@SuppressWarnings("deprecation")
	
	
	public static void main(String[] args) throws Exception {
		// TODO Auto-generated method stub
		if(args.length < 2){
            System.out.println("args not right!");
            return ;
        }
		Configuration conf=new Configuration();
		conf.set("mapred.job.tracker", "192.168.19.201:9001");
		String inputDir = args[0];
        Path outputDir =new Path(args[1]);
        Job job1 = new Job(conf, "ipuv1");
        job1.setJarByClass(IpUv.class);
        job1.setMapperClass(Map1.class);
    	job1.setReducerClass(Reduce1.class);
    	job1.setOutputKeyClass(Text.class);
    	job1.setOutputValueClass(Text.class);
    	FileInputFormat.setInputPaths(job1,inputDir);
    	FileOutputFormat.setOutputPath(job1, outputDir);
    	boolean flag = job1.waitForCompletion(true);
    	if(flag){
    		Job job2 = new Job(conf, "ipuv2");
    		job2.setJarByClass(IpUv.class);
            job2.setMapperClass(Map2.class);
        	job2.setReducerClass(Reduce2.class);
        	job2.setOutputKeyClass(Text.class);
        	job2.setOutputValueClass(Text.class);
        	FileInputFormat.setInputPaths(job2,outputDir);
        	FileOutputFormat.setOutputPath(job2,new Path(outputDir + "-2"));
        	job2.waitForCompletion(true);
        	
    	}
        
		
		
	}
}hadoop计算pv、uv的方法不止一种,这里博主用的是最简单易懂的方法
2.3 运行
打包成jar包,放入主机Linux任意目录下
hadoopjar /home/hadoop/ipuv.jar com.mapreduce.pvuv.IpUv /user /output
查看运行效果
hadoop dfs -cat /output-2/part-r-00000
uv 4
hadoop分布式系统下的mapreduce java小程序计算网站uv
原文地址:http://10425580.blog.51cto.com/10415580/1680356