标签:循环 context jar包 配置 hadoop prope 任务 本地 pac
package it.dawn.YARNPra.wc_hdfs; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /** * @author Dawn * @date 2019年5月1日23:09:08 * @version 1.0 * * 思路? * wordcount单词计数(数据来源hdfs上) * <单词,1> * * 数据传输 * * KEYIN:数据的起始偏移量0~10 11~20 21~30 * VALUEIN:数据 * * KEYOUT:mapper输出到reduce阶段 k的类型 * VALUEOUT:mapper输出到reduce阶段v的类型 * <hello,1><hunter,1><henshuai,1> */ public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ //key 起始偏移量 value 数据 context 上下文 @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //1.读取数据 String line=value.toString(); //2.切割 hello hunter String[] words=line.split(" "); //3.循环的写到下一个阶段<hello,1><hunter,1> for(String word: words) { //4.输出到reducer阶段 context.write(new Text(word), new IntWritable(1)); } } }
package it.dawn.YARNPra.wc_hdfs; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /** * @author Dawn * @date 2019年5月1日23:15:33 * @version 1.0 * * 汇总 <hello,4> <hunter,1> <henshuai,2> */ public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ @Override protected void reduce(Text k3, Iterable<IntWritable> v3, Context context) throws IOException, InterruptedException { //1.统计单词出现的次数 int sum=0; //2 累加求和 for(IntWritable v :v3) { //拿到累加值 sum+=v.get(); } //3 输出结果 context.write(k3, new IntWritable(sum)); } }
package it.dawn.YARNPra.wc_hdfs; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import it.dawn.YARNPra.wc_hdfs.WordCountMapper; import it.dawn.YARNPra.wc_hdfs.WordCountReducer; /** * @author Dawn * @date 2019年5月2日14:28:27 * @version 1.0 * 输入和输出路径都是hdfs上的路径 */ public class WordCountDriver_hdfs { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //1.获取job信息 Configuration conf=new Configuration(); Job job=Job.getInstance(); //2.获取jar包 job.setJarByClass(WordCountDriver_hdfs.class); //3.获取自定义的mapper与reducer类 job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); //4.设置map输出的数据类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //5.设置reduce输出的数据类型(最终的数据类型) job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //6.设置输入存在的路径与处理后的结果路径(注意包的导入 是org.apache.hadoop.mapreduce.lib.下的包) FileInputFormat.setInputPaths(job, new Path("/dawn/wordcount.txt")); FileOutputFormat.setOutputPath(job, new Path("/output/wc")); //7.提交任务 boolean rs=job.waitForCompletion(true); System.out.println(rs?0:1); } }
命令 (如果jar包没在当前目录下,记得写好路径):
hadoop jar wordcount.jar it.dawn.YARNPra.wc_hdfs.WordCountDriver_hdfs
(1)用户自定义mapper类 要继承父类Mapper
(2)Mapper的输入数据的kv对形式(kv类型可以自定义)
(3)Mapper的map方法的重写(加入业务逻辑)
(4)Mapper的数据输出kv对的形式(kv类型可以自定义)
(5)map()方法(maptask进程)对每个<k,v>调用一次
(1)用户自定义reducer类 要继承父类Reducer
(2)Reducer的数据输入类型对应的是Mapper阶段的输出数据类型,也是kv对
(3)Reducer的reduce方法的重写(加入业务逻辑)
(4)ReduceTask进程对每组的k的<k,v>组调用一次reduce方法
MR程序需要一个Driver来进行任务的提交,提交的任务是一个描述了各种重要信息的job对象
=============================================================================
指定MR程序运行容器或者框架 默认是本地模式
<property>
<name>mapreduce.framework.name</name>
<value>local</value>
<description>The runtime framework for executing MapReduce jobs.
Can be one of local, classic or yarn.
</description>
</property>
修改如下:
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
分发到bigdata13 bigdata12
scp mapred-site.xml bigdata12:$PWD
scp mapred-site.xml bigdata13:$PWD
标签:循环 context jar包 配置 hadoop prope 任务 本地 pac
原文地址:https://www.cnblogs.com/hidamowang/p/10802445.html