码迷,mamicode.com
首页 > 其他好文 > 详细

MapReduce-nodes

时间:2016-01-01 00:31:07      阅读:240      评论:0      收藏:0      [点我收藏+]

标签:

---恢复内容开始---

什么是MapReduce?

    你想数出一摞牌中有多少张黑桃。直观方式是一张一张检查并且数出有多少张是黑桃。
MapReduce方法则是:
给在座的所有玩家中分配这摞牌
让每个玩家数自己手中的牌有几张是黑桃,然后把这个数目汇报给你
你把所有玩家告诉你的数字加起来,得到最后的结论
 
MapReduce概述
MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题.
 
MapReduce合并了两种经典函数:
映射(Mapping)对集合里的每个目标应用同一个操作。即,如果你想把表单里每个单元格乘以二,那么把这个函数单独地应用在每个单元格上的操作就属于mapping。
 
化简(Reducing )遍历集合中的元素来返回一个综合的结果。即,输出表单里一列数字的和这个任务属于reducing。
MapReduce由两个阶段组成:Map和Reduce,用户只需要实现map()和reduce()两个函数,即可实现分布式计算,非常简单。
这两个函数的形参是key、value对,表示函数的输入信息。 
MapReduce在多于10PB数据时趋向于变慢。
 
Mapreduce原理
技术分享
 
◆执行步骤:
1. map任务处理
1.1 读取输入文件内容,解析成key、value对。对输入文件的每一行,解析成key、value对。每一个键值对调用一次map函数。
1.2 写自己的逻辑,对输入的key、value处理,转换成新的key、value输出。
1.3 对输出的key、value进行分区。
1.4 对不同分区的数据,按照key进行排序、分组。相同key的value放到一个集合中。
1.5 (可选)分组后的数据进行归约。
2.reduce任务处理
2.1 对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。
2.2 对多个map任务的输出进行合并、排序。写reduce函数自己的逻辑,对输入的key、values处理,转换成新的key、value输出。
2.3 把reduce的输出保存到文件中。
例子:实现WordCountApp
 
  1.  public class WordCountApp {
  2. //自定义的mapper,目的是实现自己的业务逻辑,所以继承org.apache.hadoop.mapreduce.Mapper
  3. publicstaticclassMyMapperextends org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,Text,LongWritable>{
  4. //k1表示
  5. //v1表示一行的文本内容
  6. @Override
  7. protectedvoid map(LongWritable key,Text value,Mapper<LongWritable,Text,Text,LongWritable>.Context context)
  8. throwsIOException,InterruptedException{
  9. //转换成String类型,目的是使用String的方法
  10. String line = value.toString();
  11. String[] splited = line.split("\t");
  12. for(String word : splited){
  13. context.write(newText(word),newLongWritable(1));
  14. }
  15. }
  16. }
  17. //经过mapper操作后,产生4个<k2,v2>,分别是<hello,1>、<you,1>、<hello,1>、<me,1>
  18. //按照k2进行排序
  19. //通过框架分组,产生3个组,分别是<hello,{1,1}><me,{1}><you,{1}>
  20. publicstaticclassMyReducerextends org.apache.hadoop.mapreduce.Reducer<Text,LongWritable,Text,LongWritable>{
  21. //函数被调用3次,传入的形参分别是<hello,{1,1}>、<me,{1}>、<you,{1}>
  22. @Override
  23. protectedvoid reduce(Text k2,Iterable<LongWritable> v2s,
  24. Reducer<Text,LongWritable,Text,LongWritable>.Context context)throwsIOException,InterruptedException{
  25. long count =0L;
  26. for(LongWritable v2 : v2s){
  27. count += v2.get();
  28. }
  29. LongWritable v3 =newLongWritable(count);
  30. context.write(k2, v3);
  31. }
  32. }
  33. //客户端代码,写完交给ResourceManager框架去执行
  34. publicstaticvoid main(String[] args)throwsException{
  35. Configuration conf =newConfiguration();
  36. Job job =Job.getInstance(conf,WordCountApp.class.getSimpleName());
  37. //打成jar执行
  38. job.setJarByClass(WordCountApp.class);
  39. //数据在哪里?
  40. FileInputFormat.setInputPaths(job, args[0]);
  41. //使用哪个mapper处理输入的数据?
  42. job.setMapperClass(MyMapper.class);
  43. //map输出的数据类型是什么?
  44. job.setMapOutputKeyClass(Text.class);
  45. job.setMapOutputValueClass(LongWritable.class);
  46. //使用哪个reducer处理输入的数据?
  47. job.setReducerClass(MyReducer.class);
  48. //reduce输出的数据类型是什么?
  49. job.setOutputKeyClass(Text.class);
  50. job.setOutputValueClass(LongWritable.class);
  51. //数据输出到哪里?
  52. FileOutputFormat.setOutputPath(job,newPath(args[1]));
  53. //交给yarn去执行,直到执行结束才退出本程序
  54. job.waitForCompletion(true);
  55. }
  56. }
*把这个函数打成jar包,即,导出为jar.然后把这个jar文件上传到hadoop集群中,在hadoop中新建一个apps文件夹存放上传上来的jar包.然后,新建一个hello.txt文本,在里面写一些单词,以\t分开.然后把这个hello.txt上传到hdfs文件系统中,然后执行这个命令.bin/hadoop jar apps/wordcount.jar /hello /out     bin/hdfs dfs -text /out/part-r-0000
技术分享技术分享技术分享技术分享
  1. [root@node134 hadoop-2.6.0]# bin/hdfs dfs -put hello.txt /
  2. 15/12/3117:44:05 WARN util.NativeCodeLoader:Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  3. [root@node134 hadoop-2.6.0]# bin/hdfs dfs -ls /
  4. 15/12/3117:44:08 WARN util.NativeCodeLoader:Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  5. Found4 items
  6. drwxr-xr-x - root supergroup 02015-12-3117:21/dir1
  7. -rw-r--r--1 root supergroup 192015-12-3117:44/hello.txt
  8. drwx------- root supergroup 02015-12-3117:23/tmp
  9. drwx------- root supergroup 02015-12-3117:28/user
  10. [root@node134 hadoop-2.6.0]# bin/hadoop jar APPS/WordCountApp.jar /hello.txt /out
  11.  
  12. [root@node134 hadoop-2.6.0]# bin/hdfs dfs -ls /
    15/12/31 17:45:40 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Found 5 items
    drwxr-xr-x   - root supergroup          0 2015-12-31 17:21 /dir1
    -rw-r--r--   1 root supergroup         19 2015-12-31 17:44 /hello.txt
    drwxr-xr-x   - root supergroup          0 2015-12-31 17:45 /out
    drwx------   - root supergroup          0 2015-12-31 17:23 /tmp
    drwx------   - root supergroup          0 2015-12-31 17:28 /user
    [root@node134 hadoop-2.6.0]# bin/hdfs dfs -ls /out
    15/12/31 17:45:46 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Found 2 items
    -rw-r--r--   1 root supergroup          0 2015-12-31 17:45 /out/_SUCCESS
    -rw-r--r--   1 root supergroup         19 2015-12-31 17:45 /out/part-r-00000
    [root@node134 hadoop-2.6.0]# bin/hdfs dfs -text /out/part-r-00000
    15/12/31 17:46:21 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    hello   2
    me      1
    you     1
 
 
 map、reduce键值对格式
技术分享
 
 
 WordCountApp的驱动代码
  1. publicstaticvoid main(String[] args)throwsException{
  2. Configuration conf =newConfiguration();//加载配置文件
  3. Job job =newJob(conf);//创建一个job,供JobTracker使用
  4. job.setJarByClass(WordCountApp.class);
  5. job.setMapperClass(WordCountMapper.class);
  6. job.setReducerClass(WordCountReducer.class);
  7. FileInputFormat.setInputPaths(job,newPath("hdfs://192.168.1.10:9000/input"));
  8. FileOutputFormat.setOutputPath(job,newPath("hdfs://192.168.1.10:9000/output"));
  9. job.setOutputKeyClass(Text.class);
  10. job.setOutputValueClass(IntWritable.class);
  11. job.waitForCompletion(true);
  12. }
 
序列化概念
序列化(Serialization)是指把结构化对象转化为字节流。
反序列化(Deserialization)是序列化的逆过程。即把字节流转回结构化对象。
Java序列化(java.io.Serializable)
 
Hadoop序列化的特点
序列化格式特点:
紧凑:高效使用存储空间。
快速:读写数据的额外开销小
可扩展:可透明地读取老格式的数据
互操作:支持多语言的交互
             Hadoop的序列化格式:Writable
 
 Java序列化的不足:
1.不精简。附加信息多。不大适合随机访问。
2.存储空间大。递归地输出类的超类描述直到不再有超类。序列化图对象,反序列化时为每个对象新建一个实例。相反。Writable对象可以重用。
3.扩展性差。而Writable方便用户自定义
 
Hadoop序列化的作用
序列化在分布式环境的两大作用:进程间通信,永久存储。
Hadoop节点间通信。
技术分享
Writable接口
Writable接口, 是根据 DataInput 和 DataOutput 实现的简单、有效的序列化对象.
MR的任意Key和Value必须实现Writable接口.
技术分享
MR的任意key必须实现WritableComparable接口
技术分享
 
常用的Writable实现类
Text一般认为它等价于java.lang.String的Writable。针对UTF-8序列。
 
例:
Text test = new Text("test");
IntWritable one = new IntWritable(1);
技术分享
 
技术分享
 
 
自定义Writable类
技术分享
Writable
write 是把每个对象序列化到输出流
readFields是把输入流字节反序列化
 
实现WritableComparable.
Java值对象的比较:一般需要重写toString(),hashCode(),
equals()方法
 
 
 
 
 
 
 





---恢复内容结束---

什么是MapReduce?

    你想数出一摞牌中有多少张黑桃。直观方式是一张一张检查并且数出有多少张是黑桃。
MapReduce方法则是:
给在座的所有玩家中分配这摞牌
让每个玩家数自己手中的牌有几张是黑桃,然后把这个数目汇报给你
你把所有玩家告诉你的数字加起来,得到最后的结论
 
MapReduce概述
MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题.
 
MapReduce合并了两种经典函数:
映射(Mapping)对集合里的每个目标应用同一个操作。即,如果你想把表单里每个单元格乘以二,那么把这个函数单独地应用在每个单元格上的操作就属于mapping。
 
化简(Reducing )遍历集合中的元素来返回一个综合的结果。即,输出表单里一列数字的和这个任务属于reducing。
MapReduce由两个阶段组成:Map和Reduce,用户只需要实现map()和reduce()两个函数,即可实现分布式计算,非常简单。
这两个函数的形参是key、value对,表示函数的输入信息。 
MapReduce在多于10PB数据时趋向于变慢。
 
Mapreduce原理
技术分享
 
◆执行步骤:
1. map任务处理
1.1 读取输入文件内容,解析成key、value对。对输入文件的每一行,解析成key、value对。每一个键值对调用一次map函数。
1.2 写自己的逻辑,对输入的key、value处理,转换成新的key、value输出。
1.3 对输出的key、value进行分区。
1.4 对不同分区的数据,按照key进行排序、分组。相同key的value放到一个集合中。
1.5 (可选)分组后的数据进行归约。
2.reduce任务处理
2.1 对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。
2.2 对多个map任务的输出进行合并、排序。写reduce函数自己的逻辑,对输入的key、values处理,转换成新的key、value输出。
2.3 把reduce的输出保存到文件中。
例子:实现WordCountApp
 
  1.  public class WordCountApp {
  2. //自定义的mapper,目的是实现自己的业务逻辑,所以继承org.apache.hadoop.mapreduce.Mapper
  3. publicstaticclassMyMapperextends org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,Text,LongWritable>{
  4. //k1表示
  5. //v1表示一行的文本内容
  6. @Override
  7. protectedvoid map(LongWritable key,Text value,Mapper<LongWritable,Text,Text,LongWritable>.Context context)
  8. throwsIOException,InterruptedException{
  9. //转换成String类型,目的是使用String的方法
  10. String line = value.toString();
  11. String[] splited = line.split("\t");
  12. for(String word : splited){
  13. context.write(newText(word),newLongWritable(1));
  14. }
  15. }
  16. }
  17. //经过mapper操作后,产生4个<k2,v2>,分别是<hello,1>、<you,1>、<hello,1>、<me,1>
  18. //按照k2进行排序
  19. //通过框架分组,产生3个组,分别是<hello,{1,1}><me,{1}><you,{1}>
  20. publicstaticclassMyReducerextends org.apache.hadoop.mapreduce.Reducer<Text,LongWritable,Text,LongWritable>{
  21. //函数被调用3次,传入的形参分别是<hello,{1,1}>、<me,{1}>、<you,{1}>
  22. @Override
  23. protectedvoid reduce(Text k2,Iterable<LongWritable> v2s,
  24. Reducer<Text,LongWritable,Text,LongWritable>.Context context)throwsIOException,InterruptedException{
  25. long count =0L;
  26. for(LongWritable v2 : v2s){
  27. count += v2.get();
  28. }
  29. LongWritable v3 =newLongWritable(count);
  30. context.write(k2, v3);
  31. }
  32. }
  33. //客户端代码,写完交给ResourceManager框架去执行
  34. publicstaticvoid main(String[] args)throwsException{
  35. Configuration conf =newConfiguration();
  36. Job job =Job.getInstance(conf,WordCountApp.class.getSimpleName());
  37. //打成jar执行
  38. job.setJarByClass(WordCountApp.class);
  39. //数据在哪里?
  40. FileInputFormat.setInputPaths(job, args[0]);
  41. //使用哪个mapper处理输入的数据?
  42. job.setMapperClass(MyMapper.class);
  43. //map输出的数据类型是什么?
  44. job.setMapOutputKeyClass(Text.class);
  45. job.setMapOutputValueClass(LongWritable.class);
  46. //使用哪个reducer处理输入的数据?
  47. job.setReducerClass(MyReducer.class);
  48. //reduce输出的数据类型是什么?
  49. job.setOutputKeyClass(Text.class);
  50. job.setOutputValueClass(LongWritable.class);
  51. //数据输出到哪里?
  52. FileOutputFormat.setOutputPath(job,newPath(args[1]));
  53. //交给yarn去执行,直到执行结束才退出本程序
  54. job.waitForCompletion(true);
  55. }
  56. }
*把这个函数打成jar包,即,导出为jar.然后把这个jar文件上传到hadoop集群中,在hadoop中新建一个apps文件夹存放上传上来的jar包.然后,新建一个hello.txt文本,在里面写一些单词,以\t分开.然后把这个hello.txt上传到hdfs文件系统中,然后执行这个命令.bin/hadoop jar apps/wordcount.jar /hello /out     bin/hdfs dfs -text /out/part-r-0000
技术分享技术分享技术分享技术分享
  1. [root@node134 hadoop-2.6.0]# bin/hdfs dfs -put hello.txt /
  2. 15/12/3117:44:05 WARN util.NativeCodeLoader:Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  3. [root@node134 hadoop-2.6.0]# bin/hdfs dfs -ls /
  4. 15/12/3117:44:08 WARN util.NativeCodeLoader:Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  5. Found4 items
  6. drwxr-xr-x - root supergroup 02015-12-3117:21/dir1
  7. -rw-r--r--1 root supergroup 192015-12-3117:44/hello.txt
  8. drwx------- root supergroup 02015-12-3117:23/tmp
  9. drwx------- root supergroup 02015-12-3117:28/user
  10. [root@node134 hadoop-2.6.0]# bin/hadoop jar APPS/WordCountApp.jar /hello.txt /out
  11.  
  12. [root@node134 hadoop-2.6.0]# bin/hdfs dfs -ls /
    15/12/31 17:45:40 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Found 5 items
    drwxr-xr-x   - root supergroup          0 2015-12-31 17:21 /dir1
    -rw-r--r--   1 root supergroup         19 2015-12-31 17:44 /hello.txt
    drwxr-xr-x   - root supergroup          0 2015-12-31 17:45 /out
    drwx------   - root supergroup          0 2015-12-31 17:23 /tmp
    drwx------   - root supergroup          0 2015-12-31 17:28 /user
    [root@node134 hadoop-2.6.0]# bin/hdfs dfs -ls /out
    15/12/31 17:45:46 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Found 2 items
    -rw-r--r--   1 root supergroup          0 2015-12-31 17:45 /out/_SUCCESS
    -rw-r--r--   1 root supergroup         19 2015-12-31 17:45 /out/part-r-00000
    [root@node134 hadoop-2.6.0]# bin/hdfs dfs -text /out/part-r-00000
    15/12/31 17:46:21 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    hello   2
    me      1
    you     1
 
 
 map、reduce键值对格式
技术分享
 
 
 WordCountApp的驱动代码
  1. publicstaticvoid main(String[] args)throwsException{
  2. Configuration conf =newConfiguration();//加载配置文件
  3. Job job =newJob(conf);//创建一个job,供JobTracker使用
  4. job.setJarByClass(WordCountApp.class);
  5. job.setMapperClass(WordCountMapper.class);
  6. job.setReducerClass(WordCountReducer.class);
  7. FileInputFormat.setInputPaths(job,newPath("hdfs://192.168.1.10:9000/input"));
  8. FileOutputFormat.setOutputPath(job,newPath("hdfs://192.168.1.10:9000/output"));
  9. job.setOutputKeyClass(Text.class);
  10. job.setOutputValueClass(IntWritable.class);
  11. job.waitForCompletion(true);
  12. }
 
序列化概念
序列化(Serialization)是指把结构化对象转化为字节流。
反序列化(Deserialization)是序列化的逆过程。即把字节流转回结构化对象。
Java序列化(java.io.Serializable)
 
Hadoop序列化的特点
序列化格式特点:
紧凑:高效使用存储空间。
快速:读写数据的额外开销小
可扩展:可透明地读取老格式的数据
互操作:支持多语言的交互
             Hadoop的序列化格式:Writable
 
 Java序列化的不足:
1.不精简。附加信息多。不大适合随机访问。
2.存储空间大。递归地输出类的超类描述直到不再有超类。序列化图对象,反序列化时为每个对象新建一个实例。相反。Writable对象可以重用。
3.扩展性差。而Writable方便用户自定义
 
Hadoop序列化的作用
序列化在分布式环境的两大作用:进程间通信,永久存储。
Hadoop节点间通信。
技术分享
Writable接口
Writable接口, 是根据 DataInput 和 DataOutput 实现的简单、有效的序列化对象.
MR的任意Key和Value必须实现Writable接口.
技术分享
MR的任意key必须实现WritableComparable接口
技术分享
 
常用的Writable实现类
Text一般认为它等价于java.lang.String的Writable。针对UTF-8序列。
 
例:
Text test = new Text("test");
IntWritable one = new IntWritable(1);
技术分享
 
技术分享
 
 
自定义Writable类
技术分享
Writable
write 是把每个对象序列化到输出流
readFields是把输入流字节反序列化
 
实现WritableComparable.
Java值对象的比较:一般需要重写toString(),hashCode(),
equals()方法
 
 
 
 
 
 
 





MapReduce-nodes

标签:

原文地址:http://www.cnblogs.com/hsw-time/p/5092822.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!