The new API favors abstract classes over interfaces, because abstract classes are easier to evolve: for example, a method (with a default implementation) can be added to an abstract class without breaking existing implementations of the class.
In the new API, Mapper and Reducer are therefore abstract classes.
The new API lives in the org.apache.hadoop.mapreduce package (and its subpackages); the earlier API remains in org.apache.hadoop.mapred.
The new API makes extensive use of context objects (Context) that let user code communicate with the MapReduce framework. For example, MapContext essentially combines the roles of JobConf, OutputCollector, and Reporter.
The new API supports both "push" and "pull" styles of iteration.
Both APIs push key/value records to the mapper, but in addition the new API lets records be pulled from inside the map() method; the same applies to the reducer.
The advantage of pull-style processing is that records can be processed in batches rather than one at a time.
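As an illustrative aside (not in the original post): in the new API, the "pull" style corresponds to overriding Mapper.run(), which receives the Context and can call context.nextKeyValue() itself. The sketch below batches records before emitting a result; the class name and batch size are made up for the example.

package example; // hypothetical package

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Pull-style mapper: overrides run() and fetches records itself via the Context
public class BatchingMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    private static final int BATCH_SIZE = 100; // illustrative batch size

    @Override
    public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        List<String> batch = new ArrayList<String>();
        // pull records from the framework instead of waiting for map() to be called per record
        while (context.nextKeyValue()) {
            batch.add(context.getCurrentValue().toString());
            if (batch.size() >= BATCH_SIZE) {
                processBatch(batch, context);
                batch.clear();
            }
        }
        if (!batch.isEmpty()) {
            processBatch(batch, context); // flush the final partial batch
        }
        cleanup(context);
    }

    // Emit one count per batch; a real job would do something more useful here
    private void processBatch(List<String> batch, Context context)
            throws IOException, InterruptedException {
        context.write(new Text("batch"), new LongWritable(batch.size()));
    }
}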
The new API unifies configuration. The old API configures a job through a special JobConf object, which is an extension of Hadoop's Configuration object.
In the new API this distinction is dropped, and all job configuration is done through Configuration.
In the new API, job control is handled by the Job class rather than JobClient, which no longer exists.
Output files are named slightly differently: map outputs are named part-m-nnnnn and reduce outputs part-r-nnnnn, where nnnnn is the part number, an integer starting at 0.
Here I set the trash retention interval to 1 day (60*24 minutes).
After data is deleted with rm, it is moved into a .Trash directory under the current user's home directory (normally /user/root on HDFS).
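The interval above is controlled by the fs.trash.interval property (in minutes) in core-site.xml; a likely snippet for the 1-day setting, assuming a standard configuration, is:

<property>
  <name>fs.trash.interval</name>
  <value>1440</value> <!-- 60*24 minutes = 1 day -->
</property>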
(b) Test
1) Create the input directory
hadoop/bin/hadoop fs -mkdir input
2) Upload files
root@master:/data/soft# hadoop/bin/hadoop fs -copyFromLocal /data/soft/file0* input
3) Delete the input directory
[root@master data]# hadoop fs -rmr input
Moved to trash: hdfs://master:9000/user/root/input
4) Check the current directory (the trash directory is under /user/root on HDFS)
[root@master data]# hadoop fs -ls
Found 2 items
drwxr-xr-x - root supergroup 0 2014-08-12 13:21 /user/root/.Trash
input is gone, and a new .Trash directory has appeared.
5) Restore the directory that was just deleted (note the source and destination paths)
[root@master data]# hadoop fs -mv /user/root/.Trash/Current/user/root/input /user/root/input
6) Check the restored data
[root@master data]# hadoop fs -ls input
Found 2 items
-rw-r--r-- 3 root supergroup 22 2014-08-12 13:21 /user/root/input/file01
-rw-r--r-- 3 root supergroup 28 2014-08-12 13:21 /user/root/input/file02
7) Delete the .Trash directory (empty the trash)
[root@master data]# hadoop fs -rmr .Trash
Deleted hdfs://master:9000/user/root/.Trash
If an MR job runs but no job record appears in the web UI (http://hadoop:50070 and http://hadoop:50030), it is recommended to package the program as a jar and submit it from the command line.
To simplify running jobs from the command line, Hadoop ships with some helper classes.
For example, consider the following program:

public class WordCount {
    // omitted...
    public static void main(String[] args) throws Exception {
        // in the new API, the job is configured through a Configuration object
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        // omitted...
        Job job = new Job(conf, "word count");
        // omitted...
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

This program uses the GenericOptionsParser class, whose job is to set the parameters given on the command line onto the conf variable automatically.
GenericOptionsParser is usually not used directly; the more convenient approach is to implement the Tool interface and run the application with ToolRunner, which calls GenericOptionsParser internally.
The modified code then looks like this:
public class WordCount extends Configured implements Tool {
    @Override
    public int run(String[] arg0) throws Exception {
        // inside run(), the Configuration object is obtained through getConf()
        // run() hides the code that uses the helper classes to read the input
        // arguments and set them on the Configuration object
        Job job = new Job(getConf(), "word count");
        // omitted...
        System.exit(job.waitForCompletion(true) ? 0 : 1);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new WordCount(), args);
        System.exit(res);
    }
}

(5) When running an MR job from the command line, a ClassNotFoundException usually means a third-party jar is missing; you can copy the third-party jar into the directory under the Hadoop installation where the jars are kept.
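As a side note (not from the original post): because ToolRunner wires in GenericOptionsParser, generic options such as -D and -libjars can be passed on the command line before the program's own arguments; the jar name, class name, and paths below are placeholders.

hadoop jar wordCount.jar WordCount -D mapred.reduce.tasks=2 -libjars /path/to/third-party.jar input output

The -libjars option is also a lighter-weight way to make third-party jars available to a job than copying them into the Hadoop installation.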
Program: new-API version

package inAction;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// WordCount example based on the new API ("extends Configured implements Tool" makes the job easier to manage)
public class MyWordCount extends Configured implements Tool {

    public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer st = new StringTokenizer(value.toString());
            while (st.hasMoreTokens()) {
                word.set(st.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable value : values) {
                count += value.get();
            }
            result.set(count);
            context.write(key, result);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf(); // in the new API, the Configuration object carries the job configuration
        // ToolRunner automatically invokes the hidden GenericOptionsParser to set command-line arguments on conf
        Job job = new Job(conf, "MyWordCount");
        job.setJarByClass(MyWordCount.class);
        job.setMapperClass(MyMapper.class);
        job.setCombinerClass(MyReducer.class);
        job.setReducerClass(MyReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // input and output paths can be given on the command line, or set directly
        // in the program and launched via right-click "run on hadoop"
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new MyWordCount(), args);
        System.exit(res);
    }
}

Run: hadoop jar /root/wordCount.jar /usr/input /usr/output
Program: old-API version

package inAction;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

// WordCount implementation based on the old API
public class WordCount2 {

    public static class MyMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable> {
        private IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            StringTokenizer st = new StringTokenizer(value.toString());
            while (st.hasMoreTokens()) {
                word.set(st.nextToken());
                output.collect(word, one);
            }
        }
    }

    public static class MyReduce extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable res = new IntWritable();

        @Override
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            res.set(sum);
            output.collect(key, res);
        }
    }

    public static void main(String[] args) throws IOException {
        // the old API configures the job with JobConf
        JobConf conf = new JobConf(WordCount2.class);
        conf.setJobName("OldAPIWordCount");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(MyMapper.class);
        conf.setCombinerClass(MyReduce.class);
        conf.setReducerClass(MyReduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path("hdfs://hadoop:9000/usr/wordsIn"));
        FileOutputFormat.setOutputPath(conf, new Path("hdfs://hadoop:9000/usr/wordsOut2"));
        JobClient.runJob(conf); // JobClient has been removed in the new API
    }
}

A detailed walkthrough of WordCount is available at: http://www.cnblogs.com/xia520pi/archive/2012/05/16/2504205.html
package inAction;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Find the highest temperature of each year (this example uses only the 1901 and 1902 data sets, downloaded from the web)
public class MaxTemperature extends Configured implements Tool {

    // The Mapper extracts the year and the temperature from each line of raw data
    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final int MISSING = 9999; // a temperature of 9999 means the reading is missing for that record

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String year = line.substring(15, 19); // the year occupies characters 15 to 19
            int temperature;
            // temperatures may be signed; Integer.parseInt cannot handle a leading '+',
            // so positive and negative readings are parsed differently
            if (line.charAt(87) == '+') {
                temperature = Integer.parseInt(line.substring(88, 92));
            } else {
                temperature = Integer.parseInt(line.substring(87, 92));
            }
            String quantity = line.substring(92, 93); // quality code: the reading is valid only when it matches [01459]
            // emit only valid temperature readings
            if (quantity.matches("[01459]") && temperature != MISSING) {
                context.write(new Text(year), new IntWritable(temperature));
            }
        }
    }

    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int maxValue = Integer.MIN_VALUE;
            for (IntWritable temp : values) {
                maxValue = Math.max(temp.get(), maxValue);
            }
            context.write(key, new IntWritable(maxValue));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = new Job(conf, "MaxTemperature");
        job.setJarByClass(MaxTemperature.class);
        job.setMapperClass(MyMapper.class);
        job.setCombinerClass(MyReducer.class); // a combiner cuts down the data sent to the reducer and improves performance
        job.setReducerClass(MyReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // input and output paths can be given on the command line, or set directly
        // in the program and launched via right-click "run on hadoop"
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new MaxTemperature(), args);
        System.exit(res);
    }
}

A detailed walkthrough of MaxTemperature is available at: http://www.linuxidc.com/Linux/2012-05/61196.htm
The task is to invert the patent citation data, i.e. the output has the form: patent number 1  the patents that cite it, separated by commas ... (this example is from Hadoop in Action).
package inAction;

import java.io.IOException;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.*;

public class MyJob2 extends Configured implements Tool {

    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] citation = value.toString().split(",");
            context.write(new Text(citation[1]), new Text(citation[0]));
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> value, Context context)
                throws IOException, InterruptedException {
            String csv = "";
            for (Text val : value) {
                if (csv.length() > 0)
                    csv += ",";
                csv += val.toString();
            }
            context.write(key, new Text(csv));
        }
    }

    @Override
    public int run(String[] arg0) throws Exception {
        Configuration conf = getConf();
        Job job = new Job(conf, "MyJob2");
        job.setJarByClass(MyJob2.class);
        Path in = new Path("/root/cite75_99.txt");
        Path out = new Path("/root/inAction3");
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new MyJob2(), args);
        System.exit(res);
    }
}

// Old-API program MyJob.java
package inAction;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class MyJob extends Configured implements Tool {

    public MyJob() {
    }

    public static class MapClass extends MapReduceBase implements Mapper<Text, Text, Text, Text> {
        @Override
        public void map(Text key, Text value, OutputCollector<Text, Text> output,
                Reporter reporter) throws IOException {
            // swap the cited and citing patent numbers
            output.collect(value, key);
        }
    }

    // Reducer that lists every citing patent after the patent it cites
    public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            String csv = "";
            while (values.hasNext()) {
                if (csv.length() > 0)
                    csv += ",";
                csv += values.next().toString();
            }
            output.collect(key, new Text(csv));
        }
    }

    // Counting version of the Reducer
    // public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, IntWritable> {
    //     @Override
    //     public void reduce(Text key, Iterator<Text> values,
    //             OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
    //         int count = 0;
    //         while (values.hasNext()) {
    //             values.next();
    //             count++;
    //         }
    //         output.collect(key, new IntWritable(count));
    //     }
    // }

    @Override
    public int run(String[] arg0) throws Exception {
        Configuration conf = getConf();
        JobConf job = new JobConf(conf, MyJob.class);
        Path in = new Path("hdfs://hadoop:9000/zpc/cite75_99.txt");
        Path out = new Path("hdfs://hadoop:9000/zpc/output1");
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        job.setJobName("zpcJob1");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormat(KeyValueTextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.set("key.value.separator.in.input.line", ",");
        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new MyJob(), args);
        System.exit(res);
    }
}
Program: CitationHistogram.java — count the number of patents for each citation count (e.g. how many patents were cited once, how many were cited n times).

package inAction;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

// Count the number of patents for each citation count, e.g. how many patents were cited once, how many twice
public class CitationHistogram extends Configured implements Tool {

    public static class MapClass extends MapReduceBase
            implements Mapper<Text, Text, IntWritable, IntWritable> {
        private final static IntWritable uno = new IntWritable(1);
        private IntWritable citationCount = new IntWritable();

        @Override
        public void map(Text key, Text value,
                OutputCollector<IntWritable, IntWritable> output, Reporter reporter)
                throws IOException {
            citationCount.set(Integer.parseInt(value.toString()));
            output.collect(citationCount, uno);
        }
    }

    public static class Reduce extends MapReduceBase
            implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        @Override
        public void reduce(IntWritable key, Iterator<IntWritable> value,
                OutputCollector<IntWritable, IntWritable> output, Reporter reporter)
                throws IOException {
            int count = 0;
            while (value.hasNext()) {
                count += value.next().get();
            }
            output.collect(key, new IntWritable(count));
        }
    }

    @Override
    public int run(String[] arg0) throws Exception {
        Configuration conf = getConf();
        JobConf job = new JobConf(conf, CitationHistogram.class);
        Path in = new Path("/root/inAction1-1/part-00000");
        Path out = new Path("/root/inAction2");
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        job.setJobName("zpcJob2");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormat(KeyValueTextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new CitationHistogram(), args);
        System.exit(res);
    }
}

Note: the patent data sets used by the programs above, apat63_99.txt and cite75_99.txt, can be downloaded from the web.
Example: sorting data. Each line of the output must contain two separated numbers: the first is the rank of the value within the entire data set, and the second is the original value.
Sample input:
1)file1:
2
32
654
32
15
756
65223
2)file2:
5956
22
650
92
3)file3:
26
54
6
Sample output:
1 2
2 6
3 15
4 22
5 26
6 32
7 32
8 54
9 92
10 650
11 654
12 756
13 5956
14 65223
Program: Sort.java
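The post references Sort.java without listing its source here. Below is a minimal sketch of one common way to implement it (a single reducer assigns the rank while the framework's shuffle sorts the IntWritable keys); it is not necessarily the author's code, and the class and method names are assumptions.

package inAction; // assumed package, matching the other examples

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Sketch: global sort with rank, relying on the framework's sort of IntWritable keys
public class Sort extends Configured implements Tool {

    public static class SortMapper extends Mapper<Object, Text, IntWritable, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private IntWritable number = new IntWritable();

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String s = value.toString().trim();
            if (s.isEmpty()) {
                return; // skip blank lines
            }
            // each input line holds one integer; emit it as the key so the shuffle sorts it
            number.set(Integer.parseInt(s));
            context.write(number, ONE);
        }
    }

    public static class SortReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        private IntWritable rank = new IntWritable(1); // running rank across all keys

        @Override
        protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // emit one (rank, value) pair per occurrence so duplicates keep separate ranks
            for (IntWritable ignored : values) {
                context.write(rank, key);
                rank.set(rank.get() + 1);
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf(), "Sort");
        job.setJarByClass(Sort.class);
        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);
        job.setNumReduceTasks(1); // a single reducer yields one globally sorted output file
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new Sort(), args));
    }
}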
Compute each student's average score from the data in the input files.
Each line of an input file contains a student's name and the corresponding score; if there are several subjects, each subject is a separate file.
The output should contain two separated fields per line: the first is the student's name and the second is his or her average score.
Sample input:
1)math:
张三 88
李四 99
王五 66
赵六 77
2)china:
张三 78
李四 89
王五 96
赵六 67
3)english:
张三 80
李四 82
王五 84
赵六 86
Sample output:
张三 82
李四 90
王五 82
赵六 76
Program: AverageScore.java
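AverageScore.java is likewise only referenced here. The following is a minimal sketch of one possible implementation, assuming each line is "<name> <score>" and that the average is truncated to an integer as in the sample output; it is not the author's code, and the class names are assumptions.

package inAction; // assumed package, matching the other examples

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Sketch: average score per student; every subject file contributes one (name, score) pair per line
public class AverageScore extends Configured implements Tool {

    public static class ScoreMapper extends Mapper<Object, Text, Text, IntWritable> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // each line: <student name> <score>
            StringTokenizer st = new StringTokenizer(value.toString());
            if (st.countTokens() >= 2) {
                String name = st.nextToken();
                int score = Integer.parseInt(st.nextToken());
                context.write(new Text(name), new IntWritable(score));
            }
        }
    }

    public static class AvgReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            int count = 0;
            for (IntWritable v : values) {
                sum += v.get();
                count++;
            }
            // integer division matches the sample output (e.g. 230 / 3 = 76)
            context.write(key, new IntWritable(sum / count));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf(), "AverageScore");
        job.setJarByClass(AverageScore.class);
        job.setMapperClass(ScoreMapper.class);
        job.setReducerClass(AvgReducer.class); // no combiner: averaging is not associative
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new AverageScore(), args));
    }
}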
Example: multi-table join (joining a factory table with an address table). The sample input is shown below.
1)factory:
factoryname addressID
Beijing Red Star 1
Shenzhen Thunder 3
Guangzhou Honda 2
Beijing Rising 1
Guangzhou Development Bank 2
Tencent 3
Bank of Beijing 1
2)address:
addressID addressname
1 Beijing
2 Guangzhou
3 Shenzhen
4 Xian
The sample output is shown below:
factoryname addressname
Bank of Beijing Beijing
Beijing Red Star Beijing
Beijing Rising Beijing
Guangzhou Development Bank Guangzhou
Guangzhou Honda Guangzhou
Shenzhen Thunder Shenzhen
Tencent Shenzhen
A multi-table join is similar to a single-table join; both resemble a natural join in a database.
Compared with the single-table case, the left and right tables and the join column are more explicit here.
So the same approach as for the single-table join can be used: the map determines which table an input line comes from, splits the line, stores the value of the join column in the key, and stores the remaining column together with a left/right-table tag in the value, then emits the pair.
The reduce receives the records grouped by the join key, parses the values, separates left-table and right-table content according to the tag, computes their Cartesian product, and emits the result directly.
Program: MTjoin.java
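MTjoin.java is only referenced here as well. Below is a minimal sketch of a reduce-side join along the lines described above; the table-detection logic, the left/right tags, and the single-reducer header line are assumptions rather than the author's implementation.

package inAction; // assumed package, matching the other examples

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Sketch: reduce-side join of the factory and address tables on addressID
public class MTjoin extends Configured implements Tool {

    public static class JoinMapper extends Mapper<Object, Text, Text, Text> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString().trim();
            if (line.isEmpty() || line.startsWith("factoryname") || line.startsWith("addressID")) {
                return; // skip header and blank lines
            }
            if (Character.isDigit(line.charAt(0))) {
                // address table: "<addressID> <addressname>"
                int sp = line.indexOf(' ');
                String id = line.substring(0, sp).trim();
                String addressName = line.substring(sp + 1).trim();
                context.write(new Text(id), new Text("R:" + addressName)); // right table
            } else {
                // factory table: "<factoryname> <addressID>", the ID is the last token
                int sp = line.lastIndexOf(' ');
                String factoryName = line.substring(0, sp).trim();
                String id = line.substring(sp + 1).trim();
                context.write(new Text(id), new Text("L:" + factoryName)); // left table
            }
        }
    }

    public static class JoinReducer extends Reducer<Text, Text, Text, Text> {
        private boolean headerWritten = false;

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            if (!headerWritten) {
                context.write(new Text("factoryname"), new Text("addressname"));
                headerWritten = true; // assumes a single reducer, so the header appears once
            }
            List<String> factories = new ArrayList<String>();
            List<String> addresses = new ArrayList<String>();
            for (Text v : values) {
                String s = v.toString();
                if (s.startsWith("L:")) {
                    factories.add(s.substring(2));
                } else {
                    addresses.add(s.substring(2));
                }
            }
            // Cartesian product of the two sides that share this addressID
            for (String f : factories) {
                for (String a : addresses) {
                    context.write(new Text(f), new Text(a));
                }
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf(), "MTjoin");
        job.setJarByClass(MTjoin.class);
        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);
        job.setNumReduceTasks(1);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new MTjoin(), args));
    }
}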
Copyright notice: this is an original blog post and may not be reproduced without permission.
Original post: http://www.cnblogs.com/gcczhongduan/p/4640855.html