What is MapReduce?

MapReduce is Hadoop's distributed computation model: a map phase turns each input record into intermediate <key, value> pairs, the framework sorts and groups those pairs by key, and a reduce phase aggregates the values in each group. The WordCount program below walks through both phases.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountApp {
    // Custom mapper: extend org.apache.hadoop.mapreduce.Mapper to implement our own business logic.
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        // k1 is the byte offset of the line within the input file.
        // v1 is the text content of that line.
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Convert to String so we can use String methods.
            String line = value.toString();
            String[] splited = line.split("\t");
            for (String word : splited) {
                context.write(new Text(word), new LongWritable(1));
            }
        }
    }

    // After the map phase there are 4 <k2,v2> pairs: <hello,1>, <you,1>, <hello,1>, <me,1>.
    // The framework sorts them by k2 and groups them into 3 groups: <hello,{1,1}>, <me,{1}>, <you,{1}>.
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        // reduce() is called 3 times, once per group: <hello,{1,1}>, <me,{1}>, <you,{1}>.
        @Override
        protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
                throws IOException, InterruptedException {
            long count = 0L;
            for (LongWritable v2 : v2s) {
                count += v2.get();
            }
            LongWritable v3 = new LongWritable(count);
            context.write(k2, v3);
        }
    }

    // Driver (client) code: once written, the job is handed to the ResourceManager for execution.
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, WordCountApp.class.getSimpleName());
        // The job is packaged as a jar before execution.
        job.setJarByClass(WordCountApp.class);
        // Where is the input data?
        FileInputFormat.setInputPaths(job, args[0]);
        // Which mapper processes the input?
        job.setMapperClass(MyMapper.class);
        // What are the map output types?
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // Which reducer processes the map output?
        job.setReducerClass(MyReducer.class);
        // What are the reduce output types?
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // Where does the output go?
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Submit to YARN and block until the job finishes before exiting.
        job.waitForCompletion(true);
    }
}
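Before the shell session below, hello.txt has to exist locally. The mapper comments above (four pairs <hello,1>, <you,1>, <hello,1>, <me,1> from tab-split lines) and the 19-byte file size in the listing suggest the file holds two tab-separated lines; one plausible way to create such a file (my assumption, not shown in the original post):

printf 'hello\tyou\nhello\tme\n' > hello.txt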
[root@node134 hadoop-2.6.0]# bin/hdfs dfs -put hello.txt /
15/12/31 17:44:05 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[root@node134 hadoop-2.6.0]# bin/hdfs dfs -ls /
15/12/31 17:44:08 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 4 items
drwxr-xr-x   - root supergroup          0 2015-12-31 17:21 /dir1
-rw-r--r--   1 root supergroup         19 2015-12-31 17:44 /hello.txt
drwx------   - root supergroup          0 2015-12-31 17:23 /tmp
drwx------   - root supergroup          0 2015-12-31 17:28 /user
[root@node134 hadoop-2.6.0]# bin/hadoop jar APPS/WordCountApp.jar /hello.txt /out
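Once the job finishes, the result can be inspected with hdfs dfs -cat. With a single reducer the output lands in part-r-00000 under the output directory; based on the reducer comments above, the expected content is hello 2, me 1, you 1 (key and count separated by a tab):

bin/hdfs dfs -cat /out/part-r-00000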
A second version of the driver, with the input and output paths hard-coded as full HDFS URIs instead of taken from args (note that it references WordCountMapper/WordCountReducer and IntWritable values, i.e. a variant of the classes shown above):

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();  // load the configuration files
    Job job = new Job(conf);                   // create a job for the framework (JobTracker in MR1) to run;
                                               // new Job(conf) is deprecated, Job.getInstance(conf) is preferred
    job.setJarByClass(WordCountApp.class);
    job.setMapperClass(WordCountMapper.class);
    job.setReducerClass(WordCountReducer.class);
    FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.1.10:9000/input"));
    FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.1.10:9000/output"));
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.waitForCompletion(true);
}
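One practical note, not in the original post: FileOutputFormat refuses to start a job whose output directory already exists, so a re-run fails with a FileAlreadyExistsException until the old output is removed. For the hard-coded path above that would be, for example:

bin/hdfs dfs -rm -r hdfs://192.168.1.10:9000/output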
Original post: http://www.cnblogs.com/hsw-time/p/5092822.html