
[hadoop] Hadoop Source Code Analysis: Context


  The first program anyone writes is Hello World, and Hadoop is no exception: its Hello World is WordCount, the word-counting example.

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, IntWritable> {
    
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
      
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }
  
  public static class IntSumReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
WordCount.java

  Both the map() in Mapper and the reduce() in Reducer take a parameter of type Context:

public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
    }
}

public void reduce(Text key, Iterable<IntWritable> values, Context context)
                   throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
        sum += val.get();
    }
    result.set(sum);
    context.write(key, result);
}

 

  So what is this Context actually for? Translated literally, it is just a "context", and from the call in map()

context.write(word, one);

and the call in reduce()

context.write(key, result);

we can tell that context is used to carry data as well as other run-state information: map() writes its key/value pairs into the context, which passes them on to the Reducer for reducing; after reduce() has processed them, the results are again written into the context, which hands them over to Hadoop to be written into HDFS.
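
To make that concrete, here is a small sketch of my own (it is not part of the original WordCount, and the configuration key "wordcount.skip.word" is made up for illustration). It reuses the imports from the WordCount listing above and shows that the Context carries more than the output channel: it also exposes the job configuration and counters.

// Sketch only: a Mapper that uses its Context for more than context.write().
public static class FilteringTokenizerMapper
     extends Mapper<Object, Text, Text, IntWritable> {

  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();
  private String skipWord;

  @Override
  protected void setup(Context context) {
    // The Context gives read access to the job configuration (inherited from JobContext).
    // "wordcount.skip.word" is a hypothetical key used only for this example.
    skipWord = context.getConfiguration().get("wordcount.skip.word", "");
  }

  @Override
  public void map(Object key, Text value, Context context)
      throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      String token = itr.nextToken();
      if (token.equals(skipWord)) {
        // The Context also exposes counters (from TaskInputOutputContext).
        context.getCounter("WordCount", "SkippedWords").increment(1);
        continue;
      }
      word.set(token);
      context.write(word, one);   // hand the pair to the framework via the Context
    }
  }
}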

  So what exactly does Context look like? Let's examine its inheritance and implementation structure. Although Mapper and Reducer each contain a class named Context, the two are not identical. Here is the source of Mapper and Reducer.

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

  public class Context 
    extends MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
    public Context(Configuration conf, TaskAttemptID taskid,
                   RecordReader<KEYIN,VALUEIN> reader,
                   RecordWriter<KEYOUT,VALUEOUT> writer,
                   OutputCommitter committer,
                   StatusReporter reporter,
                   InputSplit split) throws IOException, InterruptedException {
      super(conf, taskid, reader, writer, committer, reporter, split);
    }
  }
  
  /**
   * Called once at the beginning of the task.
   */
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * Called once for each key/value pair in the input split. Most applications
   * should override this, but the default is the identity function.
   */
  @SuppressWarnings("unchecked")
  protected void map(KEYIN key, VALUEIN value, 
                     Context context) throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }

  /**
   * Called once at the end of the task.
   */
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
  }
  
  /**
   * Expert users can override this method for more complete control over the
   * execution of the Mapper.
   * @param context
   * @throws IOException
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      cleanup(context);
    }
  }
}
Mapper
package org.apache.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapred.RawKeyValueIterator;

/**
 * <p>The output of the <code>Reducer</code> is <b>not re-sorted</b>.</p>
 */
public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {

  public class Context 
    extends ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
    public Context(Configuration conf, TaskAttemptID taskid,
                   RawKeyValueIterator input, 
                   Counter inputKeyCounter,
                   Counter inputValueCounter,
                   RecordWriter<KEYOUT,VALUEOUT> output,
                   OutputCommitter committer,
                   StatusReporter reporter,
                   RawComparator<KEYIN> comparator,
                   Class<KEYIN> keyClass,
                   Class<VALUEIN> valueClass
                   ) throws IOException, InterruptedException {
      super(conf, taskid, input, inputKeyCounter, inputValueCounter,
            output, committer, reporter, 
            comparator, keyClass, valueClass);
    }
  }

  /**
   * Called once at the start of the task.
   */
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING
  }

  @SuppressWarnings("unchecked")
  protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                        ) throws IOException, InterruptedException {
    for(VALUEIN value: values) {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }
  }

  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
  }

  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKey()) {
        reduce(context.getCurrentKey(), context.getValues(), context);
      }
    } finally {
      cleanup(context);
    }
  }
}
Reducer

As you can see, both Context classes are nested (inner) classes: Mapper's Context extends MapContext, while Reducer's Context extends ReduceContext.

public class Context 
    extends MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
    public Context(Configuration conf, TaskAttemptID taskid,
                   RecordReader<KEYIN,VALUEIN> reader,
                   RecordWriter<KEYOUT,VALUEOUT> writer,
                   OutputCommitter committer,
                   StatusReporter reporter,
                   InputSplit split) throws IOException, InterruptedException {
      super(conf, taskid, reader, writer, committer, reporter, split);
    }
}
Mapper.Context
public class Context 
    extends ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
    public Context(Configuration conf, TaskAttemptID taskid,
                   RawKeyValueIterator input, 
                   Counter inputKeyCounter,
                   Counter inputValueCounter,
                   RecordWriter<KEYOUT,VALUEOUT> output,
                   OutputCommitter committer,
                   StatusReporter reporter,
                   RawComparator<KEYIN> comparator,
                   Class<KEYIN> keyClass,
                   Class<VALUEIN> valueClass
                   ) throws IOException, InterruptedException {
      super(conf, taskid, input, inputKeyCounter, inputValueCounter,
            output, committer, reporter, 
            comparator, keyClass, valueClass);
    }
}
Reducer.Context

 Comparing Mapper.Context and Reducer.Context with the classes they extend, neither adds any fields or methods, nor overrides anything; they simply re-wrap MapContext and ReduceContext. So the real targets of the analysis are MapContext and ReduceContext.

package org.apache.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;


public class MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 
  extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
  private RecordReader<KEYIN,VALUEIN> reader;
  private InputSplit split;

  public MapContext(Configuration conf, TaskAttemptID taskid,
                    RecordReader<KEYIN,VALUEIN> reader,
                    RecordWriter<KEYOUT,VALUEOUT> writer,
                    OutputCommitter committer,
                    StatusReporter reporter,
                    InputSplit split) {
    super(conf, taskid, writer, committer, reporter);
    this.reader = reader;
    this.split = split;
  }

  public InputSplit getInputSplit() {
    return split;
  }

  public KEYIN getCurrentKey() throws IOException, InterruptedException {
    return reader.getCurrentKey();
  }

  public VALUEIN getCurrentValue() throws IOException, InterruptedException {
    return reader.getCurrentValue();
  }

  public boolean nextKeyValue() throws IOException, InterruptedException {
    return reader.nextKeyValue();
  }

}


package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.util.Progressable;


public class ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
    extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
  private RawKeyValueIterator input;
  private Counter inputKeyCounter;
  private Counter inputValueCounter;
  private RawComparator<KEYIN> comparator;
  private KEYIN key;                                  // current key
  private VALUEIN value;                              // current value
  private boolean firstValue = false;                 // first value in key
  private boolean nextKeyIsSame = false;              // more w/ this key
  private boolean hasMore;                            // more in file
  protected Progressable reporter;
  private Deserializer<KEYIN> keyDeserializer;
  private Deserializer<VALUEIN> valueDeserializer;
  private DataInputBuffer buffer = new DataInputBuffer();
  private BytesWritable currentRawKey = new BytesWritable();
  private ValueIterable iterable = new ValueIterable();

  public ReduceContext(Configuration conf, TaskAttemptID taskid,
                       RawKeyValueIterator input, 
                       Counter inputKeyCounter,
                       Counter inputValueCounter,
                       RecordWriter<KEYOUT,VALUEOUT> output,
                       OutputCommitter committer,
                       StatusReporter reporter,
                       RawComparator<KEYIN> comparator,
                       Class<KEYIN> keyClass,
                       Class<VALUEIN> valueClass
                       ) throws InterruptedException, IOException {
    super(conf, taskid, output, committer, reporter);
    this.input = input;
    this.inputKeyCounter = inputKeyCounter;
    this.inputValueCounter = inputValueCounter;
    this.comparator = comparator;
    SerializationFactory serializationFactory = new SerializationFactory(conf);
    this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
    this.keyDeserializer.open(buffer);
    this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
    this.valueDeserializer.open(buffer);
    hasMore = input.next();
  }

  /** Start processing next unique key. */
  public boolean nextKey() throws IOException, InterruptedException {
    while (hasMore && nextKeyIsSame) {
      nextKeyValue();
    }
    if (hasMore) {
      if (inputKeyCounter != null) {
        inputKeyCounter.increment(1);
      }
      return nextKeyValue();
    } else {
      return false;
    }
  }

  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (!hasMore) {
      key = null;
      value = null;
      return false;
    }
    firstValue = !nextKeyIsSame;
    DataInputBuffer next = input.getKey();
    currentRawKey.set(next.getData(), next.getPosition(), 
                      next.getLength() - next.getPosition());
    buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
    key = keyDeserializer.deserialize(key);
    next = input.getValue();
    buffer.reset(next.getData(), next.getPosition(),
        next.getLength() - next.getPosition());
    value = valueDeserializer.deserialize(value);
    hasMore = input.next();
    if (hasMore) {
      next = input.getKey();
      nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, 
                                         currentRawKey.getLength(),
                                         next.getData(),
                                         next.getPosition(),
                                         next.getLength() - next.getPosition()
                                         ) == 0;
    } else {
      nextKeyIsSame = false;
    }
    inputValueCounter.increment(1);
    return true;
  }

  public KEYIN getCurrentKey() {
    return key;
  }

  public VALUEIN getCurrentValue() {
    return value;
  }

  protected class ValueIterator implements Iterator<VALUEIN> {

    public boolean hasNext() {
      return firstValue || nextKeyIsSame;
    }

    @Override
    public VALUEIN next() {
      // if this is the first record, we don't need to advance
      if (firstValue) {
        firstValue = false;
        return value;
      }
      // if this isn't the first record and the next key is different, they
      // can't advance it here.
      if (!nextKeyIsSame) {
        throw new NoSuchElementException("iterate past last value");
      }
      // otherwise, go to the next key/value pair
      try {
        nextKeyValue();
        return value;
      } catch (IOException ie) {
        throw new RuntimeException("next value iterator failed", ie);
      } catch (InterruptedException ie) {
        // this is bad, but we can't modify the exception list of java.util
        throw new RuntimeException("next value iterator interrupted", ie);
      }
    }

    public void remove() {
      throw new UnsupportedOperationException("remove not implemented");
    }
    
  }

  protected class ValueIterable implements Iterable<VALUEIN> {
    private ValueIterator iterator = new ValueIterator();
    @Override
    public Iterator<VALUEIN> iterator() {
      return iterator;
    } 
  }
  
  public 
  Iterable<VALUEIN> getValues() throws IOException, InterruptedException {
    return iterable;
  }
}
MapContext and ReduceContext

 

The inheritance hierarchies of these two classes are shown below:

[Figure: inheritance hierarchy of MapContext]

[Figure: inheritance hierarchy of ReduceContext]

A summary of MapContext's methods (excluding inherited ones: as the hierarchy shows, MapContext and ReduceContext both extend TaskInputOutputContext and neither overrides any inherited method, so the methods they inherit are identical):

[Figure: summary of MapContext's own methods]

Likewise, a summary of ReduceContext's methods:

[Figure: summary of ReduceContext's own methods]

The usage of the methods above is fairly self-evident, so I won't go through them one by one.
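
One subtlety is worth a concrete example, though. ReduceContext.getValues() always returns the same ValueIterable, whose single ValueIterator is driven by nextKeyValue(), so the values of a key can only be traversed once. The following sketch is my own illustration (not from the original post) of what that means for user code:

// Sketch only: the Iterable handed to reduce() is a single-pass view backed by
// nextKeyValue(), so the values for a key can be consumed exactly once.
public void reduce(Text key, Iterable<IntWritable> values, Context context)
    throws IOException, InterruptedException {
  int sum = 0;
  for (IntWritable val : values) {   // first pass consumes the underlying ValueIterator
    sum += val.get();
  }
  int secondPass = 0;
  for (IntWritable val : values) {   // same exhausted iterator: this loop body never runs
    secondPass++;
  }
  // secondPass is 0 here; cache the values in a collection if two passes are really needed.
  context.write(key, new IntWritable(sum));
}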

 

Next, let's look at the common parent class they both extend, TaskInputOutputContext.

[Figure: TaskInputOutputContext method summary]

 

It has several abstract methods: getCurrentKey(), getCurrentValue(), and nextKeyValue(). These are the methods MapContext and ReduceContext have in common, and each must supply its own implementation. write(KEYOUT key, VALUEOUT value) writes a key/value pair to the underlying output stream, via the RecordWriter passed into the constructor. When programming MapReduce you never have to manage this low-level data transfer yourself: write() has already wrapped it up, a direct call puts the pair into the stream, and Hadoop then carries it to the next stage of processing.
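
For orientation, here is an abridged sketch of what that looks like. It is an assumed shape based on the constructors shown above, not a verbatim copy of the Hadoop source; the real class also carries the committer, reporter, and counter plumbing that is omitted here.

// Abridged sketch: TaskInputOutputContext declares the iteration methods as abstract
// and forwards write() to the RecordWriter it was constructed with.
public abstract class TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
    extends TaskAttemptContext implements Progressable {

  private RecordWriter<KEYOUT, VALUEOUT> output;   // where write() sends each pair

  // Implemented by MapContext (backed by a RecordReader) and
  // ReduceContext (backed by a RawKeyValueIterator).
  public abstract boolean nextKeyValue() throws IOException, InterruptedException;
  public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;
  public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;

  // Called from user code as context.write(k, v); the framework-provided RecordWriter
  // handles serialization and delivery to the next stage (shuffle or final output).
  public void write(KEYOUT key, VALUEOUT value)
      throws IOException, InterruptedException {
    output.write(key, value);
  }
}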

As we have seen, Mapper.Context, Reducer.Context, MapContext, ReduceContext, TaskInputOutputContext, and TaskAttemptContext add essentially no job-level member variables of their own (beyond the readers, writers, iterators, and counters they need for I/O); the job-level state lives in their ancestor JobContext, whose member variables are summarized below:

[Figure: summary of JobContext's member variables]

 

package org.apache.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;

/**
 * A read-only view of the job that is provided to the tasks while they
 * are running.
 */
public class JobContext {
  // Put all of the attribute names in here so that Job and JobContext are
  // consistent.
  protected static final String INPUT_FORMAT_CLASS_ATTR = 
    "mapreduce.inputformat.class";
  protected static final String MAP_CLASS_ATTR = "mapreduce.map.class";
  protected static final String COMBINE_CLASS_ATTR = "mapreduce.combine.class";
  protected static final String REDUCE_CLASS_ATTR = "mapreduce.reduce.class";
  protected static final String OUTPUT_FORMAT_CLASS_ATTR = 
    "mapreduce.outputformat.class";
  protected static final String PARTITIONER_CLASS_ATTR = 
    "mapreduce.partitioner.class";

  protected final org.apache.hadoop.mapred.JobConf conf;
  protected final Credentials credentials;
  private JobID jobId;

  public static final String JOB_NAMENODES = "mapreduce.job.hdfs-servers";

  public static final String JOB_ACL_VIEW_JOB = "mapreduce.job.acl-view-job";
  public static final String JOB_ACL_MODIFY_JOB =
    "mapreduce.job.acl-modify-job";

  public static final String CACHE_FILE_VISIBILITIES = 
    "mapreduce.job.cache.files.visibilities";
  public static final String CACHE_ARCHIVES_VISIBILITIES = 
    "mapreduce.job.cache.archives.visibilities";
  
  public static final String JOB_CANCEL_DELEGATION_TOKEN = 
    "mapreduce.job.complete.cancel.delegation.tokens";
  public static final String USER_LOG_RETAIN_HOURS = 
    "mapred.userlog.retain.hours";
  
  /**
   * The UserGroupInformation object that has a reference to the current user
   */
  protected UserGroupInformation ugi;
  
  public JobContext(Configuration conf, JobID jobId) {
    this.conf = new org.apache.hadoop.mapred.JobConf(conf);
    this.credentials = this.conf.getCredentials();
    this.jobId = jobId;
    try {
      this.ugi = UserGroupInformation.getCurrentUser();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  void setJobID(JobID jobId) {
    this.jobId = jobId;
  }

  /**
   * Return the configuration for the job.
   * @return the shared configuration object
   */
  public Configuration getConfiguration() {
    return conf;
  }

  /**
   * Get credentials for the job.
   * @return credentials for the job
   */
  public Credentials getCredentials() {
    return credentials;
  }

  /**
   * Get the unique ID for the job.
   * @return the object with the job id
   */
  public JobID getJobID() {
    return jobId;
  }
  
  /**
   * Get configured the number of reduce tasks for this job. Defaults to 
   * <code>1</code>.
   * @return the number of reduce tasks for this job.
   */
  public int getNumReduceTasks() {
    return conf.getNumReduceTasks();
  }
  
  /**
   * Get the current working directory for the default file system.
   * 
   * @return the directory name.
   */
  public Path getWorkingDirectory() throws IOException {
    return conf.getWorkingDirectory();
  }

  /**
   * Get the key class for the job output data.
   * @return the key class for the job output data.
   */
  public Class<?> getOutputKeyClass() {
    return conf.getOutputKeyClass();
  }
  
  /**
   * Get the value class for job outputs.
   * @return the value class for job outputs.
   */
  public Class<?> getOutputValueClass() {
    return conf.getOutputValueClass();
  }

  /**
   * Get the key class for the map output data. If it is not set, use the
   * (final) output key class. This allows the map output key class to be
   * different than the final output key class.
   * @return the map output key class.
   */
  public Class<?> getMapOutputKeyClass() {
    return conf.getMapOutputKeyClass();
  }

  /**
   * Get the value class for the map output data. If it is not set, use the
   * (final) output value class This allows the map output value class to be
   * different than the final output value class.
   *  
   * @return the map output value class.
   */
  public Class<?> getMapOutputValueClass() {
    return conf.getMapOutputValueClass();
  }

  /**
   * Get the user-specified job name. This is only used to identify the 
   * job to the user.
   * 
   * @return the job's name, defaulting to "".
   */
  public String getJobName() {
    return conf.getJobName();
  }

  /**
   * Get the {@link InputFormat} class for the job.
   * 
   * @return the {@link InputFormat} class for the job.
   */
  @SuppressWarnings("unchecked")
  public Class<? extends InputFormat<?,?>> getInputFormatClass() 
     throws ClassNotFoundException {
    return (Class<? extends InputFormat<?,?>>) 
      conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
  }

  /**
   * Get the {@link Mapper} class for the job.
   * 
   * @return the {@link Mapper} class for the job.
   */
  @SuppressWarnings("unchecked")
  public Class<? extends Mapper<?,?,?,?>> getMapperClass() 
     throws ClassNotFoundException {
    return (Class<? extends Mapper<?,?,?,?>>) 
      conf.getClass(MAP_CLASS_ATTR, Mapper.class);
  }

  /**
   * Get the combiner class for the job.
   * 
   * @return the combiner class for the job.
   */
  @SuppressWarnings("unchecked")
  public Class<? extends Reducer<?,?,?,?>> getCombinerClass() 
     throws ClassNotFoundException {
    return (Class<? extends Reducer<?,?,?,?>>) 
      conf.getClass(COMBINE_CLASS_ATTR, null);
  }

  /**
   * Get the {@link Reducer} class for the job.
   * 
   * @return the {@link Reducer} class for the job.
   */
  @SuppressWarnings("unchecked")
  public Class<? extends Reducer<?,?,?,?>> getReducerClass() 
     throws ClassNotFoundException {
    return (Class<? extends Reducer<?,?,?,?>>) 
      conf.getClass(REDUCE_CLASS_ATTR, Reducer.class);
  }

  /**
   * Get the {@link OutputFormat} class for the job.
   * 
   * @return the {@link OutputFormat} class for the job.
   */
  @SuppressWarnings("unchecked")
  public Class<? extends OutputFormat<?,?>> getOutputFormatClass() 
     throws ClassNotFoundException {
    return (Class<? extends OutputFormat<?,?>>) 
      conf.getClass(OUTPUT_FORMAT_CLASS_ATTR, TextOutputFormat.class);
  }

  /**
   * Get the {@link Partitioner} class for the job.
   * 
   * @return the {@link Partitioner} class for the job.
   */
  @SuppressWarnings("unchecked")
  public Class<? extends Partitioner<?,?>> getPartitionerClass() 
     throws ClassNotFoundException {
    return (Class<? extends Partitioner<?,?>>) 
      conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
  }

  /**
   * Get the {@link RawComparator} comparator used to compare keys.
   * 
   * @return the {@link RawComparator} comparator used to compare keys.
   */
  public RawComparator<?> getSortComparator() {
    return conf.getOutputKeyComparator();
  }

  /**
   * Get the pathname of the job's jar.
   * @return the pathname
   */
  public String getJar() {
    return conf.getJar();
  }

  /** 
   * Get the user defined {@link RawComparator} comparator for 
   * grouping keys of inputs to the reduce.
   * 
   * @return comparator set by the user for grouping values.
   * @see Job#setGroupingComparatorClass(Class) for details.  
   */
  public RawComparator<?> getGroupingComparator() {
    return conf.getOutputValueGroupingComparator();
  }
}
JobContext

Most of the member variables are static final constants with preset values, or are assigned directly in the constructor and essentially never change afterwards. JobContext also provides accessor methods that return these members, namely the many get* methods.
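
Because Mapper.Context and Reducer.Context ultimately inherit from JobContext, all of those getters are available inside a running task. The following is my own usage sketch (not from the original post), printing a few job-level facts from a Mapper's setup():

// Usage sketch: JobContext getters accessed through the task's Context.
public static class ContextAwareMapper extends Mapper<Object, Text, Text, IntWritable> {
  @Override
  protected void setup(Context context) {
    Configuration conf = context.getConfiguration();   // the shared job configuration
    String jobName = context.getJobName();             // e.g. "word count"
    int reducers = context.getNumReduceTasks();        // defaults to 1
    Class<?> outKey = context.getOutputKeyClass();     // Text.class in the WordCount job
    System.out.println(jobName + " runs with " + reducers
        + " reducer(s), output key class = " + outKey.getName()
        + ", default fs = " + conf.get("fs.default.name"));
  }
}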

That covers the inheritance and implementation of Context. Honestly there isn't much to it; this post mostly just organizes the API together with the source code.

 

This article is licensed under the Creative Commons Attribution-NonCommercial 3.0 license. You are welcome to repost or adapt it, but you must keep the attribution to the author, 林羽飞扬; for questions, please send me a message.



Original post: http://www.cnblogs.com/zhengyuhong/p/3958370.html
