标签:流量统计 流量 line super tty nts 求和 构造 image
根据手机号码,查询该号码的上行,下行,总流量,并从高到低排序,并对手机号码根据省份分组
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200 1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200 1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200 1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200 1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站 15 12 1527 2106 200 1363157995074 84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200 1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200 1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200 1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200 1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站点统计 24 9 6960 690 200 1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200 1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站点统计 3 3 1938 180 200 1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200 1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200 1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 综合门户 15 12 1938 2910 200 1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200 1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 综合门户 57 102 7335 110349 200 1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200 1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200 1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200 1363157985066 13726238888 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200 1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
这次我们使用MapReducer进行同一号码的上下行流量统计
首先是需要定义自己的数据结果,需要我们定义的数据实现Writable 接口,实现 序列化和反序列化的函数,这样MapReduer在数据传递过程中,才不会报错,也不会丢失数据
FlowBean.java
package cn.itcast.hadoop.mr.flowsum; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Writable; /** * FlowBean 是我们自定义的数据类型,要在hadoop的各个节点之间传输,应遵循hadoop的序列化机制 * 就必须实现hadoop相应的序列化接口 * */ public class FlowBean implements Writable { private String phoneNB; private long up_flow; private long d_flow; private long s_flow; // 在反序列化时,反射机制需要调用空参构造函数 public FlowBean(){} public FlowBean(String phoneNB, long up_flow, long d_flow) { super(); this.phoneNB = phoneNB; this.up_flow = up_flow; this.d_flow = d_flow; } public String getPhoneNB() { return phoneNB; } public void setPhoneNB(String phoneNB) { this.phoneNB = phoneNB; } public long getUp_flow() { return up_flow; } public void setUp_flow(long up_flow) { this.up_flow = up_flow; } public long getD_flow() { return d_flow; } public void setD_flow(long d_flow) { this.d_flow = d_flow; } /** * @return the s_flow */ public long getS_flow() { return s_flow; } /** * @param s_flow the s_flow to set */ public void setS_flow(long s_flow) { this.s_flow = s_flow; } @Override public String toString() { return ""+ phoneNB + "\t" + up_flow + "\t" + d_flow; } // 将对象数据序列化到流中 @Override public void write(DataOutput out) throws IOException { out.writeUTF(phoneNB); out.writeLong(up_flow); out.writeLong(d_flow); out.writeLong(s_flow); } // 从数据流中泛序列出对象的数据 // 从数据流中读出对象字段时,必须跟序列化时的顺序保持一致 @Override public void readFields(DataInput in) throws IOException { phoneNB = in.readUTF(); up_flow = in.readLong(); d_flow = in.readLong(); s_flow = in.readLong(); } }
为了节省空间,就把map和reducer的类定义成了静态类
package cn.itcast.hadoop.mr.flowsum; import java.io.IOException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; public class FlowSumRunner extends Configured implements Tool { public static class FlowSumMapper extends Mapper<LongWritable, Text, Text,FlowBean> { //拿到日志中的数据,切分各个字段 @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ //拿一行数据 String line = value.toString(); String[] fileds = StringUtils.split(line, "\t"); String phoneNB = fileds[1]; long up_flow = Long.parseLong(fileds[7]); long d_flow = Long.parseLong(fileds[8]); //封装数据为kv并输出 context.write(new Text(phoneNB), new FlowBean(phoneNB,up_flow, d_flow)); } } public static class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
//框架每传递一组数据<1387788654,{flowbean,flowbean,flowbean,flowbean.....}>调用一次我们的reduce方法
//reduce中的业务逻辑就是遍历values,然后进行累加求和再输出
@Override protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException { long up_flow_counter = 0; long d_flow_counter= 0; for (FlowBean value : values) { up_flow_counter += value.getUp_flow(); d_flow_counter += value.getD_flow(); } context.write(key, new FlowBean(key.toString(),up_flow_counter, d_flow_counter)); } } @Override public int run(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(FlowSumRunner.class); job.setMapperClass(FlowSumMapper.class); job.setReducerClass(FlowSumReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); FileInputFormat.setInputPaths(job, new Path(args[0])); //读取命令行参数 FileOutputFormat.setOutputPath(job, new Path(args[1]));//读取命令行参数 return job.waitForCompletion(true)?0:1; } public static void main(String[] args) throws Exception{ //hadoop jar flow.jar cn.itcast.hadoop.mr.flowsum.FlowSumRunner /flow/data /flow/output int res = ToolRunner.run(new Configuration(), new FlowSumRunner(), args); System.exit(res); } }
如果不实用MapReducer默认的排序方式,使用自定义的方式,FlowBean需要实现Comparable接口,compareTo这个函数 Writable和Comparable合在一起 就是 WritableComparable接口
FlowBean.class
package cn.itcast.hadoop.mr.flowsum;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class FlowBean implements WritableComparable<FlowBean> {
...
...
...
@Override
public int compareTo(FlowBean o) {
return this.getUp_flow() > o.getUp_flow() ? -1:1;
}
}//如果对象本身的id大于传入的对象id,
//返回值是正数,就是升序排序
//返回值是负数,就是降序排序
因为MapReduce默认是索引 也就是 Key排序,所以我们也要相应的改一下Map和Reduer
package cn.itcast.hadoop.mr.flowsort; import java.io.IOException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import cn.itcast.hadoop.mr.flowsum.FlowBean; public class SortMR { // NullWritable 如果什么也不想输出的话 使用NumWritable public static class SortMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable> { @Override protected void map(LongWritable key, Text value, Context context ) throws IOException, InterruptedException{ String line = value.toString(); String[] fields = StringUtils.split(line,"\t"); String phoneNB = fields[0]; long up_flow = Long.parseLong(fields[1]); long d_flow = Long.parseLong(fields[2]); context.write(new FlowBean(phoneNB, up_flow, d_flow), NullWritable.get()); } } public static class SortReducer extends Reducer<FlowBean, NullWritable, FlowBean, NullWritable> { @Override protected void reduce(FlowBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { context.write(key, NullWritable.get()); } } public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(SortMR.class); job.setMapperClass(SortMapper.class); job.setReducerClass(SortReducer.class); job.setMapOutputKeyClass(FlowBean.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(FlowBean.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); int result = job.waitForCompletion(true)?0:1; System.exit(result); } }
输出结果:
13600217502 186852 200 13560439658 4938 200 84138413 4116 1432 13922314466 3008 3720 13726230503 2481 24681 13926435656 1512 200 13560439658 954 200 13480253104 180 200 13823070001 180 200 13760778710 120 200 13502468823 102 7335 13925057413 63 11058 15013685858 27 3659 15920133257 20 3156 18320173382 18 9531 18211575961 12 1527 13602846565 12 1938 13660577991 9 6960 15989002119 3 1938 13719199419 0 200 13826544101 0 200 13926251106 0 200
截止到现在 我们看到的输出结果一直都只有一个文件
我们现在想 把 135,136,137,138,139和其他的号码区分开
这个时候实际就是把这不同部分的数据交给不同的Reducer去处理,然后各个Reducer输出各自的结果 把map的结果分发给不同的Reducer,需要进行一个分组的动作,hadoop默认都是一个组,所以默认只有一个Reducer,输出的也就只有一个文件了
现在我们实现Partitioner 这个类,可以按照我们自己的需求把数据进行分组
package cn.itcast.hadoop.mr.areapartition; import java.util.HashMap; import org.apache.hadoop.mapreduce.Partitioner; public class AreaPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE> { private static HashMap<String, Integer> areaMap = new HashMap<>(); static { //暂且先静态写死分组 areaMap.put("135", 0); areaMap.put("136", 1); areaMap.put("137", 2); areaMap.put("138", 3); areaMap.put("139", 4); } @Override public int getPartition(KEY key, VALUE value, int numPartitions) { // 从key中拿到手机号,查询手机归属地字典, 不同省份返回不同的编组 Integer code = areaMap.get(key.toString().substring(0,3)); int areaCoder = code == null ? 5:code; return areaCoder; } }
然后需要在 job运行之前指定 ParrtitionerClass 类
job.setPartitionerClass(AreaPartitioner.class); job.setNumReduceTasks(6); //然后设置启动Reducer的数量 // 这里需要注意的是 这里的数量必须大于等于 你设置的数据分组的数量 不然会进行报错 // 如果不设置 默认为1 就不会分组, 所有数据就只有一个文件 // 如果设置的多了 多的文件里面不会有数据
结果:
hadoop jar flowarea.jar cn.itcast.hadoop.mr.areapartition.FlowSumArea /flow/data /flow/areaoutput [hadoop@hadoop1 ~]$ hadoop fs -ls /flow/areaoutput Found 7 items -rw-r--r-- 1 hadoop supergroup 0 2018-03-18 13:57 /flow/areaoutput/_SUCCESS -rw-r--r-- 1 hadoop supergroup 66 2018-03-18 13:56 /flow/areaoutput/part-r-00000 -rw-r--r-- 1 hadoop supergroup 98 2018-03-18 13:56 /flow/areaoutput/part-r-00001 -rw-r--r-- 1 hadoop supergroup 97 2018-03-18 13:57 /flow/areaoutput/part-r-00002 -rw-r--r-- 1 hadoop supergroup 62 2018-03-18 13:57 /flow/areaoutput/part-r-00003 -rw-r--r-- 1 hadoop supergroup 130 2018-03-18 13:57 /flow/areaoutput/part-r-00004 -rw-r--r-- 1 hadoop supergroup 219 2018-03-18 13:57 /flow/areaoutput/part-r-00005 [hadoop@hadoop1 ~]$ hadoop fs -cat /flow/areaoutput/part-r-00000 13502468823 13502468823 102 7335 13560439658 13560439658 5892 400
shuffle的主要工作是从Map结束到Reduce开始之间的过程。
一、Map端的shuffle
Map端会处理输入数据并产生中间结果,这个中间结果会写到本地磁盘,而不是HDFS。每个Map的输出会先写到内存缓冲区中,当写入的数据达到设定的阈值时,系统将会启动一个线程将缓冲区的数据写到磁盘,这个过程叫做spill。
在spill写入之前,会先进行二次排序,首先根据数据所属的partition进行排序,然后每个partition中的数据再按key来排序。partition的目是将记录划分到不同的Reducer上去,以期望能够达到负载均衡,以后的Reducer就会根据partition来读取自己对应的数据。接着运行combiner(如果设置了的话),combiner的本质也是一个Reducer,其目的是对将要写入到磁盘上的文件先进行一次处理,这样,写入到磁盘的数据量就会减少。最后将数据写到本地磁盘产生spill文件(spill文件保存在{mapred.local.dir}指定的目录中,Map任务结束后就会被删除)。
最后,每个Map任务可能产生多个spill文件,在每个Map任务完成前,会通过多路归并算法将这些spill文件归并成一个文件。至此,Map的shuffle过程就结束了。
二、Reduce端的shuffle
Reduce端的shuffle主要包括三个阶段,copy、sort(merge)和reduce。
首先要将Map端产生的输出文件拷贝到Reduce端,但每个Reducer如何知道自己应该处理哪些数据呢?因为Map端进行partition的时候,实际上就相当于指定了每个Reducer要处理的数据(partition就对应了Reducer),所以Reducer在拷贝数据的时候只需拷贝与自己对应的partition中的数据即可。每个Reducer会处理一个或者多个partition,但需要先将自己对应的partition中的数据从每个Map的输出结果中拷贝过来。
接下来就是sort阶段,也成为merge阶段,因为这个阶段的主要工作是执行了归并排序。从Map端拷贝到Reduce端的数据都是有序的,所以很适合归并排序。最终在Reduce端生成一个较大的文件作为Reduce的输入。
最后就是Reduce过程了,在这个过程中产生了最终的输出结果,并将其写到HDFS上。
现在来总结一下shuffle过程,我画了张图,希望能够帮助理解。
block:block是物理切块,在文件上传到HDFS文件系统后,对大文将以每128MB的大小切分若干,存放在不同的DataNode上;
2、split:split是逻辑切片,在mapreduce中的map task开始之前,将文件按照指定的大小切割成若干个部分,每一部分称为一个split,默认是split的大小与block的大小相等,均为128MB。
注意:在hadoop1.x版本中,block默认的大小为64MB,在hadoop2.x版本修改成了128MB。
MapReducer 自定义bean-排序-分组和shuffle的过程
标签:流量统计 流量 line super tty nts 求和 构造 image
原文地址:https://www.cnblogs.com/zemul/p/10821235.html