码迷,mamicode.com
首页 > 其他好文 > 详细

《Hadoop权威指南》笔记 第一章&第二章 MapReduce初探

时间:2015-04-05 10:30:27      阅读:172      评论:0      收藏:0      [点我收藏+]

标签:

技术分享

? ?

技术分享

? ?

技术分享

? ?

技术分享

? ?

技术分享

? ?

技术分享

? ?

技术分享

? ?

使用MapReduce

? ?

技术分享

? ?

技术分享

? ?

技术分享

? ?

import java.io.IOException;

// hadoop针对流处理优化的类型

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

// 会继承这个基类

import org.apache.hadoop.mapred.MapReduceBase;

// 会实现这个接口

import org.apache.hadoop.mapred.Mapper;

// 处理后数据由它来收集

import org.apache.hadoop.mapred.OoutputCollector;

import org.apache.hadoop.mapred.Reporter;

? ?

// 虽然还没有开始系统学习java语法, 我猜测, extends是继承基类,

// implements 是实现接口, java把它们语法上分开了

public class MaxTemperatureMapper extends MapReduceBase

// Mapper是一个泛型接口

implements Mapper<LongWritable, Text, Text, IntWritable> {

? ?

? ?

Mapper是一个泛型接口:

? ?

Mapper<LongWritable, Text, Text, IntWritable>

它有4个形参类型, 分别是map函数的输入键, 输入值, 输出键和输出值的类型.

? ?

就目前来说, 输入键是长整数偏移量, 输入值是一行文本, 输出键是年份, 输出值是气温(整数).

? ?

Hadoop提供了一套可优化网络序列化传输的基本类型, 不直接使用java内嵌的类型. 在这里, LongWritable 相当于 Long, IntWritable 相当于 Int, Text 相当于 String.

? ?

map() 方法的输入是一个键和一个值.

? ?

map() 还提供了 OutputCollector 实例用于输出内容的写入.

? ?

? ?

? ?

reduce函数的输入键值对必须与map函数的输出键值对匹配.

第三部分的代码为负责运行MapReduce的作业.

? ?

import java.io.IOException;

? ?

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.FileInputFormat;

import org.apache.hadoop.mapred.FileOutputFormat;

import org.apache.hadoop.mapred.JobClient;

import org.apache.hadoop.mapred.JobConf;

? ?

public class MaxTemperature {

? ?

public static void main(String[] args) throws IOException {

if(args.length !=2 ) {

System.err.println("Usage: MaxTemperature <input path> <output path>");

System.exit(-1);

}

? ?

JobConf conf = new JobConf(MaxTemperatuer.class);

conf.setJobName("Max temperature");

? ?

FileInputFormat.addInputPath(conf, new Path(args[0]));

FileOutputFormat.setOutputPath(conf, new Path(args[1]));

conf.setMapperClass(MaxTemperatuerMapper.class);

conf.setReducerClass(MaxTemperatuerReducer.class);

conf.setOutputKeyClass(Text.class);

conf.setOutputValueClass(IntWritable.class);

? ?

JobClient.runJob(conf);

}

}

? ?

技术分享

? ?

JobConf 对象制定了作业的执行规范. 构造函数的参数为作业所在的类, Hadoop会通过该类来查找包含给类的JAR文件.

? ?

构造 JobConf 对象后, 制定输入和输出数据的路径. 这里是通过 FileInputFormat 的静态方法 addInputPath() 来定义输入数据的路径, 路径可以是单个文件, 也可以是目录(即目录下的所有文件)或符合特定模式的一组文件. 可以多次调用(从名称可以看出, addInputPath() ).

? ?

同理, FileOutputFormat.setOutputPath() 指定输出路径. 即写入目录. 运行作业前, 写入目录不应该存在, Hadoop会拒绝并报错. 这样设计, 主要是防止数据被覆盖, 数据丢失. 毕竟Hadoop运行的时间是很长的, 丢失了非常恼人.

? ?

FileOutputFormat.setOutputPath() conf.setMapperClass() 指定mapreduce类型.

? ?

接着, setOutputKeyClass setOutputValueClass 指定mapreduce函数的 输出 类型, 这两个函数的输出类型往往相同. 如果不同, map的输出函数类型通过 setMapOutputKeyClass setMapOutputValueClass 指定.

? ?

输入的类型用 InputFormat 设置, 本例中没有指定, 使用的是默认的 TextInputFormat (文本输入格式);

? ?

最后, JobClient.runJob() 会提交作业并等待完成, 将结果写到控制台.

? ?

技术分享

? ?

技术分享

? ?

技术分享

? ?

? ?

新增的java MapReduce API与旧API的区别:

? ?

API倾向于使用基类而不是接口, 因为更容易扩展.

? ?

API放在 org.apache.hadoop.mapreduce 包中, 旧的在 org.apache.hadoop.mapred .

? ?

API充分使用context object, 使用户代码能与MapReduce系统进行通信. ex, MapContext基本具备了JobConf, OutputCollectorReporter的功能.

? ?

API支持"推"(push)和"拉"(pull)式的迭代. 这两类API, 均可以K/V pair把记录推给mapper, 亦可以从map()方法中pull.pull的好处是, 可以实现数据的批量处理, 而非逐条记录的处理.

? ?

API实现了配置的统一. 不在通过JobConf对象(Hadoop配置的对象的一个扩展)配置, 而是通过Configuration配置.


API中作业Job类控制, 而非JobClient, 它被删除了.

? ?

输出文件的命名方式稍有不同. mappart-m-nnnnn, reducepart-r-nnnnn(nnnnn为分块序列号, 整数, 0开始).

? ?

技术分享

? ?

技术分享

? ?

一些术语:

? ?

MapReduce作业(job)是一个工作单元:它包括输入数据, MapReduce程序和配置信息. Hadoop作业分为若干个小task来执行, 分为两类, mapreduce任务.

? ?

两类节点控制着作业的执行过程: jobtracker(一个)tasktracker(一系列). 前者调度后者, 后者返回结果给前者.

? ?

技术分享

? ?

输入分片 (input split), 输入数据, 等长的小数据块. 简称分片. 一个分片一个map task.

? ?

技术分享

? ?

负载均衡. 但是, split足够多, 可能会增加管理split的时间和构建map task的时间. 合理的是HDFS的一个块的大小(默认64MB).

? ?

数据本地化优化 (data locality optimization).即在HDFS的节点上运行map task, 性能最优. 节省了网络传输资源.

? ?

map任务写入本地硬盘, 而不是HDFS. 为何? map的输出是中间结果, 完成后可被删除. 如果map失败, 将在另一个节点重起一个map task, 再次构建map中间结果.

? ?

reduce任务不具备数据本地化的优势. 因为它的输入是多个map的输出. 需要网络.

reduce输出放在HDFS, 可靠存储., 第一个副本在本地节点, 其他副本在其他机架节点上. 输出需要占网络带宽.

? ?

reduce的任务数量不是由输入数据的大小决定, 而是指定的.

? ?

如果有多个reduce任务, 每个map会对其输出 分区 (partition), 即为每个reduce任务创建一个分区.

? ?

分区由用户自定义的分区函数控制, 但是一般使用默认的分区器(partitioner)通过哈希函数来分区, 很高效.

? ?

技术分享

? ?

这里map任务和reduce任务之间的数据流称为 混洗 (shuffle). 一般比此图要更复杂, 并且调整混洗参数对作业执行总时间有非常大的影响.

? ?

combiner, 合并函数. 优化方案, map任务的输出进行合并, 以减少mapreduce任务间的数据传输(占用带宽资源).

? ?

并非所有函数都可以合并, 需要具有某个属性(被称为"分布式的"函数). 比如算最大值,求和可以用合并,但算平均数, 需要K/V pair的个数.

? ?

使用combiner, 是需要慎重考虑的.

? ?

技术分享

? ?

指定一个合并函数

? ?

下面的代码是旧的API的实现方法, 本书中都是使用旧的API实现的.

? ?

红色部分为执行的合并函数,它与reducer是一样的

? ?

相当于执行了两次reducer

? ?

public class MaxTemperatureWithCombiner {

public static void main(String[] args) throws IOException {

if(args.length !=2 ) {

System.err.println("Usage: MaxTemperatureWithCombiner <input path> <output path>");

System.exit(-1);

}

? ?

JobConf conf = new JobConf(MaxTemperatuerWithCombiner.class);

conf.setJobName("Max temperature");

? ?

FileInputFormat.addInputPath(conf, new Path(args[0]));

FileOutputFormat.setOutputPath(conf, new Path(args[1]));

? ?

conf.setMapperClass(MaxTemperatuerMapper.class);

conf.setCombinerClass(MaxTemperatuerReducer.class);

conf.setReducerClass(MaxTemperatuerReducer.class);

conf.setOutputKeyClass(Text.class);

conf.setOutputValueClass(IntWritable.class);

? ?

JobClient.runJob(conf);

? ?

}

《Hadoop权威指南》笔记 第一章&第二章 MapReduce初探

标签:

原文地址:http://www.cnblogs.com/keedor/p/4393678.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!