标签:进程 except src mamicode text string end ado 一般来说
1)什么是序列化
序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。
反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。
2)为什么要序列化
一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能 由本地的进程使用,不能被发送到网络上的另外一台计算机。 然而序列化可以存储“活的” 对象,可以将“活的”对象发送到远程计算机。
3)为什么不用 Java 的序列化
Java 的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带 很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以, Hadoop 自己开发了一套序列化机制(Writable)。
4)Hadoop 序列化特点:
在企业开发中往往常用的基本序列化类型不能满足所有需求,比如在 Hadoop 框架内部传递一个bean对象,那么该对象就需要实现序列化接口。
具体实现 bean 对象序列化步骤如下 7 步。
1)需求
统计每一个手机号耗费的总上行流量、总下行流量、总流量
输入数据格式:
7 | 13560436666 | 120.196.100.99 | 1116 | 954 | 200 |
---|---|---|---|---|---|
id | 手机号码 | 网络IP | 上行流量 | 下行流量 | 网络状态码 |
期望输出数据格式:
13560436666 | 1116 | 954 | 2070 |
---|---|---|---|
手机号码 | 上行流量 | 下行流量 | 总流量 |
2)需求分析
需求:统计每一个手机号耗费的总上行流量、下行流量、总流量
输入数据格式
7 | 13560436666 | 120.196.100.99 | 1116 | 954 | 200 |
---|---|---|---|---|---|
id | 手机号码 | 网络IP | 上行流量 | 下行流量 | 网络状态码 |
期望输出数据格式
13560436666 | 1116 | 954 | 2070 |
---|---|---|---|
手机号码 | 上行流量 | 下行流量 | 总流量 |
Map阶段
读取一行数据,切分字段
7 13560436666 120.196.100.99 1116 954 200
抽取手机号、上行流量、下行流量
13560436666 | 1116 | 954 |
---|---|---|
手机号码 | 上行流量 | 下行流量 |
以手机号为key,bean对象为value输出,即context.write(手机号, bean);
bean对象要想能够传输,必须实现序列化接口
Reduce阶段:累加上行流量和下行流量得到总流量。
13560436666 | 1116 | + | 954 | = | 2070 |
---|---|---|---|---|---|
手机号码 | 上行流量 | 下行流量 | 总流量 |
3)编写 MapReduce 程序
(1)编写流量统计的 Bean 对象
package com.atguigu.mapreduce.writable;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class FlowBean implements Writable {
/**
* 上行流量
*/
private long upFlow;
/**
* 下行流量
*/
private long downFlow;
/**
* 总流量
*/
private long sumFlow;
public FlowBean() {
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow() {
this.sumFlow = this.upFlow + this.downFlow;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
@Override
public void readFields(DataInput in) throws IOException {
this.upFlow = in.readLong();
this.downFlow = in.readLong();
this.sumFlow = in.readLong();
}
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
}
(2)编写 Mapper 类
package com.atguigu.mapreduce.writable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
private Text outK = new Text();
private FlowBean outV = new FlowBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 获取1行
// 2 13846544121 192.196.100.2 264 0 200
String line = value.toString();
// 2 切割
// [1, 13736230513, 192.196.100.1, www.atguigu.com, 2481, 24681, 200]
// [2, 13846544121, 192.196.100.2, , , 264, 0, 200]
String[] split = line.split("\t");
// 3 抓取想要的数据
// 手机号:13736230513
// 上行流量:2481
// 下行流量:24681
String phone = split[1];
String up = split[split.length - 3];
String down = split[split.length - 2];
// 4 封装
outK.set(phone);
outV.setUpFlow(Long.parseLong(up));
outV.setDownFlow(Long.parseLong(down));
outV.setSumFlow();
// 5 写出
context.write(outK, outV);
}
}
(3)编写 Reducer 类
package com.atguigu.mapreduce.writable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
private FlowBean outV = new FlowBean();
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
// 1 遍历集合累加值
long totalUp = 0;
long totalDown = 0;
for (FlowBean value : values) {
totalUp += value.getUpFlow();
totalDown += value.getDownFlow();
}
// 2 封装outK, outV
outV.setUpFlow(totalUp);
outV.setDownFlow(totalDown);
outV.setSumFlow();
// 3 写出
context.write(key, outV);
}
}
(4)编写 Driver 驱动类
package com.atguigu.mapreduce.writable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1 获取job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 设置jar
job.setJarByClass(com.atguigu.mapreduce.writable.FlowDriver.class);
// 3 关联Mapper和Reducer
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
// 4 设置mapper 输出的key和value类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
// 5 设置最终数据输出的key和value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 6 设置数据的输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path("D:\\input\\inputflow"));
FileOutputFormat.setOutputPath(job, new Path("D:\\input\\output3"));
// 7 提交job
boolean result = job.waitForCompletion(Boolean.TRUE);
System.exit(result ? 0 : 1);
}
}
标签:进程 except src mamicode text string end ado 一般来说
原文地址:https://www.cnblogs.com/iamfatotaku/p/14616809.html