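Hadoop version: 1.1.2
I. Structure of the Mapper class
Mapper is the default value for Job.setMapperClass(). The base Mapper class writes every input key/value pair to the output unchanged.
The structure of org.apache.hadoop.mapreduce.Mapper is as follows: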
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

  public class Context
      extends MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    public Context(Configuration conf, TaskAttemptID taskid,
                   RecordReader<KEYIN, VALUEIN> reader,
                   RecordWriter<KEYOUT, VALUEOUT> writer,
                   OutputCommitter committer,
                   StatusReporter reporter,
                   InputSplit split) throws IOException, InterruptedException {
      super(conf, taskid, reader, writer, committer, reporter, split);
    }
  }

  /**
   * Called once at the beginning of the task.
   */
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * Called once for each key/value pair in the input split. Most applications
   * should override this, but the default is the identity function.
   */
  @SuppressWarnings("unchecked")
  protected void map(KEYIN key, VALUEIN value,
                     Context context) throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }

  /**
   * Called once at the end of the task.
   */
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * Expert users can override this method for more complete control over the
   * execution of the Mapper.
   * By default it calls setup once, then calls map for every key/value pair
   * in a loop, and finally calls cleanup once.
   *
   * @param context
   * @throws IOException
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    cleanup(context);
  }
}
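To make the call order of run() easier to see, here is a minimal plain-Java sketch that mimics the setup, map, cleanup sequence without depending on Hadoop. MiniMapper, its nested Context, UpperCaseMapper and MiniMapperDemo are hypothetical stand-ins invented for illustration; they are not Hadoop classes, and keys and values are plain strings rather than Writable types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Demo driver (hypothetical): runs a mapper over two records and returns the event log.
class MiniMapperDemo {
    static List<String> runWith(MiniMapper mapper) {
        List<String[]> records = new ArrayList<>();
        records.add(new String[] {"0", "hello"});
        records.add(new String[] {"6", "world"});
        MiniMapper.Context context = new MiniMapper.Context();
        mapper.run(records, context);
        return context.log;
    }

    public static void main(String[] args) {
        // prints [setup, write(0,hello), write(6,world), cleanup]
        System.out.println(runWith(new MiniMapper()));
        // prints [setup, write(0,HELLO), write(6,WORLD), cleanup]
        System.out.println(runWith(new UpperCaseMapper()));
    }
}

// Simplified stand-in for org.apache.hadoop.mapreduce.Mapper (not Hadoop code).
class MiniMapper {
    // Records lifecycle events and emitted pairs so the call order is visible.
    static class Context {
        final List<String> log = new ArrayList<>();

        void write(String key, String value) {
            log.add("write(" + key + "," + value + ")");
        }
    }

    protected void setup(Context context) {
        context.log.add("setup");
    }

    // Default behaviour is the identity function, as in the real Mapper.
    protected void map(String key, String value, Context context) {
        context.write(key, value);
    }

    protected void cleanup(Context context) {
        context.log.add("cleanup");
    }

    // Same shape as Mapper.run(): setup once, map per record, cleanup once.
    public void run(List<String[]> records, Context context) {
        setup(context);
        for (String[] record : records) {
            map(record[0], record[1], context);
        }
        cleanup(context);
    }
}

// Overrides map(), which is what most applications do.
class UpperCaseMapper extends MiniMapper {
    @Override
    protected void map(String key, String value, Context context) {
        context.write(key, value.toUpperCase(Locale.ROOT));
    }
}
```

In a real job the framework supplies the Context and the records come from a RecordReader; only the ordering of the calls carries over from this sketch.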
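II. Structure of the Reducer class
Reducer is the default value for Job.setReducerClass(). The base Reducer class writes every input key/value pair to the output unchanged.
org.apache.hadoop.mapreduce.Reducer is structured much like Mapper.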
public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

  public class Context
      extends ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    public Context(Configuration conf, TaskAttemptID taskid,
                   RawKeyValueIterator input,
                   Counter inputKeyCounter,
                   Counter inputValueCounter,
                   RecordWriter<KEYOUT, VALUEOUT> output,
                   OutputCommitter committer,
                   StatusReporter reporter,
                   RawComparator<KEYIN> comparator,
                   Class<KEYIN> keyClass,
                   Class<VALUEIN> valueClass
                   ) throws IOException, InterruptedException {
      super(conf, taskid, input, inputKeyCounter, inputValueCounter,
            output, committer, reporter,
            comparator, keyClass, valueClass);
    }
  }

  /**
   * Called once at the start of the task.
   */
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * This method is called once for each key. Most applications will define
   * their reduce class by overriding this method. The default implementation
   * is an identity function.
   */
  @SuppressWarnings("unchecked")
  protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                        ) throws IOException, InterruptedException {
    for (VALUEIN value : values) {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }
  }

  /**
   * Called once at the end of the task.
   */
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * Advanced application writers can use the
   * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
   * control how the reduce task works.
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKey()) {
      reduce(context.getCurrentKey(), context.getValues(), context);
    }
    cleanup(context);
  }
}
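The main difference from Mapper is that reduce() is called once per distinct key with all values for that key, not once per pair. The following plain-Java sketch illustrates that grouping under simplifying assumptions: MiniReducer, MaxReducer and MiniReducerDemo are hypothetical names, a TreeMap stands in for the framework's sort-and-group step, and setup/cleanup are left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Demo driver (hypothetical): groups pairs by key, then runs a reducer over the groups.
class MiniReducerDemo {
    // Rough stand-in for the framework's sort-and-group step before reduce.
    static Map<String, List<Integer>> groupByKey(String[] keys, int[] values) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (int i = 0; i < keys.length; i++) {
            grouped.computeIfAbsent(keys[i], k -> new ArrayList<>()).add(values[i]);
        }
        return grouped;
    }

    static List<String> runWith(MiniReducer reducer) {
        String[] keys = {"b", "a", "b", "a", "b"};
        int[] values = {3, 1, 7, 5, 2};
        List<String> output = new ArrayList<>();
        reducer.run(groupByKey(keys, values), output);
        return output;
    }

    public static void main(String[] args) {
        // prints [a=1, a=5, b=3, b=7, b=2]
        System.out.println(runWith(new MiniReducer()));
        // prints [a=5, b=7]
        System.out.println(runWith(new MaxReducer()));
    }
}

// Simplified stand-in for org.apache.hadoop.mapreduce.Reducer (not Hadoop code).
class MiniReducer {
    // Default behaviour is the identity function: one output pair per input value.
    protected void reduce(String key, Iterable<Integer> values, List<String> output) {
        for (int value : values) {
            output.add(key + "=" + value);
        }
    }

    // Same loop shape as Reducer.run(): reduce is called once per distinct key.
    public void run(Map<String, List<Integer>> grouped, List<String> output) {
        for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
            reduce(entry.getKey(), entry.getValue(), output);
        }
    }
}

// Overrides reduce() to keep only the largest value seen for each key.
class MaxReducer extends MiniReducer {
    @Override
    protected void reduce(String key, Iterable<Integer> values, List<String> output) {
        int max = Integer.MIN_VALUE;
        for (int value : values) {
            max = Math.max(max, value);
        }
        output.add(key + "=" + max);
    }
}
```

The sketch keeps values in insertion order within a key; a real job should not rely on any particular value order unless it configures one.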
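III. Mapper and Reducer implementations provided by Hadoop
We do not always have to write our own Mapper and Reducer classes from scratch. Hadoop ships several common Mapper and Reducer subclasses that can be used directly in a job.
Mapper subclasses are found in the org.apache.hadoop.mapreduce.lib.map package: InverseMapper, MultithreadedMapper, and TokenCounterMapper.
Reducer subclasses are found in the org.apache.hadoop.mapreduce.lib.reduce package: IntSumReducer and LongSumReducer.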
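As an illustration of what such library classes save you from writing, the sketch below models in plain Java what a job built from TokenCounterMapper (emit each whitespace-separated token with a count of 1) and IntSumReducer (sum the counts per key) computes. LibraryClassesSketch and countTokens are hypothetical names; the code does not call Hadoop, and it collapses the map and reduce steps into one loop for brevity.

```java
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Plain-Java model of a word count built from library classes (not Hadoop code).
class LibraryClassesSketch {
    static Map<String, Integer> countTokens(String... lines) {
        Map<String, Integer> sums = new TreeMap<>();
        for (String line : lines) {
            StringTokenizer tokens = new StringTokenizer(line);
            while (tokens.hasMoreTokens()) {
                // Map side would emit (token, 1); reduce side would add the ones per token.
                sums.merge(tokens.nextToken(), 1, Integer::sum);
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        // prints {be=2, not=1, or=1, to=2}
        System.out.println(countTokens("to be or", "not to be"));
    }
}
```

In an actual job the equivalent wiring would be job.setMapperClass(TokenCounterMapper.class) and job.setReducerClass(IntSumReducer.class), with no user-written Mapper or Reducer at all.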
IV. MapReduce input
1. The InputFormat abstract class
This class divides the input data into individual splits and, through its RecordReader, breaks each split further into key/value pairs that serve as input to the map function.
InputFormat describes the input-specification for a Map-Reduce job.
The Map-Reduce framework relies on the InputFormat of the job to:
1) Validate the input-specification of the job.
2) Split-up the input file(s) into logical InputSplits, each of which is then assigned to an individual Mapper.
3) Provide the RecordReader implementation to be used to glean input records from the logical InputSplit for processing by the Mapper.
The default behavior of file-based InputFormats, typically sub-classes of FileInputFormat, is to split the input into logical InputSplits based on the total size, in bytes, of the input files. However, the FileSystem blocksize of the input files is treated as an upper bound for input splits. A lower bound on the split size can be set via mapred.min.split.size.
Clearly, logical splits based on input size are insufficient for many applications, since record boundaries must be respected. In such cases, the application also has to implement a RecordReader, which is responsible for respecting record boundaries and presenting a record-oriented view of the logical InputSplit to the individual task.
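The interplay of block size (upper bound) and minimum split size (lower bound) can be sketched as follows. This is a plain-Java approximation modeled on my reading of FileInputFormat, not the Hadoop source itself: SplitSizeSketch and its method names are hypothetical, and the 1.1 slop factor (a final split may be up to 10% larger than the split size) is an assumption you should check against your Hadoop version.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of how a file-based InputFormat might size its splits (not Hadoop code).
class SplitSizeSketch {
    // Assumed slop factor: the last split may exceed splitSize by up to 10%.
    static final double SPLIT_SLOP = 1.1;

    // Block size caps the split size; minSize raises it; maxSize lowers it.
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Returns the length in bytes of each logical split for a single file.
    static List<Long> splitLengths(long fileLength, long splitSize) {
        List<Long> lengths = new ArrayList<>();
        long remaining = fileLength;
        while ((double) remaining / splitSize > SPLIT_SLOP) {
            lengths.add(splitSize);
            remaining -= splitSize;
        }
        if (remaining != 0) {
            lengths.add(remaining);
        }
        return lengths;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long splitSize = computeSplitSize(64 * mb, 1L, Long.MAX_VALUE);
        List<Long> inMegabytes = new ArrayList<>();
        for (long length : splitLengths(200 * mb, splitSize)) {
            inMegabytes.add(length / mb);
        }
        // prints [64, 64, 64, 8]
        System.out.println(inMegabytes);
    }
}
```

With a 64 MB block size and default bounds, a 200 MB file yields three 64 MB splits plus an 8 MB remainder, while a 130 MB file yields only two splits (64 MB and 66 MB) because the remainder falls within the slop.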
2. The RecordReader abstract class
The record reader breaks the data into key/value pairs for input to the Mapper.
3. InputFormat implementations provided by Hadoop
Hadoop provides several InputFormat implementations in the org.apache.hadoop.mapreduce.lib.input package. By default, Hadoop uses TextInputFormat to handle input.
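TextInputFormat presents each line of a text file as one record, with the offset of the line's first byte as the key and the line's content as the value. The sketch below imitates that record shape in plain Java. LineRecordSketch and readRecords are hypothetical names, the input is an in-memory string rather than an InputSplit, and character offsets equal byte offsets only because the sample text is assumed to be ASCII.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java imitation of line-oriented records: offset -> line (not Hadoop code).
class LineRecordSketch {
    static Map<Long, String> readRecords(String data) {
        Map<Long, String> records = new LinkedHashMap<>();
        int start = 0;
        while (start < data.length()) {
            int end = data.indexOf('\n', start);
            if (end < 0) {
                end = data.length(); // last line has no trailing newline
            }
            // Key: offset where the line starts; value: the line without its newline.
            records.put((long) start, data.substring(start, end));
            start = end + 1;
        }
        return records;
    }

    public static void main(String[] args) {
        // prints {0=hello world, 12=foo, 16=bar}
        System.out.println(readRecords("hello world\nfoo\nbar"));
    }
}
```

A real record reader also has to deal with lines that straddle split boundaries, which this sketch does not attempt.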
4. RecordReader implementations provided by Hadoop
Hadoop also provides several RecordReader implementations in the org.apache.hadoop.mapreduce.lib.input package.
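V. MapReduce output
1. The OutputFormat abstract class
OutputFormat describes the output-specification for a Map-Reduce job.
The Map-Reduce framework relies on the OutputFormat of the job to:
1) Validate the output-specification of the job, e.g. check that the output directory doesn't already exist.
2) Provide the RecordWriter implementation to be used to write out the output files of the job. Output files are stored in a FileSystem.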
2. The RecordWriter abstract class
RecordWriter writes the output <key, value> pairs to an output file.
RecordWriter implementations write the job outputs to the FileSystem.
3. OutputFormat implementations provided by Hadoop
Hadoop provides several OutputFormat implementations in the org.apache.hadoop.mapreduce.lib.output package. By default, Hadoop uses TextOutputFormat to handle output.
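TextOutputFormat writes each pair as one line of text, with the key and value separated by a tab by default. A minimal plain-Java sketch of that record layout is shown below. TextRecordWriterSketch is a hypothetical class that appends to an in-memory StringBuilder instead of a FileSystem stream, and it skips details of the real writer such as handling of null keys or values.

```java
// Plain-Java imitation of a line-oriented record writer (not Hadoop code).
class TextRecordWriterSketch {
    private final StringBuilder out;
    private final String separator;

    TextRecordWriterSketch(StringBuilder out, String separator) {
        this.out = out;
        this.separator = separator;
    }

    // One record per line: key, separator, value, newline.
    void write(Object key, Object value) {
        out.append(key).append(separator).append(value).append('\n');
    }

    // Writes two sample records with a tab separator and returns the text produced.
    static String demo() {
        StringBuilder buffer = new StringBuilder();
        TextRecordWriterSketch writer = new TextRecordWriterSketch(buffer, "\t");
        writer.write("be", 2);
        writer.write("to", 2);
        return buffer.toString();
    }

    public static void main(String[] args) {
        System.out.print(demo());
    }
}
```

Because keys and values are rendered through their string form, any types can be written this way, which is why a text-based format is a convenient default.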
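4. RecordWriter implementations provided by Hadoop
The OutputFormat implementations (subclasses) in the org.apache.hadoop.mapreduce.lib.output package define the RecordWriter they need as an inner class, so there are no standalone RecordWriter classes.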
VI. Classes involved in each stage of MapReduce
pp. 70-71
1. InputFormat class
2. Mapper class
3. Combiner class
4. Partitioner class
5. Reducer class
6. OutputFormat class
7. Others
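Of the classes listed, the Partitioner is the one that decides which reduce task receives a given map output key. Hadoop's default HashPartitioner derives the partition from the key's hash code; the plain-Java sketch below reproduces that calculation. HashPartitionSketch is a hypothetical name, and ordinary Java objects stand in for Writable keys.

```java
// Plain-Java sketch of hash partitioning (not Hadoop code).
class HashPartitionSketch {
    // Masking with Integer.MAX_VALUE clears the sign bit so the result is never negative.
    static int getPartition(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int reducers = 3;
        for (String key : new String[] {"a", "b", "c"}) {
            System.out.println(key + " -> reducer " + getPartition(key, reducers));
        }
    }
}
```

Equal keys always land in the same partition, which is what guarantees that a single reduce() call sees every value for its key.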
VII. The shuffle process in detail: http://langyu.iteye.com/blog/992916
map->shuffle->reduce
pp. 60-64; example on pp. 64-68
Appendix: port configuration for the web interfaces
mapred-default.xml
<property>
<name>mapred.job.tracker.http.address</name>
<value>0.0.0.0:50030</value>
<description>
The job tracker http server address and port the server will listen on.
If the port is 0 then the server will start on a free port.
</description>
</property>
hdfs-default.xml
<property>
<name>dfs.http.address</name>
<value>0.0.0.0:50070</value>
<description>
The address and the base port where the dfs namenode web ui will listen on.
If the port is 0 then the server will start on a free port.
</description>
</property>
Original article: http://www.cnblogs.com/michaellau/p/4203629.html