Hadoop Map/Reduce是一个使用简易的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错的方式并行处理上T级别的数据集。
一个Map/Reduce 作业(job) 通常会把输入的数据集切分为若干独立的数据块,由 map任务(task)以完全并行的方式处理它们。
框架会对map的输出先进行排序, 然后把结果输入给reduce任务。
通常作业的输入和输出都会被存储在文件系统中。 整个框架负责任务的调度和监控,以及重新执行已经失败的任务。
通常,Map/Reduce框架和分布式文件系统是运行在一组相同的节点上的,也就是说,计算节点和存储节点通常在一起。
这种配置允许框架在那些已经存好数据的节点上高效地调度任务,这可以使整个集群的网络带宽被非常高效地利用
输入与输出流程主要分为 Map阶段,Shuffle阶段,Reduce阶段
Map阶段
分片(Split): map阶段的输入通常是HDFS上文件,在运行Mapper前,FileInputFormat会将输入文件分割成多个split 1个split至少包含1个HDFS的Block(默认为128M)然后每一个分片运行一个map进行处理。 执行(Map): 对输入分片中的每个键值对调用map()函数进行运算,然后输出一个结果键值对。 Partitioner: 对map()的输出进行partition,即根据key或value及reduce的数量来决定当前的这对键值对最终应该交由哪个reduce处理。 默认是对key哈希后再以reduce task数量取模,默认的取模方式只是为了避免数据倾斜。然后该key/value对以及partitionIdx的结果都会被写入环形缓冲区。 溢写(Spill): map输出写在内存中的环形缓冲区,默认当缓冲区满80%,启动溢写线程,将缓冲的数据写出到磁盘。 Sort: 在溢写到磁盘之前,使用快排对缓冲区数据按照partitionIdx, key排序。(每个partitionIdx表示一个分区,一个分区对应一个reduce) Combiner: 如果设置了Combiner,那么在Sort之后,还会对具有相同key的键值对进行合并,减少溢写到磁盘的数据量。 合并(Merge): 溢写可能会生成多个文件,这时需要将多个文件合并成一个文件。合并的过程中会不断地进行 sort & combine 操作,最后合并成了一个已分区且已排序的文件。
Shuffle阶段
Shuffle
广义上Shuffle阶段横跨Map端和Reduce端,在Map端包括Spill过程,在Reduce端包括copy和merge/sort过程。 通常认为Shuffle阶段就是将map的输出作为reduce的输入的过程 Copy过程
Reduce端启动一些copy线程,通过HTTP方式将map端输出文件中属于自己的部分拉取到本地。 Reduce会从多个map端拉取数据,并且每个map的数据都是有序的。 Merge过程: Copy过来的数据会先放入内存缓冲区中,这里的缓冲区比较大;当缓冲区数据量达到一定阈值时, 将数据溢写到磁盘(与map端类似,溢写过程会执行 sort & combine)。如果生成了多个溢写文件,它们会被merge成一个有序的最终文件。 这个过程也会不停地执行 sort & combine 操作。
Reduce阶段
Shuffle阶段最终生成了一个有序的文件作为Reduce的输入,对于该文件中的每一个键值对调用reduce()方法,并将结果写到HDFS。
WordCountMapper
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> { /** * 每读取一行 就会调用一次map方法 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 进来的value是一行文本,我们就将这一行文本切分单词 String[] words = value.toString().split(" "); for (String word : words) { context.write(new Text(word), new LongWritable(1)); } } }
WordCountReducer
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> { @Override protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long count = 0; // 看起来是for循环,其实是调用next这种迭代机制 for (LongWritable value : values) { count += value.get(); } context.write(key, new LongWritable(count)); } // 整个Reduce执行结束之后 会调用这个方法 protected void cleanup(Context context) throws IOException ,InterruptedException { } }
WordCount
public class WordCount { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); //操作HDFS上的文件 默认操作本地文件 //conf.set("fs.defaultFS", "hdfs://hadoop-01:9000"); //创建一个用来描述本次数据处理工作的job对象 Job job = Job.getInstance(conf); job.setJarByClass(WordCount.class); //设置这个job所用的业务mapper和reducer类 job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); // job.setCombinerClass(WordCountReducer.class); //设置mapper和reducer的输出数据类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); //设置job要处理的数据在哪里,输出到哪里 FileInputFormat.setInputPaths(job, new Path("./wordcount/words.txt")); FileOutputFormat.setOutputPath(job, new Path("D:\\wordcount\\output")); // 操作HDFS上的文件 // FileInputFormat.setInputPaths(job, new Path("/2016.5.12/words.txt")); // FileOutputFormat.setOutputPath(job, new Path("/2016.5.12/output")); //向yarn集群发出提交job的请求 job.waitForCompletion(true); } }