码迷,mamicode.com
首页 > 其他好文 > 详细

MapReduce简单使用

时间:2015-05-15 20:04:42      阅读:235      评论:0      收藏:0      [点我收藏+]

标签:

1、启动hadoop工程

技术分享

2、MapReduce统计文本单词数量

public class WordCount {

	private static class WordMapper extends
			Mapper<LongWritable, Text, Text, IntWritable> {

		@Override
		protected void map(LongWritable key, Text value,
				Mapper<LongWritable, Text, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {

			String string = value.toString();
			String[] strs = string.split(" ");
			for (String str : strs) {
				context.write(new Text(str), new IntWritable(1));
			}

		}

	}

	private static class WordReduce extends
			Reducer<Text, IntWritable, Text, IntWritable> {
		// key 单词 //value:{1,1}

		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,
				Reducer<Text, IntWritable, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {

			int count = 0;
			for (IntWritable value : values) {
				count += value.get();
			}

			context.write(key, new IntWritable(count));

		}

	}

	public static void main(String[] args) throws IOException,
			ClassNotFoundException, InterruptedException {

		Configuration configuration = HadoopConfig.getConfiguration();
		Job job = Job.getInstance(configuration, "统计单词数目");
		job.setJarByClass(WordCount.class);
		job.setMapperClass(WordMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		job.setReducerClass(WordReduce.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		FileInputFormat.addInputPath(job, new Path("/data"));
		FileOutputFormat.setOutputPath(job, new Path("/ouput"));
		job.waitForCompletion(true);
		System.exit(job.waitForCompletion(true) ? 0 : 1);

	}

2、MapReduce排除文本重复数据

public class Dup {
	
	private static class DupMapper extends
			Mapper<LongWritable, Text, Text, NullWritable> {

		@Override
		protected void map(LongWritable key, Text value,
				Mapper<LongWritable, Text, Text, NullWritable>.Context context)
				throws IOException, InterruptedException {

			context.write(new Text(value), NullWritable.get());

		}

	}

	private static class DupReduce extends 
			Reducer<Text, NullWritable, Text, NullWritable> {

		@Override
		protected void reduce(Text key, Iterable<NullWritable> values,
				Reducer<Text, NullWritable, Text, NullWritable>.Context context)
				throws IOException, InterruptedException {

			context.write(new Text(key), NullWritable.get());

		}

	}

	public static void main(String[] args) throws IOException,
			ClassNotFoundException, InterruptedException {

		Configuration configuration = HadoopConfig.getConfiguration();
		Job job = Job.getInstance(configuration, "去重");

		job.setJarByClass(Dup.class);
		job.setMapperClass(DupMapper.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(NullWritable.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);
		
		job.setReducerClass(DupReduce.class);
		FileInputFormat.addInputPath(job, new Path("/data"));
		FileOutputFormat.setOutputPath(job, new Path("/dup"));
		job.waitForCompletion(true);
		System.exit(job.waitForCompletion(true) ? 0 : 1);

	}

3、MapReduce实线文本数据的简单排序

public class Sort {

	private static class SortMapper extends
			Mapper<LongWritable, Text, IntWritable, IntWritable> {
           //输出,输入
		@Override
		protected void map(
				LongWritable key,
				Text value_text,
				Mapper<LongWritable, Text, IntWritable, IntWritable>.Context context)
				throws IOException, InterruptedException {

			int value = Integer.parseInt(value_text.toString());
			context.write(new IntWritable(value), new IntWritable(1));
			
		}

	}
	
	private static class SortReduce extends
			Reducer<IntWritable, IntWritable, IntWritable, NullWritable> {

		@Override
		protected void reduce(
				IntWritable key,
				Iterable<IntWritable> values,
				Reducer<IntWritable, IntWritable, IntWritable, NullWritable>.Context context)
				throws IOException, InterruptedException {

			for (IntWritable value : values) {
				context.write(key, NullWritable.get());
			}

		}

	}

	public static void main(String[] args) throws IOException,
			ClassNotFoundException, InterruptedException {

		Configuration configuration = HadoopConfig.getConfiguration();
		Job job = Job.getInstance(configuration, "排序");

		job.setJarByClass(Sort.class);
		job.setMapperClass(SortMapper.class);
		
		job.setMapOutputKeyClass(IntWritable.class);
		job.setMapOutputValueClass(IntWritable.class);
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(NullWritable.class);
		
		job.setReducerClass(SortReduce.class);
		FileInputFormat.addInputPath(job, new Path("/data"));
		FileOutputFormat.setOutputPath(job, new Path("/sort"));
		job.waitForCompletion(true);

	}


4、MapReduce实线单表连接

                    文本数据如下:

                    child  parent

                    tom   lucy

                    tom   jack

                    lucy   mary

                    lucy   ben

public class Single {

	private static class SingleMapper extends
			Mapper<LongWritable, Text, Text, Text> {

		@Override
		protected void map(LongWritable key, Text value,
				Mapper<LongWritable, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {

			String string = value.toString();
			if (!string.contains("child")) {

				String[] strings = string.split(" ");
				context.write(new Text(strings[0]), new Text(strings[1] + ":1"));
				context.write(new Text(strings[1]), new Text(strings[0] + ":2"));

			}
		}
	}

	// reduce是执行key的次数
	private static class SingleReduce extends Reducer<Text, Text, Text, Text> {

		@Override
		protected void reduce(Text key, Iterable<Text> values,
				Reducer<Text, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {

			List<String> left = Lists.newArrayList();
			List<String> right = Lists.newArrayList();
	
			for (Text value : values) {  

				String[] strings = value.toString().split(":");
			
				if (strings[1].equals("1")) {
					right.add(strings[0]);
				} else {
					left.add(strings[0]);
				}
			}
			
			for (String lef : left) {
				for (String rig : right) {
					context.write(new Text(lef), new Text(rig));
				}
			}

		}

	}

	public static void main(String[] args) throws IOException,
			ClassNotFoundException, InterruptedException {

		Configuration configuration = HadoopConfig.getConfiguration();
		Job job = Job.getInstance(configuration, "单表连接");

		job.setJarByClass(Sort.class);
		job.setMapperClass(SingleMapper.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		job.setReducerClass(SingleReduce.class);
		FileInputFormat.addInputPath(job, new Path("/data"));
		FileOutputFormat.setOutputPath(job, new Path("/single"));
		job.waitForCompletion(true);

	}

                  输出结果如下:

                    grandchild  grandparent  //额外加入的,表达思路

                    tom   mary

                    tom   ben


MapReduce简单使用

标签:

原文地址:http://my.oschina.net/tdd/blog/415446

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!