1. Preparation
1.1 Set up a Hadoop cluster. For this post I built a simple three-node Hadoop cluster on virtual machines:
Linux 5.5 (64-bit), Hadoop 2.6.0
192.168.19.201 h1 (master)
192.168.19.202 h2 (slave1)
192.168.19.203 h3 (slave2)
1.2 Prepare a file of website-access IPs
Since this is only an experiment, a simple txt file is enough, e.g. vim a.txt:
10.0.0.1
10.0.0.2
10.0.0.3
10.0.0.2
10.0.0.5
10.0.0.1
10.0.0.5
10.0.0.1
Put the file into HDFS:
hadoop fs -put a.txt /user
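You can confirm the upload with:
hadoop fs -ls /user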
1.3 Set up a Java IDE and import the required Hadoop JARs; this is standard, so I won't go into detail.
2. MapReduce
2.1 First, understand the relationship between IP, PV, and UV.
Simply put, PV (page views) counts every hit: ten visits from the same IP count as 10.
UV (unique visitors) counts distinct visitors: however many times the same IP visits in a day, it is counted only once.
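For the sample a.txt above, PV = 8 (eight lines were logged in total), while UV = 4 (the distinct IPs are 10.0.0.1, 10.0.0.2, 10.0.0.3 and 10.0.0.5).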
2.2 Java code
package com.mapreduce.pvuv;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class IpUv {

    // Job 1, map: emit each IP line as the key so identical IPs group together.
    public static class Map1 extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, new Text("1"));
        }
    }

    // Job 1, reduce: one call per distinct IP, so writing the key once deduplicates the input.
    public static class Reduce1 extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, new Text("1"));
        }
    }

    // Job 2, map: every line of job 1's output is one distinct IP; map them all to a single "uv" key.
    public static class Map2 extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(new Text("uv"), new Text("1"));
        }
    }

    // Job 2, reduce: count the values under "uv"; the total is the number of unique visitors.
    public static class Reduce2 extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            for (Text val : values) {
                sum++;
            }
            context.write(key, new Text(String.valueOf(sum)));
        }
    }

    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.out.println("args not right!");
            return;
        }
        Configuration conf = new Configuration();
        conf.set("mapred.job.tracker", "192.168.19.201:9001"); // JobTracker address of the master node

        String inputDir = args[0];
        Path outputDir = new Path(args[1]);

        // Job 1: deduplicate the raw IP list.
        Job job1 = new Job(conf, "ipuv1");
        job1.setJarByClass(IpUv.class);
        job1.setMapperClass(Map1.class);
        job1.setReducerClass(Reduce1.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job1, inputDir);
        FileOutputFormat.setOutputPath(job1, outputDir);

        // Job 2: count the deduplicated IPs; runs only if job 1 succeeded.
        boolean flag = job1.waitForCompletion(true);
        if (flag) {
            Job job2 = new Job(conf, "ipuv2");
            job2.setJarByClass(IpUv.class);
            job2.setMapperClass(Map2.class);
            job2.setReducerClass(Reduce2.class);
            job2.setOutputKeyClass(Text.class);
            job2.setOutputValueClass(Text.class);
            FileInputFormat.setInputPaths(job2, outputDir);
            FileOutputFormat.setOutputPath(job2, new Path(outputDir + "-2"));
            job2.waitForCompletion(true);
        }
    }
}
There is more than one way to compute PV and UV with Hadoop; the approach above is the simplest to follow: job 1 deduplicates the IP list, and job 2 counts the deduplicated records.
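The program above only computes UV; PV is even simpler, since it is just the total number of input lines. Below is a minimal single-job sketch of a PV counter (the PvCount/PvMap/PvReduce names are my own, not part of the original program), using the same Hadoop 2.6.0 API:

package com.mapreduce.pvuv;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PvCount {

    public static class PvMap extends Mapper<LongWritable, Text, Text, LongWritable> {
        private static final LongWritable ONE = new LongWritable(1);

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Every input line is one page view, so emit a constant key with count 1.
            context.write(new Text("pv"), ONE);
        }
    }

    public static class PvReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            // All counts share the key "pv", so a single reduce call sums them.
            long sum = 0;
            for (LongWritable val : values) {
                sum += val.get();
            }
            context.write(key, new LongWritable(sum));
        }
    }

    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.out.println("usage: PvCount <input> <output>");
            return;
        }
        Configuration conf = new Configuration();
        Job job = new Job(conf, "pv");
        job.setJarByClass(PvCount.class);
        job.setMapperClass(PvMap.class);
        job.setReducerClass(PvReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        FileInputFormat.setInputPaths(job, args[0]);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}

Because PvReduce simply sums LongWritable counts, it could also be registered as a combiner (job.setCombinerClass(PvReduce.class)) to cut shuffle traffic.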
2.3 Running
Package the class as a JAR, copy it to any directory on the master node, and run:
hadoop jar /home/hadoop/ipuv.jar com.mapreduce.pvuv.IpUv /user /output
Check the result:
hadoop fs -cat /output-2/part-r-00000
uv 4
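Job 1's intermediate output (the deduplicated IP list) lands in /output and can be inspected the same way; given the code above it should hold one line per distinct IP:
hadoop fs -cat /output/part-r-00000
10.0.0.1	1
10.0.0.2	1
10.0.0.3	1
10.0.0.5	1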
Original article: http://10425580.blog.51cto.com/10415580/1680356