标签:
1.MapReduce 编程模型的5个步骤:
1)迭代,将输入数据解析成 key/value 对;
2)将解析的 key/value经过Map处理映射成另一组key/value对;
3)根据key进行分组;
4)以分组为单位进行归约(Reduce 过程);
5)迭代,输出最终结果。
2.MapReduce编程模型模板:
在进行编程过程只需改变Map()和Reduce()方法,如果没有Reduce过程时需要对run()作适当调整。
1 import java.io.IOException; 2 import java.util.jar.JarException; 3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.conf.Configured; 5 import org.apache.hadoop.fs.Path; 6 import org.apache.hadoop.io.LongWritable; 7 import org.apache.hadoop.io.NullWritable; 8 import org.apache.hadoop.io.Text; 9 import org.apache.hadoop.mapreduce.Job; 10 import org.apache.hadoop.mapreduce.Mapper; 11 import org.apache.hadoop.mapreduce.Reducer; 12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 13 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 15 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 16 import org.apache.hadoop.util.Tool; 17 import org.apache.hadoop.util.ToolRunner; 18 import org.apache.jasper.compiler.JavacErrorDetail; 19 20 public class Example extends Configured implements Tool{ 21 22 enum Counter{ 23 LINESKIP; //输出错误行 24 } 25 26 /*MapClass 27 * Mapper< 28 * LongWritable 输入的 key 29 * Text 输入的 value 30 * NullWritable/Text 输出的 key 31 * Text 输出的 value 32 * > 33 * */ 34 //public static class Map extends Mapper<LongWritable,Text,NullWritable,Text> //没有Reduce过程时 35 public static class Map extends Mapper<LongWritable,Text,Text,Text>{ 36 public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException 37 { 38 String mydata=value.toString(); //读取源数据 39 try{ 40 //数据处理 41 String[] mydataSplite=mydata.split(""); //数据切分 42 String aData=mydataSplite[0]; 43 String bData=mydataSplite[1]; 44 45 /* 46 * 没有Reduce过程时 47 * Text outText=new Text(aData+""+bData); 48 * context.write(new Text(aData), new Text(bData)); //输出 key/value ,NullWritable.get()避免输出制表符 49 */ 50 51 context.write(new Text(aData), new Text(bData)); //输出 key/value 52 }catch(java.lang.ArrayIndexOutOfBoundsException e) 53 { 54 context.getCounter(Counter.LINESKIP).increment(1); //出错计数+1 55 return; 56 } 57 } 58 } 59 60 /*Reduce静态类 61 * Reducer< 62 * Text, 输入的 key 63 * Text, 输入的 value 64 * Text, 输出的 key 65 * Text 输出的 value 66 * 67 * Reduce 的输入格式应与 Map的输出格式一致 68 * > 69 * */ 70 public static class Reduce extends Reducer<Text,Text,Text,Text> 71 { 72 public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException 73 { 74 String valuString; 75 String outString=""; 76 for (Text value : values) 77 { 78 valuString=value.toString(); 79 outString+=valuString+","; 80 } 81 context.write(key, new Text(outString)); //输出参数与定义格式一致,如果不是制表符分离的要换成空值 82 } 83 } 84 85 /*run设置运行任务*/ 86 public int run(String[] args) throws Exception { 87 Configuration conf = getConf(); 88 89 Job job = new Job(conf,"Example"); //作务名 90 job.setJarByClass(Example.class); //选择class 91 92 FileInputFormat.setInputPaths(job, new Path(args[0])); //输入路径 93 FileOutputFormat.setOutputPath(job, new Path(args[1])); //输出路径 94 95 job.setMapperClass(Map.class); //调用 Map class启动Map task 96 job.setReducerClass(Reduce.class); //调用 Reduce class 雇用 Reduce task 97 job.setCombinerClass(Reduce.class); 98 job.setInputFormatClass(TextInputFormat.class); //输入格式 99 job.setOutputFormatClass(TextOutputFormat.class); //输出格式 100 //job.setOutputKeyClass(NullWritable.class); //没有Reduce过程时,输出 key 格式,应该与指定的格式一致 101 job.setOutputKeyClass(Text.class); //输出 key 格式,应该与指定的格式一致 102 job.setOutputValueClass(Text.class); //输出 value 格式 103 104 System.exit(job.waitForCompletion(true)?0:1); 105 return 0; 106 } 107 108 /*主函数入口*/ 109 public static void main(String[] args) throws Exception { 110 int res = ToolRunner.run(new Configuration(),new Example(),args); 111 System.exit(res); 112 } 113 114 }
标签:
原文地址:http://www.cnblogs.com/xp12/p/4234686.html