码迷,mamicode.com
首页 > 其他好文 > 详细

MapReduce学习笔记

时间:2015-03-30 09:33:45      阅读:175      评论:0      收藏:0      [点我收藏+]

标签:

一、MapReduce概述
MapReduce 是 Hadoop 的核心组成, 是专用于进行数据计算的,是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题.

MapReduce由两个阶段组成:Map和Reduce,用户只需要实现map()和reduce()两个函数,即可实现分布式计算,非常简单。这两个函数的形参是key、value对,表示函数的输入输出信息。
map、reduce键值对格式
技术分享

二、MapReduce体系结构及工作流程
1、JobTracker
负责接收用户提交的作业,负责启动、跟踪任务执行。
JobSubmissionProtocol是JobClient与JobTracker通信的接口。
InterTrackerProtocol是TaskTracker与JobTracker通信的接口。

2、TaskTracker
负责执行任务。

3、JobClient
是用户作业与JobTracker交互的主要接口。
负责提交作业的,负责启动、跟踪任务执行、访问任务状态和日志等。

4、工作流程图
技术分享

执行步骤:

1.map任务处理
1.1读取输入文件内容,解析成key、value对。对输入文件的每一行,解析成key、value对。每一个键值对调用一次map函数。

1.2 写自己的逻辑,对输入的key、value处理,转换成新的key、value输出。

1.3 对输出的key、value进行分区。

1.4 对不同分区的数据,按照key进行排序、分组。相同key的value放到一个集合中。

1.5 (可选)分组后的数据进行归约。

2.reduce任务处理

2.1 对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。

2.2 对多个map任务的输出进行合并、排序。写reduce函数自己的逻辑,对输入的key、value处理,转换成新的key、value输出。

2.3 把reduce的输出保存到文件中。

三、统计单词源代码及注释

package mapreduce;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountApp {

    static final String INPUT_PATH = "hdfs://liguodong:9000/hello";

    static final String OUT_PATH = "hdfs://liguodong:9000/out";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH),conf);
        if(fileSystem.exists(new Path(OUT_PATH)))
        {
            fileSystem.delete(new Path(OUT_PATH),true);
        }


        final Job job = new Job(conf, WordCountApp.class.getSimpleName());
        //1.1 输入目录在哪里
        FileInputFormat.setInputPaths(job, INPUT_PATH);

        //指定对输入数据进行格式化处理的类
        //job.setInputFormatClass(TextInputFormat.class);

        //1.2 指定自定义的Mapper类
        job.setMapperClass(MyMapper.class);
        //指定map的输出的<k,v>类型,如果<k3,v3>的类型与<k2,v2>的类型一致,那么可以省略。
        //job.setMapOutputKeyClass(Text.class);
        //job.setMapOutputValueClass(LongWritable.class);

        //1.3分区
        //job.setPartitionerClass(HashPartitioner.class);
        //job.setNumReduceTasks(1);


        //1.4 TODO  排序、分组
        //1.5 TODO  (可选)归约

        //2.2 指定自定义的Reducer类
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        //2.3 指定输出的路径
        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
        //指定输出的格式化类
        //job.setOutputFormatClass(TextOutputFormat.class);

        //把作业交给JobTracker运行
        job.waitForCompletion(true);
    }
    /**
     * KEYIN    即K1     表示每一行的起始位置(偏移量offset)
     * VALUEIN  即v1     表示每一行的文本内容
     * KEYOUT   即k2     表示每一行中的每个单词
     * VALUEOUT 即v2     表示每一行中的每个单词的出现次数,固定值1
     * @author liguodong
     *  Java        Hadoop
     *  Long        LongWritable
     *  String      Text
     */
    static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            final String[] splited = value.toString().split(" ");
            for (String word : splited) {
                context.write(new Text(word), new LongWritable(1));
            }
        }
    }
    /**
     * KEYIN    即k2     表示每一行中的每个单词
     * VALUEIN  即v2     表示每一行中每个单词出现次数,固定值1
     * KEYOUT   即k3     表示整个文件中的不同单词
     * VALUEOUT 即v3     表示整个文件中的不同单词的出现总次数
     * @author liguodong
     */

    static class MyReducer extends Reducer<Text, LongWritable,Text, LongWritable>{
        @Override
        protected void reduce(Text k2, Iterable<LongWritable> v2s,
                Reducer<Text, LongWritable, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            long sum = 0L;
            for (LongWritable v2 : v2s) {
                sum += v2.get();
            }
            context.write(k2, new LongWritable(sum));
        }
    }   
}

四、最小的MapReduce驱动

Configuration configuration = new Configuration();
Job job = new Job(configuration, "HelloWorld");
job.setInputFormat(TextInputFormat.class);
job.setMapperClass(IdentityMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setPartitionerClass(HashPartitioner.class);
job.setNumReduceTasks(1);
job.setReducerClass(IdentityReducer.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);
job.setOutputFormat(TextOutputFormat.class);
job.waitForCompletion(true);

五、MapReduce驱动默认的设置

技术分享

六、Hadoop序列化与基本类型

1、序列化概念
序列化(Serialization)是指把结构化对象转化为字节流。
反序列化(Deserialization)是序列化的逆过程。即把字节流转回结构化对象。
Java序列化(java.io.Serializable)

2、Hadoop序列化格式特点
紧凑:高效使用存储空间。
快速:读写数据的额外开销小
可扩展:可透明地读取老格式的数据
互操作:支持多语言的交互

Hadoop的序列化格式:Writable

3、Hadoop序列化的作用

序列化在分布式环境的两大作用:进程间通信,永久存储。
Hadoop节点间通信。
技术分享

4、基本数据类型
Hadoop的数据类型要求必须实现Writable接口。

Writable接口是根据 DataInput 和 DataOutput 实现的简单、有效的序列化对象.

MR的任意Key和Value必须实现Writable接口。

public interface Writable {
  /** 
   * Serialize the fields of this object to <code>out</code>.
   * 
   * @param out <code>DataOuput</code> to serialize this object into.
   * @throws IOException
   */
  void write(DataOutput out) throws IOException;

  /** 
   * Deserialize the fields of this object from <code>in</code>.  
   * 
   * <p>For efficiency, implementations should attempt to re-use storage in the 
   * existing object where possible.</p>
   * 
   * @param in <code>DataInput</code> to deseriablize this object from.
   * @throws IOException
   */
  void readFields(DataInput in) throws IOException;
}

MR的任意key必须实现WritableComparable接口。

/**
 * A {@link Writable} which is also {@link Comparable}. 
 *
 * <p><code>WritableComparable</code>s can be compared to each other, typically 
 * via <code>Comparator</code>s. Any type which is to be used as a 
 * <code>key</code> in the Hadoop Map-Reduce framework should implement this
 * interface.</p>
 **/
public interface WritableComparable<T> extends Writable, Comparable<T> {
}

技术分享

实现Writable接口?
write 是把每个对象序列化到输出流
readFields 是把输入流字节反序列化

实现WritableComparable?
Java值对象的比较:一般需要重写toString(),hashCode(),equals()方法。


Hadoop与Java常见基本类型的对照
Long—LongWritable
Integer—IntWritable
Boolean—BooleanWritable
String—Text
技术分享


Text一般认为它等价于java.lang.String的Writable。针对UTF-8序列。
例:
Text test = new Text(“test”);
IntWritable one = new IntWritable(1);

java类型如何转化为hadoop基本类型?
调用hadoop类型的构造方法,或者调用set()方法。
new LongWritable(123L);

hadoop基本类型如何转化为java类型?
对于Text,需要调用toString()方法,其他类型调用get()方法。

七、基于文件存储的数据结构
SequenceFile—无序存储
MapFile—会对key建立索引文件,value按key顺序存储
基于MapFile的结构有:
ArrayFile—与我们使用的数组一样,key值为序列化的数字
SetFile—只有key,value为不可变的数据
BloomMapFile—在 MapFile 的基础上增加了一个/bloom 文件,包含的是二进制的过滤表,在每一次写操作完成时,会更新这个过滤表。

八、自定义数据类型源码

package mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;


public class KpiApp {
    static final  String INPUT_PATH = "hdfs://liguodong:9000/wlan";
    static final  String OUT_PATH = "hdfs://liguodong:9000/out";

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        final Job job = new Job(new Configuration(),KpiApp.class.getSimpleName());

        //1.1 指定输入文件路径
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        //指定那个类用来格式化输入文件
        job.setInputFormatClass(TextInputFormat.class);

        //1.2 指定自定义的Mapper类
        job.setMapperClass(MyMapper.class);
        //指定输出<k2,v2>的类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(KpiWritable.class);


        //1.3 指定分区类
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(1);

        //1.4 TODO  排序、分组
        //1.5 TODO  (可选)归约

        //2.2 指定自定义的Reducer类
        job.setReducerClass(MyReducer.class);
        //指定输入<k3,v3>的值
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(KpiWritable.class);
        //2.3 指定输出到哪里
        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
        //设定输出文件的格式化类
        job.setOutputFormatClass(TextOutputFormat.class);

        //把任务及交给JobTrack执行
        job.waitForCompletion(true);

    }


    static class MyMapper extends Mapper<LongWritable, Text, Text, KpiWritable>{

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, KpiWritable>.Context context)
                throws IOException, InterruptedException {
            final String[] splited = value.toString().split("\t");
            final String msisdn = splited[1];
            final Text k2 = new Text(msisdn);
            final KpiWritable v2 = new KpiWritable(splited[6],splited[7],splited[8],splited[9]);
            context.write(k2, v2);

        }       
    }

    static class MyReducer extends Reducer<Text, KpiWritable, Text, KpiWritable>{

        /**
         *  @param k2 表示整个文件中不同的手机号码
         *  @param v2表示该手机号在不同时段的流量的集合
         *  @param 
         */
        @Override
        protected void reduce(Text k2, Iterable<KpiWritable> v2s,
                Reducer<Text, KpiWritable, Text, KpiWritable>.Context context)
                throws IOException, InterruptedException {
            long upPackNum = 0L;
            long downPackNum = 0L;
            long upPayLoad = 0L;
            long downPayLoad = 0L;
            for (KpiWritable kpiWritable : v2s) {
                upPackNum += kpiWritable.upPackNum;
                downPackNum += kpiWritable.downPackNum;
                upPayLoad += kpiWritable.upPayLoad;
                downPayLoad += kpiWritable.downPayLoad;
            }
            final KpiWritable v3 = new KpiWritable(upPackNum+"",downPackNum+"",upPayLoad+"",downPayLoad+"");
            context.write(k2, v3);
        }

    }

}

class KpiWritable implements Writable{

    long upPackNum;
    long downPackNum;
    long upPayLoad;
    long downPayLoad;

    public KpiWritable(){

    }
    public KpiWritable(String upPackNum, String downPackNum, String upPayLoad,
            String  downPayLoad) {
        super();
        this.upPackNum = Long.parseLong(upPackNum);
        this.downPackNum = Long.parseLong(downPackNum); 
        this.upPayLoad = Long.parseLong(upPayLoad); 
        this.downPayLoad = Long.parseLong(downPayLoad);
    }




    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upPackNum);
        out.writeLong(downPackNum);
        out.writeLong(upPayLoad);
        out.writeLong(downPayLoad);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.upPackNum = in.readLong();
        this.downPackNum = in.readLong();
        this.upPayLoad = in.readLong();
        this.downPayLoad = in.readLong();
    }
    @Override
    public String toString(){
        return upPackNum+"\t"+downPackNum+"\t"+upPayLoad+"\t"+downPayLoad;
    }
}

九、MapReduce输入处理类

FileInputFormat:
FileInputFormat是所有以文件作为数据源的InputFormat实现的基类,FileInputFormat保存作为job输入的所有文件,并实现了对输入文件计算splits的方法。至于获得记录的方法是有不同的子类——TextInputFormat进行实现的。

InputFormat 负责处理MR的输入部分.
有三个作用:
验证作业的输入是否规范.
把输入文件切分成InputSplit.
提供RecordReader 的实现类,把InputSplit读到Mapper中进行处理.

public abstract class InputFormat<K, V> {
  /** 
   * Logically split the set of input files for the job.  
   * 
   * <p>Each {@link InputSplit} is then assigned to an individual {@link Mapper}
   * for processing.</p>
   *
   * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
   * input files are not physically split into chunks. For e.g. a split could
   * be <i>&lt;input-file-path, start, offset&gt;</i> tuple. The InputFormat
   * also creates the {@link RecordReader} to read the {@link InputSplit}.
   * 
   * @param context job configuration.
   * @return an array of {@link InputSplit}s for the job.
   */
  public abstract 
    List<InputSplit> getSplits(JobContext context
                               ) throws IOException, InterruptedException;

  /**
   * Create a record reader for a given split. The framework will call
   * {@link RecordReader#initialize(InputSplit, TaskAttemptContext)} before
   * the split is used.
   * @param split the split to be read
   * @param context the information about the task
   * @return a new record reader
   * @throws IOException
   * @throws InterruptedException
   */
  public abstract 
    RecordReader<K,V> createRecordReader(InputSplit split,
                                         TaskAttemptContext context
                                        ) throws IOException, 
                                                 InterruptedException;
}

InputSplit
在执行mapreduce之前,原始数据被分割成若干split,每个split作为一个map任务的输入,在map执行过程中split会被分解成一个个记录(key-value对),map会依次处理每一个记录。

FileInputFormat只划分比HDFS block大的文件,所以FileInputFormat划分的结果是这个文件或者是这个文件中的一部分。

如果一个文件的大小比block小,将不会被划分,这也是Hadoop处理大文件的效率要比处理很多小文件的效率高的原因。

当Hadoop处理很多小文件(文件大小小于hdfs block大小)的时候,由于FileInputFormat不会对小文件进行划分,所以每一个小文件都会被当做一个split并分配一个map任务,导致效率底下。

例如:一个1G的文件,会被划分成16个64MB的split,并分配16个map任务处理,而10000个100kb的文件会被10000个map任务处理。

TextInputFormat
TextInputformat是默认的处理类,处理普通文本文件。
文件中每一行作为一个记录,他将每一行在文件中的起始偏移量作为key,每一行的内容作为value。
默认以\n或回车键作为一行记录。
TextInputFormat继承了FileInputFormat。

其他输入类
◆ CombineFileInputFormat
相对于大量的小文件来说,hadoop更合适处理少量的大文件。
CombineFileInputFormat可以缓解这个问题,它是针对小文件而设计的。

◆ KeyValueTextInputFormat
当输入数据的每一行是两列,并用tab分离的形式的时候,KeyValueTextInputformat处理这种格式的文件非常适合。

◆ NLineInputformat
NLineInputformat可以控制在每个split中数据的行数。

◆ SequenceFileInputformat
当输入文件格式是sequencefile的时候,要使用SequenceFileInputformat作为输入。

十、MapReduce输出处理类
TextOutputformat
默认的输出格式,key和value中间值用tab隔开的。

SequenceFileOutputformat
将key和value以sequencefile格式输出。

SequenceFileAsOutputFormat
将key和value以原始二进制的格式输出。

MapFileOutputFormat
将key和value写入MapFile中。由于MapFile中的key是有序的,所以写入的时候必须保证记录是按key值顺序写入的。

MultipleOutputFormat
默认情况下一个reducer会产生一个输出,但是有些时候我们想一个reducer产生多个输出,MultipleOutputFormat和MultipleOutputs可以实现这个功能。

MapReduce学习笔记

标签:

原文地址:http://blog.csdn.net/scgaliguodong123_/article/details/44729839

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!