package hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Word count
 */
public class WordCount {
    // Required for local runs and for remote submission from the IDE
    // (on Windows, hadoop.home.dir must point at a local Hadoop installation containing bin\winutils.exe)
    static {
        System.setProperty("hadoop.home.dir", "D:\\Studyingimportant\\hadoop-2.9.2");
    }
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        /**
         * Mapper task
         * @param key byte offset of the current line (the input key)
         * @param value content of the current line
         * @param context the MapReduce context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 1. Split the line into an array of words
            String line = value.toString();
            String[] words = line.split(" ");
            // 2. Map each word to a <k,v> pair
            for (String word : words) {
                // 3. Write the pair to the in-memory map output buffer
                context.write(new Text(word), new LongWritable(1));
            }
        }
    }
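For an input line such as "hello world hello" (an arbitrary sample), the map method above splits on spaces and emits the intermediate pairs <hello,1>, <world,1>, <hello,1>.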
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        /**
         * Reducer task
         * @param key a word
         * @param values the counts collected for this word
         * @param context the MapReduce context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long sum = 0L;
            // 1. Iterate over the counts for this word, e.g. [1,1,1]
            for (LongWritable value : values) {
                // 2. Accumulate the count
                sum += value.get();
            }
            // 3. Emit <word, total count>
            context.write(key, new LongWritable(sum));
        }
    }
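Continuing the sample above: after the shuffle phase groups the intermediate pairs by key, the reduce method receives <hello, [1, 1]> and <world, [1]> and writes out <hello, 2> and <world, 1>.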
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 0. Initialize a job
        Configuration conf = new Configuration();
        /************************* 2. Remote-submission settings: start ******************************/
        // conf.set("yarn.resourcemanager.hostname", "hadoop-senior.test.com");
        // conf.set("mapreduce.framework.name", "yarn");
        // conf.set("yarn.nodemanager.aux-services", "mapreduce_shuffle");
        // conf.set("mapreduce.app-submission.cross-platform", "true");
        // conf.set("fs.defaultFS", "hdfs://hadoop-senior.test.com:8020");
        /************************* 2. Remote-submission settings: end ******************************/
        Job job = Job.getInstance(conf, "word-count");
        /************************* 2. Remote-submission settings: start ******************************/
        // job.setJar("D:\\IdeaProjects\\hadoop_demo\\target\\hadoop_demo-1.0-SNAPSHOT.jar");
        /************************* 2. Remote-submission settings: end ******************************/
        /************************* 3. Packaged-jar (cluster) mode: start ******************************/
        job.setJarByClass(WordCount.class);
        /************************* 3. Packaged-jar (cluster) mode: end ******************************/
        // 1. Input file
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // 2. Parallel map computation
        job.setMapperClass(MyMapper.class);
        // These two calls can be omitted when the map output k/v types match the reduce output k/v types
        // job.setMapOutputKeyClass(Text.class);
        // job.setMapOutputValueClass(LongWritable.class);
        // 3. Shuffle phase (handled internally by the framework)
        // 4. Reduce computation
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // 5. Output directory
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 6. Submit the job (the single entry point) and wait for completion
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
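Once packaged, the driver takes the HDFS input path as args[0] and a not-yet-existing output directory as args[1]. A sketch of a cluster submission, assuming the jar name from the commented-out setJar line above and placeholder paths:

hadoop jar hadoop_demo-1.0-SNAPSHOT.jar hdfs.WordCount /input/words.txt /output/wordcount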
Original post: https://www.cnblogs.com/whyuan/p/12784276.html