
Hadoop MapReduce (WordCount) in Java

Published: 2016-08-16 22:17:56

Tags: hadoop mapreduce java programming

Let's write a WordCount program. The input data looks like this:

hello beijing
hello shanghai
hello chongqing
hello tianjin
hello guangzhou
hello shenzhen
...


1. WCMapper:

package com.hadoop.testHadoop;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Of the four type parameters, the first two specify the mapper's input:
// KEYIN is the type of the input key, VALUEIN the type of the input value.
// Both map and reduce exchange all data as key-value pairs.
// By default, the framework hands the mapper a key that is the byte offset of
// the current line within the input file, and a value that is the line's content.
// LongWritable and Text are Hadoop's serializable data types.
public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    // The MapReduce framework calls this method once for every line it reads.
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split(" ");
        for (String word : words) {
            context.write(new Text(word), new LongWritable(1));
        }
    }
}
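The heart of the mapper is plain string tokenization: split the line on spaces and emit each word with a count of 1. As a rough illustration without any Hadoop dependencies (the class and the helper name `tokenize` are made up for this sketch and stand in for `context.write`), the per-line logic boils down to:

```java
import java.util.ArrayList;
import java.util.List;

public class MapSketch {
    // Stand-in for the map body: emit one "(word, 1)" pair per occurrence,
    // collected in a list instead of being written to a Context.
    static List<String> tokenize(String line) {
        List<String> pairs = new ArrayList<>();
        for (String word : line.split(" ")) {
            pairs.add(word + "\t1"); // every occurrence carries a count of 1
        }
        return pairs;
    }

    public static void main(String[] args) {
        System.out.println(tokenize("hello beijing"));
    }
}
```

Note that duplicates are not merged here: if a word appears twice in a line, two pairs are emitted, and it is the reduce phase that sums them.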

2. WCReducer:

package com.hadoop.testHadoop;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    // After the map phase completes, the framework buffers all key-value pairs,
    // groups them by key, and calls reduce once per group <key, values{}>,
    // e.g. <hello, {1,1,1,1,1,1, ...}>.
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws java.io.IOException, InterruptedException {
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        context.write(key, new LongWritable(count));
    }
}
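The reduce body is just a running sum over the group of 1s the framework delivers for each key. Stripped of the Hadoop types, it can be sketched like this (the class name `ReduceSketch` and the helper `sum` are invented for the illustration):

```java
import java.util.List;

public class ReduceSketch {
    // Stand-in for the reduce body: sum the per-occurrence counts for one key.
    static long sum(Iterable<Long> values) {
        long count = 0;
        for (long v : values) {
            count += v;
        }
        return count;
    }

    public static void main(String[] args) {
        // The group the framework would hand reduce for "hello" in the sample data.
        System.out.println(sum(List.of(1L, 1L, 1L, 1L, 1L, 1L))); // prints 6
    }
}
```

With the six sample lines above, reduce receives six 1s for "hello" and one 1 for each city name, so "hello" comes out with a count of 6.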


3. WCRunner:

package com.hadoop.testHadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WCRunner {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Tell the framework which jar contains the classes this job uses.
        job.setJarByClass(WCRunner.class);
        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);

        // Key-value types of the map output.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Key-value types of the reduce output.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Input path for the job.
        FileInputFormat.setInputPaths(job, new Path("/wordcount/input"));

        // Output path for the job (must not already exist).
        FileOutputFormat.setOutputPath(job, new Path("/wordcount/outputmy"));

        // Submit the job to the cluster and wait for it to complete.
        job.waitForCompletion(true);
    }
}
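Putting the two phases together, the job's overall effect on the sample input can be simulated in memory without a cluster. This is only a sketch of the semantics (the class `WordCountSketch` and the helper `wordCount` are invented names, not part of the job above): the map step tokenizes, and the shuffle plus reduce steps are collapsed into grouping and summing per word.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WordCountSketch {
    // Simulate map (tokenize) + shuffle/reduce (group by word, sum the 1s).
    static Map<String, Long> wordCount(String[] lines) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                counts.merge(word, 1L, Long::sum); // reduce step: sum per-word 1s
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] sample = {
            "hello beijing", "hello shanghai", "hello chongqing",
            "hello tianjin", "hello guangzhou", "hello shenzhen"
        };
        System.out.println(wordCount(sample)); // "hello" appears once per line
    }
}
```

On the cluster, the real job writes the same counts as tab-separated `word<TAB>count` lines under /wordcount/outputmy.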



Original article: http://8757576.blog.51cto.com/8747576/1839294
