Recently, for a project, I ran some experiments with MapReduce. The results are recorded below.
Test Environment
The cluster consists of Ubuntu VMs (1 GB of RAM each) running Hadoop 2.6.0:
1 NameNode (Master)
3 DataNodes (Slaves)
The Master and two of the Slaves (Slave2 and Slave3) run on a relatively powerful physical machine, while the remaining Slave (Slave1) runs on a weaker one.
Data Preparation
28 text files in total, each about 12 MB, roughly 330 MB of data altogether.
Experiment 1: Task Distribution Across Nodes
Our test program is the basic word-count program.
```java
package MyPackage;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    // Mapper: emit (word, 1) for every token in the input line
    public static class Map extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                context.write(word, one);
            }
        }
    }

    // Reducer (also reused as the Combiner): sum the counts for each word
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "wordcount");
        job.setJarByClass(WordCount.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setCombinerClass(Reduce.class); // the line discussed in Experiment 2
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}
```
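To run it, the class is packaged into a jar and submitted in the usual way, along the lines of `hadoop jar wordcount.jar MyPackage.WordCount <input dir> <output dir>`; the jar name and HDFS paths here are placeholders, not the ones used in this test. One side note: `new Job(conf, ...)` is deprecated in Hadoop 2.x in favor of `Job.getInstance(conf, ...)`, though it still works in 2.6.0.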
Results
The job took about 11 minutes to complete.
A total of 28 Map Tasks were launched, one per input file: each ~12 MB file is smaller than an HDFS block, so FileInputFormat produces one split (and hence one Map Task) per file.
The 28 Map Tasks were not evenly distributed across the 3 Slaves.
Attempt | State | Status | Node | Logs | Start Time | Finish Time | Elapsed Time | Note
---|---|---|---|---|---|---|---|---
attempt_1430534608975_0001_m_000000_0 | SUCCEEDED | map | /default-rack/Slave3:8042 | logs | Sat, 02 May 2015 02:44:02 GMT | Sat, 02 May 2015 02:47:03 GMT | 3mins, 0sec | |
attempt_1430534608975_0001_m_000001_0 | SUCCEEDED | map | /default-rack/Slave3:8042 | logs | Sat, 02 May 2015 02:44:03 GMT | Sat, 02 May 2015 02:47:08 GMT | 3mins, 5sec | |
attempt_1430534608975_0001_m_000002_0 | SUCCEEDED | map | /default-rack/Slave3:8042 | logs | Sat, 02 May 2015 02:44:03 GMT | Sat, 02 May 2015 02:47:08 GMT | 3mins, 5sec | |
attempt_1430534608975_0001_m_000003_0 | SUCCEEDED | map | /default-rack/Slave3:8042 | logs | Sat, 02 May 2015 02:44:03 GMT | Sat, 02 May 2015 02:47:07 GMT | 3mins, 4sec | |
attempt_1430534608975_0001_m_000004_0 | SUCCEEDED | map | /default-rack/Slave3:8042 | logs | Sat, 02 May 2015 02:44:03 GMT | Sat, 02 May 2015 02:47:08 GMT | 3mins, 5sec | |
attempt_1430534608975_0001_m_000005_0 | SUCCEEDED | map | /default-rack/Slave3:8042 | logs | Sat, 02 May 2015 02:44:03 GMT | Sat, 02 May 2015 02:47:12 GMT | 3mins, 9sec | |
attempt_1430534608975_0001_m_000006_0 | SUCCEEDED | map | /default-rack/Slave3:8042 | logs | Sat, 02 May 2015 02:44:03 GMT | Sat, 02 May 2015 02:47:08 GMT | 3mins, 5sec | |
attempt_1430534608975_0001_m_000007_0 | SUCCEEDED | map | /default-rack/Slave3:8042 | logs | Sat, 02 May 2015 02:44:03 GMT | Sat, 02 May 2015 02:47:00 GMT | 2mins, 57sec | |
attempt_1430534608975_0001_m_000008_0 | SUCCEEDED | map | /default-rack/Slave2:8042 | logs | Sat, 02 May 2015 02:44:02 GMT | Sat, 02 May 2015 02:45:31 GMT | 1mins, 28sec | |
attempt_1430534608975_0001_m_000009_0 | SUCCEEDED | map | /default-rack/Slave2:8042 | logs | Sat, 02 May 2015 02:44:02 GMT | Sat, 02 May 2015 02:45:55 GMT | 1mins, 52sec | |
attempt_1430534608975_0001_m_000010_0 | SUCCEEDED | map | /default-rack/Slave2:8042 | logs | Sat, 02 May 2015 02:44:02 GMT | Sat, 02 May 2015 02:45:59 GMT | 1mins, 57sec | |
attempt_1430534608975_0001_m_000011_0 | SUCCEEDED | map | /default-rack/Slave2:8042 | logs | Sat, 02 May 2015 02:44:02 GMT | Sat, 02 May 2015 02:45:54 GMT | 1mins, 52sec | |
attempt_1430534608975_0001_m_000012_0 | SUCCEEDED | map | /default-rack/Slave2:8042 | logs | Sat, 02 May 2015 02:44:02 GMT | Sat, 02 May 2015 02:45:31 GMT | 1mins, 28sec | |
attempt_1430534608975_0001_m_000013_0 | SUCCEEDED | map | /default-rack/Slave2:8042 | logs | Sat, 02 May 2015 02:44:02 GMT | Sat, 02 May 2015 02:45:55 GMT | 1mins, 52sec | |
attempt_1430534608975_0001_m_000014_1 | SUCCEEDED | map | /default-rack/Slave2:8042 | logs | Sat, 02 May 2015 02:47:18 GMT | Sat, 02 May 2015 02:47:23 GMT | 4sec | |
attempt_1430534608975_0001_m_000015_0 | SUCCEEDED | map | /default-rack/Slave1:8042 | logs | Sat, 02 May 2015 02:45:08 GMT | Sat, 02 May 2015 02:50:42 GMT | 5mins, 33sec | |
attempt_1430534608975_0001_m_000016_0 | SUCCEEDED | map | /default-rack/Slave1:8042 | logs | Sat, 02 May 2015 02:45:08 GMT | Sat, 02 May 2015 02:50:57 GMT | 5mins, 48sec | |
attempt_1430534608975_0001_m_000017_1 | SUCCEEDED | map | /default-rack/Slave2:8042 | logs | Sat, 02 May 2015 02:47:48 GMT | Sat, 02 May 2015 02:47:52 GMT | 4sec | |
attempt_1430534608975_0001_m_000018_1 | SUCCEEDED | map | /default-rack/Slave2:8042 | logs | Sat, 02 May 2015 02:48:18 GMT | Sat, 02 May 2015 02:48:23 GMT | 4sec | |
attempt_1430534608975_0001_m_000019_0 | SUCCEEDED | map | /default-rack/Slave1:8042 | logs | Sat, 02 May 2015 02:45:08 GMT | Sat, 02 May 2015 02:50:39 GMT | 5mins, 30sec | |
attempt_1430534608975_0001_m_000020_1 | SUCCEEDED | map | /default-rack/Slave3:8042 | logs | Sat, 02 May 2015 02:48:48 GMT | Sat, 02 May 2015 02:48:53 GMT | 4sec | |
attempt_1430534608975_0001_m_000021_1 | SUCCEEDED | map | /default-rack/Slave3:8042 | logs | Sat, 02 May 2015 02:48:03 GMT | Sat, 02 May 2015 02:48:09 GMT | 6sec | |
attempt_1430534608975_0001_m_000022_0 | SUCCEEDED | map | /default-rack/Slave2:8042 | logs | Sat, 02 May 2015 02:45:38 GMT | Sat, 02 May 2015 02:46:53 GMT | 1mins, 14sec | |
attempt_1430534608975_0001_m_000023_0 | SUCCEEDED | map | /default-rack/Slave2:8042 | logs | Sat, 02 May 2015 02:46:02 GMT | Sat, 02 May 2015 02:47:08 GMT | 1mins, 6sec | |
attempt_1430534608975_0001_m_000024_0 | SUCCEEDED | map | /default-rack/Slave2:8042 | logs | Sat, 02 May 2015 02:46:01 GMT | Sat, 02 May 2015 02:47:00 GMT | 58sec | |
attempt_1430534608975_0001_m_000025_0 | SUCCEEDED | map | /default-rack/Slave2:8042 | logs | Sat, 02 May 2015 02:46:01 GMT | Sat, 02 May 2015 02:46:53 GMT | 51sec | |
attempt_1430534608975_0001_m_000026_0 | SUCCEEDED | map | /default-rack/Slave2:8042 | logs | Sat, 02 May 2015 02:46:01 GMT | Sat, 02 May 2015 02:47:08 GMT | 1mins, 6sec | |
attempt_1430534608975_0001_m_000027_0 | SUCCEEDED | map | /default-rack/Slave2:8042 | logs | Sat, 02 May 2015 02:46:55 GMT | Sat, 02 May 2015 02:47:15 GMT | 19sec |
Hadoop handles this quite intelligently. Faster nodes free up containers sooner and are therefore assigned more tasks, and the Speculative Execution mechanism additionally launches backup attempts of slow-running tasks on other, faster nodes (the attempt IDs ending in `_1` in the table above are such backup attempts). The net effect is that poorly performing nodes end up with fewer tasks and stronger nodes with more.
In our experiment the distribution was: Slave1 - 3, Slave2 - 15, Slave3 - 10. Slave1, the weakest node, indeed received the fewest tasks.
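Speculative execution is on by default. If the backup attempts are not wanted (on a small, heavily loaded cluster they can waste containers), it can be switched off per job. A minimal sketch, assuming the standard Hadoop 2.x property names:

```java
// Sketch: disabling speculative execution for a job (Hadoop 2.x property names).
// Both properties default to true, which is what produced the _1 attempts above.
Configuration conf = new Configuration();
conf.setBoolean("mapreduce.map.speculative", false);    // no backup map attempts
conf.setBoolean("mapreduce.reduce.speculative", false); // no backup reduce attempts
Job job = Job.getInstance(conf, "wordcount");
```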
Experiment 2: The Effect of the Combiner
You may have noticed that, compared with a basic WordCount program, our code adds this line:
job.setCombinerClass(Reduce.class);
What does it do?
This brings us to MapReduce's shuffle phase. In a cluster, Map Tasks and Reduce Tasks generally run on different nodes, so a Reduce Task has to pull Map Task output across the network from other nodes. If we can reduce the amount of data the Map Tasks emit, we reduce that network traffic and improve overall performance.
The Combiner does exactly this: it merges a Map Task's intermediate results locally, shrinking both the amount of data spilled to disk and the Map Task's final output, and thereby improves performance.
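As a concrete illustration (the words and counts below are invented, not taken from the test data), reusing our Reduce class as the combiner is valid only because summing is commutative and associative, so applying it zero, one, or many times on the map side does not change the final result:

```java
// With job.setCombinerClass(Reduce.class), a map task's local output such as
//   ("the", 1), ("cat", 1), ("the", 1), ("the", 1)
// is pre-aggregated on the map side into
//   ("the", 3), ("cat", 1)
// before being spilled to disk and later fetched by the reducers.
job.setCombinerClass(Reduce.class);
```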
Now let's remove that line from the code and see how the job performs.
The job ran for about 20 minutes, then stalled and never completed. I suspected the weaker physical machine could not keep up and had hung the whole job, so I shut down Slave1 and continued the test with only Slave2 and Slave3 as DataNodes.
With the Combiner enabled, the job took only about 5 minutes.
Map Task distribution:
Slave2 - 16, Slave3 - 12
With the Combiner disabled, the job stalled again and never finished. This shows the slow node was not the cause; in this scenario, running without a Combiner appears to be simply infeasible. I later repeated the test several more times without the Combiner, still with only Slave2 and Slave3 as DataNodes, and it stalled almost every time.
This demonstrates at least one thing: with a moderately large amount of data (about 330 MB in this test), running without a Combiner puts such a heavy load on the network that the whole job may never complete.
For a thorough explanation of what the Combiner really is, I recommend this article, which describes the shuffle process in great detail: http://liouwei20051000285.blog.163.com/blog/static/252367420116208743834/
In short, the map-side shuffle works as follows:
Partition: decide which reducer should handle each key/value pair, and store that information alongside the pair in the in-memory buffer (a partitioner sketch follows the lists below).
Combine: run a reduce over entries that share the same key; this is what we call the combiner. It greatly reduces the amount of data spilled to disk and transferred over the network.
Merge: merge the many spill files on disk into a single file; during this merge, entries with the same key are combined again if a combiner is set.
The reduce-side shuffle works as follows:
Copy: pull the output of completed map tasks over to memory on the reduce task's node.
Merge: as on the map side, data spilled to disk produces many spill files, which are merged.
Input: repeated merging eventually produces the final input file for the reducer.
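To make the Partition step concrete, here is a minimal sketch of a partitioner that behaves like Hadoop's default HashPartitioner; the class name WordPartitioner is just for illustration and is not part of the test program above.

```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hash the key and take it modulo the number of reduce tasks, so every record
// with the same key is sent to the same reducer.
public class WordPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
```

It would be plugged in with `job.setPartitionerClass(WordPartitioner.class);` — with the default single reducer, as in this test, everything goes to partition 0 anyway.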
Original post: http://www.cnblogs.com/silva/p/4471653.html