标签:
最近在研究Hadoop,发现网上的一些关于Hadoop的资料都是以前的1.X版本的,包括MapReduce的工作原理,都是以前的一些过时了的东西,所以自己重新整理了一些新2.X版本的MapReduce的工作原理1. Container是YARN中资源的抽象,它封装了某个节点上一定量的资源(CPU和内存两类资源)。
2. Container由ApplicationMaster向ResourceManager申请的,由ResouceManager中的资源调度器异步分配给ApplicationMaster;/** * * @author 汤高 * Mapper<LongWritable, Text, Text, IntWritable>中 LongWritable,IntWritable是Hadoop数据类型表示长整型和整形 * * LongWritable, Text表示输入类型 (比如本应用单词计数输入是 偏移量(字符串中的第一个单词的其实位置),对应的单词(值)) * Text, IntWritable表示输出类型 输出是单词 和他的个数 * 注意:map函数中前两个参数LongWritable key, Text value和输出类型不一致 * 所以后面要设置输出类型 要使他们一致 */ //Map过程 public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { /*** * */ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { //默认的map的value是每一行,我这里自定义的是以空格分割 String[] vs = value.toString().split("\\s"); for (String v : vs) { //写出去 context.write(new Text(v), ONE); } } }
//Reduce过程 /*** * @author 汤高 * Text, IntWritable输入类型,从map过程获得 既map的输出作为Reduce的输入 * Text, IntWritable输出类型 */ public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { int count=0; for(IntWritable v:values){ count+=v.get();//单词个数加一 } context.write(key, new IntWritable(count)); } }
public static void main(String[] args) { Configuration conf=new Configuration(); try { //args从控制台获取路径 解析得到域名 String[] paths=new GenericOptionsParser(conf,args).getRemainingArgs(); if(paths.length<2){ throw new RuntimeException("必須輸出 輸入 和输出路径"); } //得到一个Job 并设置名字 Job job=Job.getInstance(conf,"wordcount"); //设置Jar 使本程序在Hadoop中运行 job.setJarByClass(WordCount.class); //设置Map处理类 job.setMapperClass(WordCountMapper.class); //设置map的输出类型,因为不一致,所以要设置 job.setMapOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //设置Reduce处理类 job.setReducerClass(WordCountReducer.class); //设置输入和输出目录 FileInputFormat.addInputPath(job, new Path(paths[0])); FileOutputFormat.setOutputPath(job, new Path(paths[1])); //启动运行 System.exit(job.waitForCompletion(true) ? 0:1); } catch (IOException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } }
package hadoopday02; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { //计数变量 private static final IntWritable ONE = new IntWritable(1); /** * * @author 汤高 * Mapper<LongWritable, Text, Text, IntWritable>中 LongWritable,IntWritable是Hadoop数据类型表示长整型和整形 * * LongWritable, Text表示输入类型 (比如本应用单词计数输入是 偏移量(字符串中的第一个单词的其实位置),对应的单词(值)) * Text, IntWritable表示输出类型 输出是单词 和他的个数 * 注意:map函数中前两个参数LongWritable key, Text value和输出类型不一致 * 所以后面要设置输出类型 要使他们一致 */ //Map过程 public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { /*** * */ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { //默认的map的value是每一行,我这里自定义的是以空格分割 String[] vs = value.toString().split("\\s"); for (String v : vs) { //写出去 context.write(new Text(v), ONE); } } } //Reduce过程 /*** * @author 汤高 * Text, IntWritable输入类型,从map过程获得 既map的输出作为Reduce的输入 * Text, IntWritable输出类型 */ public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { int count=0; for(IntWritable v:values){ count+=v.get();//单词个数加一 } context.write(key, new IntWritable(count)); } } public static void main(String[] args) { Configuration conf=new Configuration(); try { //args从控制台获取路径 解析得到域名 String[] paths=new GenericOptionsParser(conf,args).getRemainingArgs(); if(paths.length<2){ throw new RuntimeException("必須輸出 輸入 和输出路径"); } //得到一个Job 并设置名字 Job job=Job.getInstance(conf,"wordcount"); //设置Jar 使本程序在Hadoop中运行 job.setJarByClass(WordCount.class); //设置Map处理类 job.setMapperClass(WordCountMapper.class); //设置map的输出类型,因为不一致,所以要设置 job.setMapOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //设置Reduce处理类 job.setReducerClass(WordCountReducer.class); //设置输入和输出目录 FileInputFormat.addInputPath(job, new Path(paths[0])); FileOutputFormat.setOutputPath(job, new Path(paths[1])); //启动运行 System.exit(job.waitForCompletion(true) ? 0:1); } catch (IOException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
Hadoop2.6(新版本)----MapReduce工作原理
标签:
原文地址:http://blog.csdn.net/tanggao1314/article/details/51275812