码迷,mamicode.com
首页 > 其他好文 > 详细

Hadoop之——MapReduce实战(一)

时间:2015-05-24 23:38:32      阅读:396      评论:0      收藏:0      [点我收藏+]

标签:hadoop   mapreduce   合并   搜索   分布式计算   

转载请注明出处:http://blog.csdn.net/l1028386804/article/details/45956487

MapReduce概述

     MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题.

     MR由两个阶段组成:Map和Reduce,用户只需要实现map()和reduce()两个函数,即可实现分布式计算,非常简单。

    这两个函数的形参是key、value对,表示函数的输入信息。

MR执行流程

技术分享

MapReduce原理

技术分享

执行步骤

1. map任务处理

1.1 读取输入文件内容,解析成key、value对。对输入文件的每一行,解析成key、value对。每一个键值对调用一次map函数。

1.2 写自己的逻辑,对输入的key、value处理,转换成新的key、value输出。

1.3 对输出的key、value进行分区。

1.4 对不同分区的数据,按照key进行排序、分组。相同key的value放到一个集合中。

1.5 (可选)分组后的数据进行归约。

2.reduce任务处理

2.1 对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。

2.2 对多个map任务的输出进行合并、排序。写reduce函数自己的逻辑,对输入的key、value处理,转换成新的key、value输出。

2.3 把reduce的输出保存到文件中。

例子:实现WordCountApp

map、reduce键值对格式

技术分享

WordCountApp的驱动代码

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();    //加载配置文件
    Job job = new Job(conf);    //创建一个job,供JobTracker使用
    job.setJarByClass(WordCountApp.class);
		
    job.setMapperClass(WordCountMapper.class);
    job.setReducerClass(WordCountReducer.class);
    FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.1.10:9000/input"));
    FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.1.10:9000/output"));
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
		
    job.waitForCompletion(true);
}

JobTracker

         负责接收用户提交的作业,负责启动、跟踪任务执行。

        JobSubmissionProtocol是JobClient与JobTracker通信的接口。

         InterTrackerProtocol是TaskTracker与JobTracker通信的接口。

TaskTracker

      负责执行任务

JobClient

        是用户作业与JobTracker交互的主要接口。

        负责提交作业的,负责启动、跟踪任务执行、访问任务状态和日志等技术分享

最小的MapReduce驱动

Configuration configuration = new Configuration();
Job job = new Job(configuration, "HelloWorld");
job.setInputFormat(TextInputFormat.class);
job.setMapperClass(IdentityMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setPartitionerClass(HashPartitioner.class);
job.setNumReduceTasks(1);
job.setReducerClass(IdentityReducer.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);
job.setOutputFormat(TextOutputFormat.class);
job.waitForCompletion(true);

MapReduce驱动默认的设置

技术分享

序列化概念

序列化(Serialization)是指把结构化对象转化为字节流。

反序列化(Deserialization)是序列化的逆过程。即把字节流转回结构化对象。

      Java序列化(java.io.Serializable

Hadoop序列化的特点

序列化格式特点:

  1. 紧凑:高效使用存储空间。
  2. 快速:读写数据的额外开销小
  3. 可扩展:可透明地读取老格式的数据

    互操作:支持多语言的交互

    Hadoop的序列化格式:Writable

注意:Java序列化的不足:

1.不精简。附加信息多。不大适合随机访问。

2.存储空间大。递归地输出类的超类描述直到不再有超类。序列化图对象,反序列化时为每个对象新建一个实例。相反。Writable对象可以重用。

3.扩展性差。而Writable方便用户自定义

Hadoop序列化的作用

序列化在分布式环境的两大作用:进程间通信,永久存储。

Hadoop节点间通信。

技术分享

Writable接口

Writable接口, 是根据 DataInput和 DataOutput 实现的简单、有效的序列化对象.

MR的任意Key和Value必须实现Writable接口.
技术分享

MR的任意key必须实现WritableComparable接口

常用的Writable实现类

  Text一般认为它等价于java.lang.String的Writable。针对UTF-8序列

例:

Text test = new Text("test");

IntWritable one = new IntWritable(1);

技术分享

技术分享

技术分享

Writable

①   write 是把每个对象序列化到输出流

②   readFields是把输入流字节反序列化

③   实现WritableComparable.

④   Java值对象的比较:一般需要重写toString(),hashCode(),equals()方法

基于文件的存储结构

SequenceFile 无序存储

MapFile 会对key建立索引文件,value按key顺序存储

基于MapFile的结构有:

ArrayFile 像我们使用的数组一样,key值为序列化的数字

SetFile 他只有key,value为不可变的数据

BloomMapFile 在 MapFile 的基础上增加了一个 /bloom文件,包含的是二进制的过滤表,在每一次写操作完成时,会更新这个过滤表。

MapReduce的输入处理类

FileInputFormat: 
      
 FileInputFormat是所有以文件作为数据源的InputFormat实现的基类,FileInputFormat保存作为job输入的所有文件,并实现了对输入文件计算splits的方法。至于获得记录的方法是有不同的子类——TextInputFormat进行实现的。

InputFormat

技术分享

InputFormat 负责处理MR的输入部分.

有三个作用:

验证作业的输入是否规范.

把输入文件切分成InputSplit.

提供RecordReader 的实现类,把InputSplit读到Mapper中进行处理

InputSplit

   在执行mapreduce之前,原始数据被分割成若干split,每个split作为一个map任务的输入,在map执行过程中split会被分解成一个个记录(key-value对),map会依次处理每一个记录。

   FileInputFormat只划分比HDFS block大的文件,所以FileInputFormat划分的结果是这个文件或者是这个文件中的一部分.               

   如果一个文件的大小比block小,将不会被划分,这也是Hadoop处理大文件的效率要比处理很多小文件的效率高的原因。

    当Hadoop处理很多小文件(文件大小小于hdfs block大小)的时候,由于FileInputFormat不会对小文件进行划分,所以每一个小文件都会被当做一个split并分配一个map任务,导致效率底下。

   例如:一个1G的文件,会被划分成16个64MB的split,并分配16个map任务处理,而10000个100kb的文件会被10000个map任务处理。  

TextInputFormat

  TextInputformat是默认的处理类,处理普通文本文件。

  文件中每一行作为一个记录,他将每一行在文件中的起始偏移量作为key,每一行的内容作为value。

 默认以\n或回车键作为一行记录。

 TextInputFormat继承了FileInputFormat。

InputFormat类的层次结构

技术分享

其他输入类

    CombineFileInputFormat

      相对于大量的小文件来说,hadoop更合适处理少量的大文件。

       CombineFileInputFormat可以缓解这个问题,它是针对小文件而设计的。

    KeyValueTextInputFormat

       当输入数据的每一行是两列,并用tab分离的形式的时候,KeyValueTextInputformat处理这种格式的文件非常适合。

    NLineInputformat 
        NLineInputformat可以控制在每个split中数据的行数。

    SequenceFileInputformat 

       当输入文件格式是sequencefile的时候,要使用SequenceFileInputformat作为输入

自定义输入格式

   1)继承FileInputFormat基类。

   2)重写里面的getSplits(JobContextcontext)方法。

   3)重写createRecordReader(InputSplitsplit,  TaskAttemptContext context)方法。

Hadoop的输出

    TextOutputformat

      默认的输出格式,key和value中间值用tab隔开的。

    SequenceFileOutputformat

      将key和value以sequencefile格式输出。

    SequenceFileAsOutputFormat

      将key和value以原始二进制的格式输出。

    MapFileOutputFormat

      将key和value写入MapFile中。由于MapFile中的key是有序的,所以写入的时候必须保证记录是按key值顺序写入的。

    MultipleOutputFormat

        默认情况下一个reducer会产生一个输出,但是有些时候我们想一个reducer产生多个输出,MultipleOutputFormat和MultipleOutputs可以实现这个功能。

Hadoop之——MapReduce实战(一)

标签:hadoop   mapreduce   合并   搜索   分布式计算   

原文地址:http://blog.csdn.net/l1028386804/article/details/45956487

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!