标签:
(网易云课程hadoop大数据实战学习笔记)
package com.nwpulisz; import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.Reducer; public class SelfDefineSort { /** * @param args * @author nwpulisz * @date 2016.4.1 */ static final String INPUT_PATH="hdfs://192.168.255.132:9000/input"; static final String OUTPUT_PATH="hdfs://192.168.255.132:9000/output"; public static void main(String[] args) throws Exception { // TODO Auto-generated method stub Configuration conf = new Configuration(); Path outPut_path= new Path(OUTPUT_PATH); Job job = new Job(conf, "SelfDefineSort"); //如果输出路径是存在的,则提前删除输出路径 FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf); if(fileSystem.exists(outPut_path)) { fileSystem.delete(outPut_path,true); } job.setJarByClass(RectangleWritable.class); //注意不能少setJarByClass,要不出现报错,源码中的解释。 //Set the Jar by finding where a given class came from. FileInputFormat.setInputPaths(job, INPUT_PATH); FileOutputFormat.setOutputPath(job, outPut_path); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); job.setMapOutputKeyClass(RectangleWritable.class); job.setMapOutputValueClass(NullWritable.class); job.setPartitionerClass(MyPatitioner.class); //自定义myPatitioner类,把ruduce处理后的数据分类汇总; job.setNumReduceTasks(2); //设置ReduceTask数量为2; job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(IntWritable.class); job.waitForCompletion(true); } static class MyMapper extends Mapper<LongWritable, Text, RectangleWritable, NullWritable>{ protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { String[] splits = v1.toString().split("\t"); RectangleWritable k2 = new RectangleWritable(Integer.parseInt(splits[0]), Integer.parseInt(splits[1])); context.write(k2,NullWritable.get()); } } static class MyReducer extends Reducer<RectangleWritable, NullWritable, IntWritable, IntWritable>{ protected void reduce(RectangleWritable k2, Iterable<NullWritable> v2s, Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub context.write(new IntWritable(k2.getLength()), new IntWritable(k2.getWidth())); } } } class MyPatitioner extends Partitioner<RectangleWritable, NullWritable>{ @Override public int getPartition(RectangleWritable k2, NullWritable v2, int numPartitions) { // TODO Auto-generated method stub if (k2.getLength() == k2.getWidth()) { //根据长方形和正方形进行分类 return 0; }else { return 1; } } }
标签:
原文地址:http://www.cnblogs.com/nwpulisz/p/5358644.html