码迷,mamicode.com
首页 > 其他好文 > 详细

Hadoop之——MapReduce实战(二)

时间:2015-05-24 23:39:03      阅读:245      评论:0      收藏:0      [点我收藏+]

标签:hadoop   hdfs   mapreduce   压缩   合并   

转载请注明出处:http://blog.csdn.net/l1028386804/article/details/45957715

MapReduce的老api写法

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
/**
 *  author liuyazhuang
 */
 public class HelloWorldApp {
	public static void main(String[] args) throws Exception {
		final JobConf job = new JobConf(HelloWorldApp.class);
		FileInputFormat.setInputPaths(job, new Path("/input"));
		FileOutputFormat.setOutputPath(job, new Path("/output"));
		JobClient.runJob(job);
	}
}

MapReduce获取命令行参数

/**
 *  author liuyazhuang
 */
public class CacheFileApp extends Configured implements Tool{
	public static void main(String[] args) throws Exception {
		ToolRunner.run(new Configuration(), new CacheFileApp(), args);  
	}
	@Override
	public int run(String[] args) throws Exception {
		//..............................
        	Path in = new Path(args[0]);  
		Path out = new Path(args[1]);  
          
		//..............................
	}
}


计数器

hadoop计数器:可以让开发人员以全局的视角来审查程序的运行情况以及各项指标,及时做出错误诊断并进行相应处理。

内置计数器(MapReduce相关、文件系统相关和作业调度相关)

   ... 

也可以通过http://master:50030/jobdetails.jsp查看

Helloyou, hello me的计数器信息

Counters: 19
   File Output Format Counters 
     Bytes Written=19	//reduce输出到hdfs的字节数
   FileSystemCounters
     FILE_BYTES_READ=481
     HDFS_BYTES_READ=38
     FILE_BYTES_WRITTEN=81316
     HDFS_BYTES_WRITTEN=19
   File Input Format Counters 
     Bytes Read=19	//map从hdfs读取的字节数
   Map-Reduce Framework
     Map output materialized bytes=49
     Map input records=2  	//map读入的记录行数
     Reduce shuffle bytes=0
     Spilled Records=8
     Map output bytes=35
     Total committed heap usage (bytes)=266469376
     SPLIT_RAW_BYTES=105
     Combine input records=0
     Reduce input records=4	//reduce从map端接收的记录行数
     Reduce input groups=3	//reduce函数接收的key数量,即归并后的k2数量
     Combine output records=0
     Reduce output records=3	//reduce输出的记录行数
     Map output records=4	//map输出的记录行数

自定义计数器与实现

Context类调用方法getCounter()

技术分享

计数器声明

1.通过枚举声明

context.getCounter(Enum enum)

2.动态声明

context.getCounter(String groupName,StringcounterName) 

计数器操作

counter.setValue(long value);//设置初始值

counter.increment(longincr);//增加计数

Combiners编程

    每一个map可能会产生大量的输出,combiner的作用就是在map端对输出先做一次合并,以减少传输到reducer的数据量。

   combiner最基本是实现本地key的归并,combiner具有类似本地的reduce功能。

    如果不用combiner,那么,所有的结果都是reduce完成,效率会相对低下。使用combiner,先完成的map会在本地聚合,提升速度。

注意:Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。所以从我的想法来看,Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。

Partitioner编程

  1. Partitioner是partitioner的基类,如果需要定制partitioner也需要继承该类。

         2.  HashPartitioner是mapreduce的默认partitioner。计算方法是

which reducer=(key.hashCode() & Integer.MAX_VALUE)% numReduceTasks,得到当前的目的reducer。

         3.  (例子以jar形式运行)

排序和分组

       1在map和reduce阶段进行排序时,比较的是k2。v2是不参与排序比较的。如果要想让v2也进行排序,需要把k2和v2组装成新的类,作为k2,才能参与比较。

       2分组时也是按照k2进行比较的
/**
	 * 为什么有这个类?因为mapper端比较时只能比较k2,不能比较v2.如果想让v2参与比较,必须参与到k2角色中。自定义该类,包含原来的k2和v2
	 * 在哪里调用本来中的compareTo方法,在1.4中调用
	 */
	static class TwoInt implements WritableComparable<TwoInt>{
		long first;
		long second;
		
		public TwoInt(){}
		
		public TwoInt(long first, long second){
			this.first = first;
			this.second = second;
		}
		
		@Override
		public void readFields(DataInput arg0) throws IOException {
			this.first = arg0.readLong();
			this.second = arg0.readLong();
		}
		@Override
		public void write(DataOutput arg0) throws IOException {
			arg0.writeLong(first);
			arg0.writeLong(second);	
		}
		@Override
		public int hashCode() {
			return (this.first+"").hashCode()+(this.second+"").hashCode();
		}
		
		@Override
		public boolean equals(Object obj) {
			if(obj instanceof TwoInt){
				TwoInt ob = (TwoInt)obj;
				return (this.first==ob.first && this.second==ob.second)?true:false;
			}else{
				return false;
			}
		}
		
		@Override
		public int compareTo(TwoInt o) {
			if(this.first!=o.first){
				return (int)(this.first-o.first);
			}else{
				return (int)(this.second-o.second);
			}
		}
	}
/**
	 * 分组时比较采用的比较器。比较的是原来的k2
	 *
	 */
	static class GroupComparator implements RawComparator<TwoInt>{
		@Override
		public int compare(TwoInt o1, TwoInt o2) {
			return (int)(o1.first-o2.first);
		}
		/**
		 * arg0表示第一个字节数组
		 * arg1表示第一个字节数组的参与比较的开始位置
		 * arg3表示第二个字节数组
		 * arg4表示第二个字节数组的参与比较的开始位置
		 */
		@Override
		public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,
				int arg4, int arg5) {
			return WritableComparator.compareBytes(arg0, arg1, 8, arg3, arg4, 8);
		}
	}

Shuffle

技术分享

  1. 每个map有一个环形内存缓冲区,用于存储任务的输出。默认大小100MB(io.sort.mb属性),一旦达到阀值0.8(io.sort.spill.percent),一个后台线程把内容写到(spill)磁盘的指定目录(mapred.local.dir)下的新建的一个溢出写文件。
  2. 写磁盘前,要partition,sort。如果有combiner,combine排序后数据。
  3. 等最后记录写完,合并全部溢出写文件为一个分区且排序的文件。
  4. Reducer通过Http方式得到输出文件的分区。
  5. TaskTracker为分区文件运行Reduce任务。复制阶段把Map输出复制到Reducer的内存或磁盘。一个Map任务完成,Reduce就开始复制输出。排序阶段合并map输出。然后走Reduce阶段

hadoop的压缩codec

Codec为压缩,解压缩的算法实现。在Hadoop中,codec由CompressionCode的实现来表示。下面是一些实现:

技术分享

MapReduce的输出进行压缩

输出的压缩属性

技术分享

技术分享

MapReduce常见算法

  • 单词计数
  • 数据去重
  • 排序
  • Top K
  • 选择
  • 投影
  • 分组
  • 多表连接
  • 单表关联


Hadoop之——MapReduce实战(二)

标签:hadoop   hdfs   mapreduce   压缩   合并   

原文地址:http://blog.csdn.net/l1028386804/article/details/45957715

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!