码迷,mamicode.com
首页 > 其他好文 > 详细

hadoop 学习自定义分区

时间:2016-04-06 12:52:31      阅读:277      评论:0      收藏:0      [点我收藏+]

标签:

(网易云课程hadoop大数据实战学习笔记)

如图所示:有三个ReducerTask,因此处理完成之后的数据存储在三个文件中;
技术分享
 
默认情况下,numReduceTasks的数量为1,前面做的实验中,输出数据都是在一个文件中。通过自定义myPatitioner类,可以把ruduce处理后的数据分类汇总,这里MyPartitioner是Partitioner的基类,如果需要定制partitioner也需要继承该类。HashPartitioner是mapreduce的默认partitioner。计算方法是which reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks,得到当前的目的reducer。
实验内容,在上一个自定义排序的基础上,把正方形和长方形分别进行排序,即设置两个ReducerTask任务,通过自定义MyPartitioner实现。
package com.nwpulisz;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
public class SelfDefineSort {
	/**
	 * @param args
	 * @author nwpulisz
	 * @date 2016.4.1
	 */
	static final String INPUT_PATH="hdfs://192.168.255.132:9000/input";
	static final String OUTPUT_PATH="hdfs://192.168.255.132:9000/output";
	
	public static void main(String[] args) throws Exception {
		// TODO Auto-generated method stub
		Configuration conf = new Configuration();
		Path outPut_path= new Path(OUTPUT_PATH);
		Job job = new Job(conf, "SelfDefineSort");
		
		//如果输出路径是存在的,则提前删除输出路径
		FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf);
		if(fileSystem.exists(outPut_path))
		{
			fileSystem.delete(outPut_path,true);
		}
		
		job.setJarByClass(RectangleWritable.class); //注意不能少setJarByClass,要不出现报错,源码中的解释。
													//Set the Jar by finding where a given class came from.
		
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		FileOutputFormat.setOutputPath(job, outPut_path);
		
		job.setMapperClass(MyMapper.class);
		job.setReducerClass(MyReducer.class);
		
		job.setMapOutputKeyClass(RectangleWritable.class);
		job.setMapOutputValueClass(NullWritable.class);
		
		job.setPartitionerClass(MyPatitioner.class); //自定义myPatitioner类,把ruduce处理后的数据分类汇总;
		job.setNumReduceTasks(2); //设置ReduceTask数量为2;
		
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(IntWritable.class);
		job.waitForCompletion(true);
	}
	
	static class MyMapper extends Mapper<LongWritable, Text, RectangleWritable, NullWritable>{
		protected void map(LongWritable k1, Text v1, 
                Context context) throws IOException, InterruptedException {
			String[] splits = v1.toString().split("\t");
			RectangleWritable k2 = new RectangleWritable(Integer.parseInt(splits[0]),
					Integer.parseInt(splits[1]));
			
			context.write(k2,NullWritable.get());
		}
	}
	
	static class MyReducer extends Reducer<RectangleWritable, NullWritable,
					IntWritable, IntWritable>{
		protected void reduce(RectangleWritable k2,
				Iterable<NullWritable> v2s,
				Context context)
				throws IOException, InterruptedException {
			// TODO Auto-generated method stub
			context.write(new IntWritable(k2.getLength()), new IntWritable(k2.getWidth()));
		}
		
	}
	
}
class MyPatitioner extends Partitioner<RectangleWritable, NullWritable>{
	@Override
	public int getPartition(RectangleWritable k2, NullWritable v2, int numPartitions) {
		// TODO Auto-generated method stub
		if (k2.getLength() == k2.getWidth()) { //根据长方形和正方形进行分类
			return 0;  
		}else {
			return 1;
		}
	}
	 
}

  

其中的RectangleWritable类与上一节中定义的相同。
此处,在eclipse中直接运行该代码,会显示错误,如下图:
技术分享
可能是因为hadoop版本的原因,因此需要将源码文件打成jar包,在hadoop服务器上运行,jar中包括内容为:
技术分享
在hadoop上运行 hadoop jar SelfDefinePartitioner.jar(jar包名,自定义)
运行结果如下图所示:
开始运行:
技术分享
输出结果:
技术分享

 





hadoop 学习自定义分区

标签:

原文地址:http://www.cnblogs.com/nwpulisz/p/5358644.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!