一、MapReduce概述
MapReduce 是 Hadoop 的核心组成, 是专用于进行数据计算的,是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题.
MapReduce由两个阶段组成:Map和Reduce,用户只需要实现map()和reduce()两个函数,即可实现分布式计算,非常简单。这两个函数的形参是key、value对,表示函数的输入输出信息。
map、reduce键值对格式
二、MapReduce体系结构及工作流程
1、JobTracker
负责接收用户提交的作业,负责启动、跟踪任务执行。
JobSubmissionProtocol是JobClient与JobTracker通信的接口。
InterTrackerProtocol是TaskTracker与JobTracker通信的接口。
2、TaskTracker
负责执行任务。
3、JobClient
是用户作业与JobTracker交互的主要接口。
负责提交作业的,负责启动、跟踪任务执行、访问任务状态和日志等。
4、工作流程图
执行步骤:
1.map任务处理
1.1读取输入文件内容,解析成key、value对。对输入文件的每一行,解析成key、value对。每一个键值对调用一次map函数。
1.2 写自己的逻辑,对输入的key、value处理,转换成新的key、value输出。
1.3 对输出的key、value进行分区。
1.4 对不同分区的数据,按照key进行排序、分组。相同key的value放到一个集合中。
1.5 (可选)分组后的数据进行归约。
2.reduce任务处理
2.1 对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。
2.2 对多个map任务的输出进行合并、排序。写reduce函数自己的逻辑,对输入的key、value处理,转换成新的key、value输出。
2.3 把reduce的输出保存到文件中。
三、统计单词源代码及注释
package mapreduce;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCountApp {
static final String INPUT_PATH = "hdfs://liguodong:9000/hello";
static final String OUT_PATH = "hdfs://liguodong:9000/out";
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH),conf);
if(fileSystem.exists(new Path(OUT_PATH)))
{
fileSystem.delete(new Path(OUT_PATH),true);
}
final Job job = new Job(conf, WordCountApp.class.getSimpleName());
//1.1 输入目录在哪里
FileInputFormat.setInputPaths(job, INPUT_PATH);
//指定对输入数据进行格式化处理的类
//job.setInputFormatClass(TextInputFormat.class);
//1.2 指定自定义的Mapper类
job.setMapperClass(MyMapper.class);
//指定map的输出的<k,v>类型,如果<k3,v3>的类型与<k2,v2>的类型一致,那么可以省略。
//job.setMapOutputKeyClass(Text.class);
//job.setMapOutputValueClass(LongWritable.class);
//1.3分区
//job.setPartitionerClass(HashPartitioner.class);
//job.setNumReduceTasks(1);
//1.4 TODO 排序、分组
//1.5 TODO (可选)归约
//2.2 指定自定义的Reducer类
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//2.3 指定输出的路径
FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
//指定输出的格式化类
//job.setOutputFormatClass(TextOutputFormat.class);
//把作业交给JobTracker运行
job.waitForCompletion(true);
}
/**
* KEYIN 即K1 表示每一行的起始位置(偏移量offset)
* VALUEIN 即v1 表示每一行的文本内容
* KEYOUT 即k2 表示每一行中的每个单词
* VALUEOUT 即v2 表示每一行中的每个单词的出现次数,固定值1
* @author liguodong
* Java Hadoop
* Long LongWritable
* String Text
*/
static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, Text, LongWritable>.Context context)
throws IOException, InterruptedException {
final String[] splited = value.toString().split(" ");
for (String word : splited) {
context.write(new Text(word), new LongWritable(1));
}
}
}
/**
* KEYIN 即k2 表示每一行中的每个单词
* VALUEIN 即v2 表示每一行中每个单词出现次数,固定值1
* KEYOUT 即k3 表示整个文件中的不同单词
* VALUEOUT 即v3 表示整个文件中的不同单词的出现总次数
* @author liguodong
*/
static class MyReducer extends Reducer<Text, LongWritable,Text, LongWritable>{
@Override
protected void reduce(Text k2, Iterable<LongWritable> v2s,
Reducer<Text, LongWritable, Text, LongWritable>.Context context)
throws IOException, InterruptedException {
long sum = 0L;
for (LongWritable v2 : v2s) {
sum += v2.get();
}
context.write(k2, new LongWritable(sum));
}
}
}
四、最小的MapReduce驱动
Configuration configuration = new Configuration();
Job job = new Job(configuration, "HelloWorld");
job.setInputFormat(TextInputFormat.class);
job.setMapperClass(IdentityMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setPartitionerClass(HashPartitioner.class);
job.setNumReduceTasks(1);
job.setReducerClass(IdentityReducer.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);
job.setOutputFormat(TextOutputFormat.class);
job.waitForCompletion(true);
五、MapReduce驱动默认的设置
六、Hadoop序列化与基本类型
1、序列化概念
序列化(Serialization)是指把结构化对象转化为字节流。
反序列化(Deserialization)是序列化的逆过程。即把字节流转回结构化对象。
Java序列化(java.io.Serializable)
2、Hadoop序列化格式特点
紧凑:高效使用存储空间。
快速:读写数据的额外开销小
可扩展:可透明地读取老格式的数据
互操作:支持多语言的交互
Hadoop的序列化格式:Writable
3、Hadoop序列化的作用
序列化在分布式环境的两大作用:进程间通信,永久存储。
Hadoop节点间通信。
4、基本数据类型
Hadoop的数据类型要求必须实现Writable接口。
Writable接口是根据 DataInput 和 DataOutput 实现的简单、有效的序列化对象.
MR的任意Key和Value必须实现Writable接口。
public interface Writable {
/**
* Serialize the fields of this object to <code>out</code>.
*
* @param out <code>DataOuput</code> to serialize this object into.
* @throws IOException
*/
void write(DataOutput out) throws IOException;
/**
* Deserialize the fields of this object from <code>in</code>.
*
* <p>For efficiency, implementations should attempt to re-use storage in the
* existing object where possible.</p>
*
* @param in <code>DataInput</code> to deseriablize this object from.
* @throws IOException
*/
void readFields(DataInput in) throws IOException;
}
MR的任意key必须实现WritableComparable接口。
/**
* A {@link Writable} which is also {@link Comparable}.
*
* <p><code>WritableComparable</code>s can be compared to each other, typically
* via <code>Comparator</code>s. Any type which is to be used as a
* <code>key</code> in the Hadoop Map-Reduce framework should implement this
* interface.</p>
**/
public interface WritableComparable<T> extends Writable, Comparable<T> {
}
实现Writable接口?
write 是把每个对象序列化到输出流
readFields 是把输入流字节反序列化
实现WritableComparable?
Java值对象的比较:一般需要重写toString(),hashCode(),equals()方法。
Hadoop与Java常见基本类型的对照
Long—LongWritable
Integer—IntWritable
Boolean—BooleanWritable
String—Text
Text一般认为它等价于java.lang.String的Writable。针对UTF-8序列。
例:
Text test = new Text(“test”);
IntWritable one = new IntWritable(1);
java类型如何转化为hadoop基本类型?
调用hadoop类型的构造方法,或者调用set()方法。
new LongWritable(123L);
hadoop基本类型如何转化为java类型?
对于Text,需要调用toString()方法,其他类型调用get()方法。
七、基于文件存储的数据结构
SequenceFile—无序存储
MapFile—会对key建立索引文件,value按key顺序存储
基于MapFile的结构有:
ArrayFile—与我们使用的数组一样,key值为序列化的数字
SetFile—只有key,value为不可变的数据
BloomMapFile—在 MapFile 的基础上增加了一个/bloom 文件,包含的是二进制的过滤表,在每一次写操作完成时,会更新这个过滤表。
八、自定义数据类型源码
package mapreduce;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
public class KpiApp {
static final String INPUT_PATH = "hdfs://liguodong:9000/wlan";
static final String OUT_PATH = "hdfs://liguodong:9000/out";
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
final Job job = new Job(new Configuration(),KpiApp.class.getSimpleName());
//1.1 指定输入文件路径
FileInputFormat.setInputPaths(job, INPUT_PATH);
//指定那个类用来格式化输入文件
job.setInputFormatClass(TextInputFormat.class);
//1.2 指定自定义的Mapper类
job.setMapperClass(MyMapper.class);
//指定输出<k2,v2>的类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(KpiWritable.class);
//1.3 指定分区类
job.setPartitionerClass(HashPartitioner.class);
job.setNumReduceTasks(1);
//1.4 TODO 排序、分组
//1.5 TODO (可选)归约
//2.2 指定自定义的Reducer类
job.setReducerClass(MyReducer.class);
//指定输入<k3,v3>的值
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(KpiWritable.class);
//2.3 指定输出到哪里
FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
//设定输出文件的格式化类
job.setOutputFormatClass(TextOutputFormat.class);
//把任务及交给JobTrack执行
job.waitForCompletion(true);
}
static class MyMapper extends Mapper<LongWritable, Text, Text, KpiWritable>{
@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, Text, KpiWritable>.Context context)
throws IOException, InterruptedException {
final String[] splited = value.toString().split("\t");
final String msisdn = splited[1];
final Text k2 = new Text(msisdn);
final KpiWritable v2 = new KpiWritable(splited[6],splited[7],splited[8],splited[9]);
context.write(k2, v2);
}
}
static class MyReducer extends Reducer<Text, KpiWritable, Text, KpiWritable>{
/**
* @param k2 表示整个文件中不同的手机号码
* @param v2表示该手机号在不同时段的流量的集合
* @param
*/
@Override
protected void reduce(Text k2, Iterable<KpiWritable> v2s,
Reducer<Text, KpiWritable, Text, KpiWritable>.Context context)
throws IOException, InterruptedException {
long upPackNum = 0L;
long downPackNum = 0L;
long upPayLoad = 0L;
long downPayLoad = 0L;
for (KpiWritable kpiWritable : v2s) {
upPackNum += kpiWritable.upPackNum;
downPackNum += kpiWritable.downPackNum;
upPayLoad += kpiWritable.upPayLoad;
downPayLoad += kpiWritable.downPayLoad;
}
final KpiWritable v3 = new KpiWritable(upPackNum+"",downPackNum+"",upPayLoad+"",downPayLoad+"");
context.write(k2, v3);
}
}
}
class KpiWritable implements Writable{
long upPackNum;
long downPackNum;
long upPayLoad;
long downPayLoad;
public KpiWritable(){
}
public KpiWritable(String upPackNum, String downPackNum, String upPayLoad,
String downPayLoad) {
super();
this.upPackNum = Long.parseLong(upPackNum);
this.downPackNum = Long.parseLong(downPackNum);
this.upPayLoad = Long.parseLong(upPayLoad);
this.downPayLoad = Long.parseLong(downPayLoad);
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upPackNum);
out.writeLong(downPackNum);
out.writeLong(upPayLoad);
out.writeLong(downPayLoad);
}
@Override
public void readFields(DataInput in) throws IOException {
this.upPackNum = in.readLong();
this.downPackNum = in.readLong();
this.upPayLoad = in.readLong();
this.downPayLoad = in.readLong();
}
@Override
public String toString(){
return upPackNum+"\t"+downPackNum+"\t"+upPayLoad+"\t"+downPayLoad;
}
}
九、MapReduce输入输出处理类
1、输入处理类
FileInputFormat:
FileInputFormat是所有以文件作为数据源的InputFormat实现的基类,FileInputFormat保存作为job输入的所有文件,并实现了对输入文件计算splits的方法。至于获得记录的方法是有不同的子类——TextInputFormat进行实现的。
InputFormat 负责处理MR的输入部分.
有三个作用:
验证作业的输入是否规范.
把输入文件切分成InputSplit.
提供RecordReader 的实现类,把InputSplit读到Mapper中进行处理.
public abstract class InputFormat<K, V> {
/**
* Logically split the set of input files for the job.
*
* <p>Each {@link InputSplit} is then assigned to an individual {@link Mapper}
* for processing.</p>
*
* <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
* input files are not physically split into chunks. For e.g. a split could
* be <i><input-file-path, start, offset></i> tuple. The InputFormat
* also creates the {@link RecordReader} to read the {@link InputSplit}.
*
* @param context job configuration.
* @return an array of {@link InputSplit}s for the job.
*/
public abstract
List<InputSplit> getSplits(JobContext context
) throws IOException, InterruptedException;
/**
* Create a record reader for a given split. The framework will call
* {@link RecordReader#initialize(InputSplit, TaskAttemptContext)} before
* the split is used.
* @param split the split to be read
* @param context the information about the task
* @return a new record reader
* @throws IOException
* @throws InterruptedException
*/
public abstract
RecordReader<K,V> createRecordReader(InputSplit split,
TaskAttemptContext context
) throws IOException,
InterruptedException;
}
InputSplit
在执行mapreduce之前,原始数据被分割成若干split,每个split作为一个map任务的输入,在map执行过程中split会被分解成一个个记录(key-value对),map会依次处理每一个记录。
FileInputFormat只划分比HDFS block大的文件,所以FileInputFormat划分的结果是这个文件或者是这个文件中的一部分。
如果一个文件的大小比block小,将不会被划分,这也是Hadoop处理大文件的效率要比处理很多小文件的效率高的原因。
当Hadoop处理很多小文件(文件大小小于hdfs block大小)的时候,由于FileInputFormat不会对小文件进行划分,所以每一个小文件都会被当做一个split并分配一个map任务,导致效率底下。
例如:一个1G的文件,会被划分成16个64MB的split,并分配16个map任务处理,而10000个100kb的文件会被10000个map任务处理。
TextInputFormat
TextInputformat是默认的处理类,处理普通文本文件。
文件中每一行作为一个记录,他将每一行在文件中的起始偏移量作为key,每一行的内容作为value。
默认以\n或回车键作为一行记录。
TextInputFormat继承了FileInputFormat。
其他输入类
◆ CombineFileInputFormat
相对于大量的小文件来说,hadoop更合适处理少量的大文件。
CombineFileInputFormat可以缓解这个问题,它是针对小文件而设计的。
◆ KeyValueTextInputFormat
当输入数据的每一行是两列,并用tab分离的形式的时候,KeyValueTextInputformat处理这种格式的文件非常适合。
◆ NLineInputformat
NLineInputformat可以控制在每个split中数据的行数。
◆ SequenceFileInputformat
当输入文件格式是sequencefile的时候,要使用SequenceFileInputformat作为输入。
InputFormat类的层次结构
2、输出处理类
TextOutputformat
默认的输出格式,key和value中间值用tab隔开的。
SequenceFileOutputformat
将key和value以sequencefile格式输出。
SequenceFileAsBinaryOutputFormat
将key和value以原始二进制的格式输出。
MapFileOutputFormat
将key和value写入MapFile中。由于MapFile中的key是有序的,所以写入的时候必须保证记录是按key值顺序写入的。
MultipleOutputFormat
默认情况下一个reducer会产生一个输出,但是有些时候我们想一个reducer产生多个输出,MultipleOutputFormat和MultipleOutputs可以实现这个功能。
OutputFormat类的层次结构
3、自定义输入类格式
1)继承FileInputFormat基类。
2)重写里面的getSplits(JobContext context)方法。
3)重写createRecordReader(InputSplit split,TaskAttemptContext context)方法。
版权声明:本文为博主原创文章,未经博主允许不得转载。
原文地址:http://blog.csdn.net/scgaliguodong123_/article/details/44729839