The complete job, written against the old `org.apache.hadoop.mapred` API: a mapper that emits per-country `"claims,1"` pairs, a combiner that merges them into partial `"sum,count"` pairs, and a reducer that produces the final average.

```java
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AveragingWithCombiner extends Configured implements Tool {

    public static class MapClass extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, Text> {

        static enum ClaimsCounters { MISSING, QUOTED };

        public void map(LongWritable key, Text value,
                        OutputCollector<Text, Text> output,
                        Reporter reporter) throws IOException {

            // A negative limit keeps trailing empty fields in the split
            String[] fields = value.toString().split(",", -1);
            String country = fields[4];
            String numClaims = fields[8];
            if (numClaims.length() == 0) {
                reporter.incrCounter(ClaimsCounters.MISSING, 1);
            } else if (numClaims.startsWith("\"")) {
                reporter.incrCounter(ClaimsCounters.QUOTED, 1);
            } else {
                // Emit "claims,1" so sums and counts can be merged downstream
                output.collect(new Text(country), new Text(numClaims + ",1"));
            }
        }
    }

    public static class Combine extends MapReduceBase
        implements Reducer<Text, Text, Text, Text> {

        public void reduce(Text key, Iterator<Text> values,
                           OutputCollector<Text, Text> output,
                           Reporter reporter) throws IOException {

            double sum = 0;
            int count = 0;
            while (values.hasNext()) {
                String[] fields = values.next().toString().split(",");
                sum += Double.parseDouble(fields[0]);
                count += Integer.parseInt(fields[1]);
            }
            // Pass the partial result on, still as a "sum,count" pair
            output.collect(key, new Text(sum + "," + count));
        }
    }

    public static class Reduce extends MapReduceBase
        implements Reducer<Text, Text, Text, DoubleWritable> {

        public void reduce(Text key, Iterator<Text> values,
                           OutputCollector<Text, DoubleWritable> output,
                           Reporter reporter) throws IOException {

            double sum = 0;
            int count = 0;
            while (values.hasNext()) {
                String[] fields = values.next().toString().split(",");
                sum += Double.parseDouble(fields[0]);
                count += Integer.parseInt(fields[1]);
            }
            output.collect(key, new DoubleWritable(sum / count));
        }
    }

    public int run(String[] args) throws Exception {
        // Configuration processed by ToolRunner
        Configuration conf = getConf();

        // Create a JobConf using the processed conf
        JobConf job = new JobConf(conf, AveragingWithCombiner.class);

        // Process custom command-line options
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        // Specify various job-specific parameters
        job.setJobName("AveragingWithCombiner");
        job.setMapperClass(MapClass.class);
        job.setCombinerClass(Combine.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        // The mapper and combiner emit Text values; the final reducer emits DoubleWritable
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        // Submit the job, then poll for progress until the job is complete
        JobClient.runJob(job);

        return 0;
    }

    public static void main(String[] args) throws Exception {
        // Let ToolRunner handle generic command-line options
        int res = ToolRunner.run(new Configuration(), new AveragingWithCombiner(), args);

        System.exit(res);
    }
}
```
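Note why the mapper and combiner both emit `"sum,count"` pairs rather than averages: averaging is not associative. If the combiner emitted averages directly, the reducer would compute a mean of means and get the wrong answer whenever the partial counts differ, e.g. averaging {1, 2} and {3} as (1.5 + 3) / 2 = 2.25 instead of the true mean 2.0. Carrying the count along lets the final reducer compute the exact overall average, while the combiner still cuts down the data shuffled across the network.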
| SkipBadRecords method | JobConf property |
| --- | --- |
| setAttemptsToStartSkipping() | mapred.skip.attempts.to.start.skipping |
| setMapperMaxSkipRecords() | mapred.skip.map.max.skip.records |
| setReducerMaxSkipGroups() | mapred.skip.reduce.max.skip.groups |
| setSkipOutputPath() | mapred.skip.out.dir |
| setAutoIncrMapperProcCount() | mapred.skip.map.auto.incr.proc.count |
| setAutoIncrReducerProcCount() | mapred.skip.reduce.auto.incr.proc.count |
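As a rough sketch of how these helpers are used, each static method on `SkipBadRecords` writes the corresponding property into the job configuration. The class name, the threshold values, and the skip-output path below are all illustrative, not prescriptive:

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SkipBadRecords;

public class SkipConfigSketch {
    public static void main(String[] args) {
        JobConf job = new JobConf();
        // mapred.skip.attempts.to.start.skipping:
        // enter skip mode after 2 failed attempts of the same task
        SkipBadRecords.setAttemptsToStartSkipping(job, 2);
        // mapred.skip.map.max.skip.records:
        // narrow each bad spot down to at most 1 skipped record
        SkipBadRecords.setMapperMaxSkipRecords(job, 1L);
        // mapred.skip.reduce.max.skip.groups: same bound for reduce-side key groups
        SkipBadRecords.setReducerMaxSkipGroups(job, 1L);
        // mapred.skip.out.dir: where skipped records are saved for later inspection
        // ("/tmp/skipped" is just an example path)
        SkipBadRecords.setSkipOutputPath(job, new Path("/tmp/skipped"));
    }
}
```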
| Property | Description |
| --- | --- |
| mapred.compress.map.output | Boolean property; whether the mapper's output is compressed |
| mapred.map.output.compression.codec | Class property; which CompressionCodec is used to compress the mapper's output |
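Rather than setting these two properties by hand, the old-API `JobConf` offers typed setters for both. A minimal sketch (the choice of `GzipCodec` is just an example):

```java
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.JobConf;

public class MapOutputCompressionSketch {
    public static void main(String[] args) {
        JobConf job = new JobConf();
        // mapred.compress.map.output = true
        job.setCompressMapOutput(true);
        // mapred.map.output.compression.codec = org.apache.hadoop.io.compress.GzipCodec
        job.setMapOutputCompressorClass(GzipCodec.class);
    }
}
```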
| Property | Description |
| --- | --- |
| mapred.map.tasks.speculative.execution | Boolean property; whether speculative execution is enabled for map tasks |
| mapred.reduce.tasks.speculative.execution | Boolean property; whether speculative execution is enabled for reduce tasks |
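These, too, have dedicated `JobConf` setters. A minimal sketch that turns speculative execution off for both phases, as one might when tasks have side effects that must not run twice:

```java
import org.apache.hadoop.mapred.JobConf;

public class SpeculativeExecutionSketch {
    public static void main(String[] args) {
        JobConf job = new JobConf();
        // mapred.map.tasks.speculative.execution = false
        job.setMapSpeculativeExecution(false);
        // mapred.reduce.tasks.speculative.execution = false
        job.setReduceSpeculativeExecution(false);
    }
}
```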
Original source: http://www.cnblogs.com/zhengrunjian/p/4994969.html