This post uses a mobile data traffic scenario to walk through Hadoop's secondary sort and partitioning mechanisms. First, the business scenario: for each user, compute the sum of upstream traffic, the sum of downstream traffic, and the overall total.
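Before writing the MapReduce version, here is a minimal plain-Java sketch of the aggregation itself. The input lines are made up for illustration (the real flowdata.txt, described below, has many more columns); the sketch only shows the per-phone-number summation that we are about to distribute with MapReduce.
import java.util.LinkedHashMap;
import java.util.Map;
public class FlowSumSketch
{
public static void main(String[] args)
{
//Hypothetical input lines: phone number, upstream traffic, downstream traffic
String[] lines = {"13800000000\t100\t200", "13800000000\t50\t300", "13900000000\t10\t20"};
//Accumulate the upstream and downstream traffic per phone number
Map<String, long[]> sums = new LinkedHashMap<String, long[]>();
for (String line : lines)
{
String[] fields = line.split("\t");
long[] s = sums.get(fields[0]);
if (s == null)
{
s = new long[2];
sums.put(fields[0], s);
}
s[0] += Long.parseLong(fields[1]); //upstream
s[1] += Long.parseLong(fields[2]); //downstream
}
//Print phone number, upstream sum, downstream sum, total
for (Map.Entry<String, long[]> e : sums.entrySet())
{
long up = e.getValue()[0];
long down = e.getValue()[1];
System.out.println(e.getKey() + "\t" + up + "\t" + down + "\t" + (up + down));
}
}
}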
Data used in this walkthrough:
Log format description:
Sample records from the log file flowdata.txt:
First, we implement this business logic with a MapReduce program.
Implementation:
package FlowSum;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
//This program computes, for each user, the sum of upstream traffic, the sum of downstream traffic, and the overall total.
//The key technique is a custom Hadoop data type (the Writable serialization/deserialization mechanism).
public class MsisdnFlowSum
{
public static String path1 = "file:///C:\\flowdata.txt";
public static String path2 = "file:///C:\\flowdir\\";
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
FileSystem fileSystem = FileSystem.get(conf);
if(fileSystem.exists(new Path(path2)))
{
fileSystem.delete(new Path(path2), true);
}
Job job = Job.getInstance(conf);
job.setJarByClass(MsisdnFlowSum.class);
FileInputFormat.setInputPaths(job, new Path(path1));
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowType.class);
job.setNumReduceTasks(1);
job.setPartitionerClass(HashPartitioner.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowType.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(path2));
job.waitForCompletion(true);
//Print the job output:
FSDataInputStream fr = fileSystem.open(new Path("file:///C:\\flowdir\\part-r-00000"));
IOUtils.copyBytes(fr, System.out, 1024, true);
}
public static class MyMapper extends Mapper<LongWritable, Text, Text, FlowType>
{
protected void map(LongWritable k1, Text v1,Context context)throws IOException, InterruptedException
{
//Take one line of the log
String line = v1.toString();
//Split the line into fields
String[] splited = line.split("\t");
//Extract the fields we need: phone number, upstream traffic, downstream traffic
String msisdn = splited[1];
long upPayLoad = Long.parseLong(splited[8]);
long downPayLoad = Long.parseLong(splited[9]);
//Emit <phone number, FlowType(up, down)>
context.write(new Text(msisdn), new FlowType(upPayLoad,downPayLoad));
}
}
public static class MyReducer extends Reducer<Text, FlowType, Text, FlowType>
{
protected void reduce(Text k2, Iterable<FlowType> v2s,Context context)throws IOException, InterruptedException
{
long payLoadSum = 0L; //sum of this user's upstream traffic
long downLoadSum = 0L; //sum of this user's downstream traffic
//The reducer receives <phone number, {FlowType1, FlowType2, FlowType3, ...}>
for (FlowType v2 : v2s)
{
payLoadSum += v2.upPayLoad;
downLoadSum += v2.downPayLoad;
}
context.write(k2, new FlowType(payLoadSum,downLoadSum)); //FlowType.toString() determines how this record is written by TextOutputFormat
}
}
}
class FlowType implements Writable
{
public long upPayLoad;//upstream traffic
public long downPayLoad;//downstream traffic
public long loadSum; //total traffic
public FlowType(){}
public FlowType(long upPayLoad,long downPayLoad)
{
this.upPayLoad = upPayLoad;
this.downPayLoad = downPayLoad;
this.loadSum = upPayLoad + downPayLoad;//the constructor computes the total automatically whenever a FlowType is built
}
//Whenever data travels across the network (the shuffle), it must be serialized and deserialized
//Serialization: write the object's fields to the byte output stream
public void write(DataOutput fw) throws IOException
{
fw.writeLong(upPayLoad);
fw.writeLong(downPayLoad);
}
//Deserialization: read the object back from the byte input stream; the fields must be read in the same order they were written
public void readFields(DataInput fr) throws IOException
{
this.upPayLoad = fr.readLong();//deserialize the upstream traffic
this.downPayLoad = fr.readLong(); //deserialize the downstream traffic (loadSum is not shipped; the reducer rebuilds it via the constructor)
}
public String toString()
{
return "" + this.upPayLoad+"\t"+this.downPayLoad+"\t"+this.loadSum;
}
}
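To make the Writable serialization contract concrete, here is a small round-trip sketch. It is not part of the job above; it assumes it sits in the same FlowSum package as FlowType. It writes a FlowType to a byte stream with DataOutputStream, just as the shuffle would, and reads it back with DataInputStream in the same field order.
package FlowSum;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
public class FlowTypeRoundTrip
{
public static void main(String[] args) throws IOException
{
FlowType original = new FlowType(120L, 360L);
//Serialize: write the fields to a byte stream
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
original.write(new DataOutputStream(bytes));
//Deserialize: read the fields back in exactly the order they were written
FlowType copy = new FlowType();
copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
//loadSum is not serialized, so it stays 0 here; the reducer recomputes it by constructing a new FlowType
System.out.println(copy.upPayLoad + "\t" + copy.downPayLoad + "\t" + copy.loadSum);
}
}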
Job output:
13480253104 180 180 360
13502468823 7335 110349 117684
13560439658 2034 5892 7926
13600217502 1080 186852 187932
13602846565 1938 2910 4848
13660577991 6960 690 7650
13719199419 240 0 240
13726230503 2481 24681 27162
13760778710 120 120 240
13823070001 360 180 540
13826544101 264 0 264
13922314466 3008 3720 6728
13925057413 11058 48243 59301
13926251106 240 0 240
13926435656 132 1512 1644
15013685858 3659 3538 7197
15920133257 3156 2936 6092
15989002119 1938 180 2118
18211575961 1527 2106 3633
18320173382 9531 2412 11943
84138413 4116 1432 5548
Next, we apply a secondary sort to the result above; in practice, a secondary sort like this is always run on an aggregated result such as the one we just produced.
The sorting criteria:
1. First compare the user's total traffic, ascending.
2. If the totals are equal, compare downstream traffic, descending.
3. If downstream traffic is also equal, compare upstream traffic, ascending.
The implementation:
package FlowSum;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
//This program applies a secondary sort to the traffic totals
/*
Sorting criteria:
1. First compare the user's total traffic, ascending
2. If the totals are equal, compare downstream traffic, descending
3. If downstream traffic is also equal, compare upstream traffic, ascending
*/
public class FlowSort
{
public static String path1 = "file:///C:\\result1.txt";
public static String path2 = "file:///C:\\sortdir";
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
FileSystem fileSystem = FileSystem.get(conf);
if(fileSystem.exists(new Path(path2)))
{
fileSystem.delete(new Path(path2), true);
}
Job job = Job.getInstance(conf);
job.setJarByClass(FlowSort.class);
FileInputFormat.setInputPaths(job, new Path(path1));
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(SortType.class);
job.setMapOutputValueClass(NullWritable.class);
job.setNumReduceTasks(1);
job.setPartitionerClass(HashPartitioner.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(SortType.class);
job.setOutputValueClass(NullWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(path2));
job.waitForCompletion(true);
//Print the sorted result:
FSDataInputStream fr = fileSystem.open(new Path("file:///C:\\sortdir\\part-r-00000"));
IOUtils.copyBytes(fr, System.out, 1024, true);
}
public static class MyMapper extends Mapper<LongWritable, Text, SortType, NullWritable>
{
protected void map(LongWritable k1, Text v1,Context context)throws IOException, InterruptedException
{
//Take one line of the previous job's output
String line = v1.toString();
//Split the line into fields
String[] splited = line.split("\t");
//Extract the fields we need: phone number, upstream traffic, downstream traffic, total traffic
String msisdn = splited[0];
long upPayLoad = Long.parseLong(splited[1]);
long downPayLoad = Long.parseLong(splited[2]);
long loadSum = Long.parseLong(splited[3]);
context.write(new SortType(msisdn,upPayLoad,downPayLoad,loadSum), NullWritable.get());
}
}
public static class MyReducer extends Reducer<SortType, NullWritable, SortType, NullWritable>
{
protected void reduce(SortType k2, Iterable<NullWritable> v2s,Context context)throws IOException, InterruptedException
{
context.write(k2, NullWritable.get());
}
}
}
@SuppressWarnings("rawtypes")
class SortType implements WritableComparable
{
public String msisdn; //the user's phone number
public long upPayLoad;//upstream traffic
public long downPayLoad;//downstream traffic
public long loadSum; //total traffic
public SortType(){}
public SortType(String msisdn, long upPayLoad, long downPayLoad,long loadSum)
{
this.msisdn = msisdn;
this.upPayLoad = upPayLoad;
this.downPayLoad = downPayLoad;
this.loadSum = loadSum;
}
public void write(DataOutput fw) throws IOException
{
fw.writeUTF(msisdn);
fw.writeLong(upPayLoad);
fw.writeLong(downPayLoad);
fw.writeLong(loadSum);
}
public void readFields(DataInput fr) throws IOException
{
this.msisdn = fr.readUTF();
this.upPayLoad = fr.readLong();
this.downPayLoad = fr.readLong();
this.loadSum = fr.readLong();
}
public int compareTo(Object obj) //override compareTo to define the sort order
{
SortType cc = (SortType)obj;
if(this.loadSum != cc.loadSum)
return Long.compare(this.loadSum, cc.loadSum);//loadSum ascending
else if(this.downPayLoad != cc.downPayLoad)
return Long.compare(cc.downPayLoad, this.downPayLoad);//downPayLoad descending
else
return Long.compare(this.upPayLoad, cc.upPayLoad);//upPayLoad ascending
}
public String toString()
{
return "" + this.msisdn+"\t"+this.upPayLoad+"\t"+this.downPayLoad+"\t"+this.loadSum;
}
}
Run log and results:
Map-Reduce Framework
Map input records=21
Map output records=21
Map output bytes=774
Map output materialized bytes=822
Input split bytes=85
Combine input records=0
Combine output records=0
Reduce input groups=20
Reduce shuffle bytes=822
Reduce input records=21
Reduce output records=20
Spilled Records=42
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=0
CPU time spent (ms)=0
Physical memory (bytes) snapshot=0
Virtual memory (bytes) snapshot=0
Total committed heap usage (bytes)=535822336
13760778710 120 120 240
13926251106 240 0 240
13826544101 264 0 264
13480253104 180 180 360
13823070001 360 180 540
13926435656 132 1512 1644
15989002119 1938 180 2118
18211575961 1527 2106 3633
13602846565 1938 2910 4848
84138413 4116 1432 5548
15920133257 3156 2936 6092
13922314466 3008 3720 6728
15013685858 3659 3538 7197
13660577991 6960 690 7650
13560439658 2034 5892 7926
18320173382 9531 2412 11943
13726230503 2481 24681 27162
13925057413 11058 48243 59301
13502468823 7335 110349 117684
13600217502 1080 186852 187932
From the run log we can see that MapReduce read 21 records but wrote only 20; one record is missing. The counters show why: Reduce input records=21 but Reduce input groups=20, meaning two records compare as equal under SortType.compareTo (they have identical total, downstream, and upstream traffic), so they land in the same reduce group, and the reducer above writes each key only once. The fix is to write the key once per value. Change:
protected void reduce(SortType k2, Iterable<NullWritable> v2s,Context context)throws IOException, InterruptedException
{
context.write(k2, NullWritable.get());
}
to:
protected void reduce(SortType k2, Iterable<NullWritable> v2s,Context context)throws IOException, InterruptedException
{
for (NullWritable v2 : v2s)
{
context.write(k2, v2);
}
}
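Looping over the values, as above, is the fix used in this post. An alternative worth noting (a sketch, not part of the original code) is to make compareTo itself never return 0 for two different users by adding the phone number as a final tiebreaker, so every record keeps its own reduce group:
public int compareTo(Object obj)
{
SortType cc = (SortType)obj;
if(this.loadSum != cc.loadSum)
return Long.compare(this.loadSum, cc.loadSum);//loadSum ascending
if(this.downPayLoad != cc.downPayLoad)
return Long.compare(cc.downPayLoad, this.downPayLoad);//downPayLoad descending
if(this.upPayLoad != cc.upPayLoad)
return Long.compare(this.upPayLoad, cc.upPayLoad);//upPayLoad ascending
return this.msisdn.compareTo(cc.msisdn);//final tiebreak on the phone number, so distinct users never merge
}
Either approach works for this job; the loop over values is simpler and is what the rest of the post uses.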
After the change, rerun the program; the log and results are now:
Map-Reduce Framework
Map input records=21
Map output records=21
Map output bytes=774
Map output materialized bytes=822
Input split bytes=85
Combine input records=0
Combine output records=0
Reduce input groups=20
Reduce shuffle bytes=822
Reduce input records=21
Reduce output records=21
Spilled Records=42
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=11
CPU time spent (ms)=0
Physical memory (bytes) snapshot=0
Virtual memory (bytes) snapshot=0
Total committed heap usage (bytes)=467664896
13760778710 120 120 240
13926251106 240 0 240
13719199419 240 0 240
13826544101 264 0 264
13480253104 180 180 360
13823070001 360 180 540
13926435656 132 1512 1644
15989002119 1938 180 2118
18211575961 1527 2106 3633
13602846565 1938 2910 4848
84138413 4116 1432 5548
15920133257 3156 2936 6092
13922314466 3008 3720 6728
15013685858 3659 3538 7197
13660577991 6960 690 7650
13560439658 2034 5892 7926
18320173382 9531 2412 11943
13726230503 2481 24681 27162
13925057413 11058 48243 59301
13502468823 7335 110349 117684
13600217502 1080 186852 187932
The results show that the secondary sort now meets our expectations.
Next, partitioning: route the traffic totals for mobile phone numbers to one output file and the totals for non-mobile numbers (e.g. landline or short numbers) to another.
Implementation:
package FlowSum;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
//This program applies a secondary sort and partitioning to the traffic totals
/*
Sorting criteria:
1. First compare the user's total traffic, ascending
2. If the totals are equal, compare downstream traffic, descending
3. If downstream traffic is also equal, compare upstream traffic, ascending
*/
public class FlowSort
{
public static String path1 = "file:///C:\\result1.txt";
public static String path2 = "file:///C:\\sortdir";
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
FileSystem fileSystem = FileSystem.get(conf);
if(fileSystem.exists(new Path(path2)))
{
fileSystem.delete(new Path(path2), true);
}
Job job = Job.getInstance(conf);
job.setJarByClass(FlowSort.class);
FileInputFormat.setInputPaths(job, new Path(path1));
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(SortType.class);
job.setMapOutputValueClass(NullWritable.class);
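//Two reduce tasks: MyPartitioner below returns partition 0 or 1, and each partition gets its own reducer and output file (part-r-00000 / part-r-00001)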
job.setNumReduceTasks(2);
job.setPartitionerClass(MyPartitioner.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(SortType.class);
job.setOutputValueClass(NullWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(path2));
job.waitForCompletion(true);
}
public static class MyMapper extends Mapper<LongWritable, Text, SortType, NullWritable>
{
protected void map(LongWritable k1, Text v1,Context context)throws IOException, InterruptedException
{
//Take one line of the previous job's output
String line = v1.toString();
//Split the line into fields
String[] splited = line.split("\t");
//Extract the fields we need: phone number, upstream traffic, downstream traffic, total traffic
String msisdn = splited[0];
long upPayLoad = Long.parseLong(splited[1]);
long downPayLoad = Long.parseLong(splited[2]);
long loadSum = Long.parseLong(splited[3]);
context.write(new SortType(msisdn,upPayLoad,downPayLoad,loadSum), NullWritable.get());
}
}
public static class MyPartitioner extends Partitioner<SortType, NullWritable>
{
public int getPartition(SortType k2, NullWritable v2, int num)
{
String tele = k2.msisdn;//the phone number from the key
if(tele.length()==11)
return 0; //11-digit mobile numbers go to partition 0
else
return 1;//everything else goes to partition 1
}
}
public static class MyReducer extends Reducer<SortType, NullWritable, SortType, NullWritable>
{
protected void reduce(SortType k2, Iterable<NullWritable> v2s,Context context)throws IOException, InterruptedException
{
for (NullWritable v2 : v2s)
{
context.write(k2, v2);
}
}
}
}
@SuppressWarnings("rawtypes")
class SortType implements WritableComparable
{
public String msisdn; //the user's phone number
public long upPayLoad;//upstream traffic
public long downPayLoad;//downstream traffic
public long loadSum; //total traffic
public SortType(){}
public SortType(String msisdn, long upPayLoad, long downPayLoad,long loadSum)
{
this.msisdn = msisdn;
this.upPayLoad = upPayLoad;
this.downPayLoad = downPayLoad;
this.loadSum = loadSum;
}
public void write(DataOutput fw) throws IOException
{
fw.writeUTF(msisdn);
fw.writeLong(upPayLoad);
fw.writeLong(downPayLoad);
fw.writeLong(loadSum);
}
public void readFields(DataInput fr) throws IOException
{
this.msisdn = fr.readUTF();
this.upPayLoad = fr.readLong();
this.downPayLoad = fr.readLong();
this.loadSum = fr.readLong();
}
public int compareTo(Object obj) //override compareTo to define the sort order
{
SortType cc = (SortType)obj;
if(this.loadSum != cc.loadSum)
return Long.compare(this.loadSum, cc.loadSum);//loadSum ascending
else if(this.downPayLoad != cc.downPayLoad)
return Long.compare(cc.downPayLoad, this.downPayLoad);//downPayLoad descending
else
return Long.compare(this.upPayLoad, cc.upPayLoad);//upPayLoad ascending
}
public String toString()
{
return "" + this.msisdn+"\t"+this.upPayLoad+"\t"+this.downPayLoad+"\t"+this.loadSum;
}
}
Check the output:
Contents of part-r-00000 (partition 0, the mobile numbers):
13760778710 120 120 240
13926251106 240 0 240
13719199419 240 0 240
13826544101 264 0 264
13480253104 180 180 360
13823070001 360 180 540
13926435656 132 1512 1644
15989002119 1938 180 2118
18211575961 1527 2106 3633
13602846565 1938 2910 4848
15920133257 3156 2936 6092
13922314466 3008 3720 6728
15013685858 3659 3538 7197
13660577991 6960 690 7650
13560439658 2034 5892 7926
18320173382 9531 2412 11943
13726230503 2481 24681 27162
13925057413 11058 48243 59301
13502468823 7335 110349 117684
13600217502 1080 186852 187932
Contents of part-r-00001 (partition 1, the non-mobile numbers):
84138413 4116 1432 5548
The results above come from debugging in Hadoop's local mode; next we run the same job on a Hadoop cluster.
The modified part of the code (the input and output paths are now taken from the command-line arguments):
public static String path1 = "";
public static String path2 = "";
public static void main(String[] args) throws Exception
{
path1 = args[0];
path2 = args[1];
Configuration conf = new Configuration();
FileSystem fileSystem = FileSystem.get(conf);
if(fileSystem.exists(new Path(path2)))
{
fileSystem.delete(new Path(path2), true);
}
Job job = Job.getInstance(conf);
job.setJarByClass(FlowSort.class);
FileInputFormat.setInputPaths(job, new Path(path1));
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(SortType.class);
job.setMapOutputValueClass(NullWritable.class);
job.setNumReduceTasks(2);
job.setPartitionerClass(MyPartitioner.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(SortType.class);
job.setOutputValueClass(NullWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(path2));
job.waitForCompletion(true);
}
Run the job as a jar, passing the HDFS input file (args[0]) and output directory (args[1]):
[root@hadoop11 local]# hadoop jar Flow.jar /result1.txt /resdir
Check the output:
[root@hadoop11 local]# hadoop fs -lsr /resdir/
lsr: DEPRECATED: Please use 'ls -R' instead.
-rw-r--r-- 1 root supergroup 0 2016-07-08 10:19 /resdir/_SUCCESS
-rw-r--r-- 1 root supergroup 527 2016-07-08 10:19 /resdir/part-r-00000
-rw-r--r-- 1 root supergroup 24 2016-07-08 10:19 /resdir/part-r-00001
[root@hadoop11 local]# hadoop fs -cat /resdir/part-r-00000
13760778710 120 120 240
13926251106 240 0 240
13719199419 240 0 240
13826544101 264 0 264
13480253104 180 180 360
13823070001 360 180 540
13926435656 132 1512 1644
15989002119 1938 180 2118
18211575961 1527 2106 3633
13602846565 1938 2910 4848
15920133257 3156 2936 6092
13922314466 3008 3720 6728
15013685858 3659 3538 7197
13660577991 6960 690 7650
13560439658 2034 5892 7926
18320173382 9531 2412 11943
13726230503 2481 24681 27162
13925057413 11058 48243 59301
13502468823 7335 110349 117684
13600217502 1080 186852 187932
[root@hadoop11 local]# hadoop fs -cat /resdir/part-r-00001
84138413 4116 1432 5548
These are the results of the cluster run, and they match what we expected.
Questions and comments are welcome!
Original post: http://blog.csdn.net/a2011480169/article/details/51858756