标签:bin 测试 sub 使用 操作 中间 use 概念 .text
部署节点操作系统为CentOS,防火墙和SElinux禁用,创建了一个shiyanlou用户并在系统根目录下创建/app目录,用于存放 Hadoop等组件运行包。因为该目录用于安装hadoop等组件程序,用户对shiyanlou必须赋予rwx权限(一般做法是root用户在根目录下 创建/app目录,并修改该目录拥有者为shiyanlou(chown –R shiyanlou:shiyanlou /app)。
Hadoop搭建环境:
2 MapReduce原理
2.1 MapReduce简介
MapReduce 是现今一个非常流行的分布式计算框架,它被设计用于并行计算海量数据。第一个提出该技术框架的是Google 公司,而Google 的灵感则来自于函数式编程语言,如LISP,Scheme,ML 等。MapReduce 框架的核心步骤主要分两部分:Map 和Reduce。当你向MapReduce 框架提交一个计算作业时,它会首先把计算作业拆分成若干个Map 任务,然后分配到不同的节点上去执行,每一个Map 任务处理输入数据中的一部分,当Map 任务完成后,它会生成一些中间文件,这些中间文件将会作为Reduce 任务的输入数据。Reduce 任务的主要目标就是把前面若干个Map 的输出汇总到一起并输出。从高层抽象来看,MapReduce的数据流图如下图所示:
2.2 MapReduce流程分析
2.2.1 Map过程
当map任务输出最后一个记录时,可能会有很多的溢出文件,这时需要将这些文件合并。合并的过程中会不断地进行排序和combia操作,目的有两个:
将分区中的数据拷贝给相对应的reduce任务。有人可能会问:分区中的数据怎么知道它对应的reduce是哪个呢?其实map任务一直和 其父TaskTracker保持联系,而TaskTracker又一直和JobTracker保持心跳。所以JobTracker中保存了整个集群中的宏 观信息。只要reduce任务向JobTracker获取对应的map输出位置就可以了。
2.2.2 Reduce过程
2.3 MapReduce工作机制剖析
3 测试例子1
3.1 测试例子1内容
下载气象数据集部分数据,写一个Map-Reduce作业,求每年的最低温度
3.2 运行代码
3.2.1 MinTemperature
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MinTemperature {
public static void main(String[] args) throws Exception {
if(args.length != 2) {
System.err.println("Usage: MinTemperature<input path> <output path>");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(MinTemperature.class);
job.setJobName("Min temperature");
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MinTemperatureMapper.class);
job.setReducerClass(MinTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
3.2.2 MinTemperatureMapper
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MinTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
private static final int MISSING = 9999;
@Overridepublic void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if(line.charAt(87) == ‘+‘) {
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if(airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}
3.2.3 MinTemperatureReducer
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MinTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Overridepublic void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int minValue = Integer.MAX_VALUE;
for(IntWritable value : values) {
minValue = Math.min(minValue, value.get());
}
context.write(key, new IntWritable(minValue));
}
}
3.3 实现过程
3.3.1 编写代码
进入/app/hadoop-1.1.2/myclass目录,在该目录中建立MinTemperature.java、MinTemperatureMapper.java和MinTemperatureReducer.java代码文件,执行命令如下:
MinTemperature.java:
MinTemperatureMapper.java:
MinTemperatureReducer.java:
3.3.2 编译代码
在/app/hadoop-1.1.2/myclass目录中,使用如下命令对java代码进行编译,为保证编译成功,加入classpath变量,引入hadoop-core-1.1.2.jar包:
3.3.3 打包编译文件
把编译好class文件打包,否则在执行过程会发生错误。把打好的包移动到上级目录并删除编译好的class文件:
3.3.4 解压气象数据并上传到HDFS中
NCDC气象数据下载地址:
http://labfile.oss.aliyuncs.com/courses/237/temperature.zip
把NCDC气象数据解压,并使用zcat命令把这些数据文件解压并合并到一个temperature.txt文件中
气象数据具体的下载地址为 ftp://ftp3.ncdc.noaa.gov/pub/data/noaa/ ,该数据包括1900年到现在所有年份的气象数据,大小大概有70多个G,为了测试简单,我们这里选取一部分的数据进行测试。合并后把这个文件上传到 HDFS文件系统的/class5/in目录中:
3.3.5 运行程序
以jar的方式启动MapReduce任务,执行输出目录为/class5/out:
3.3.6 查看结果
执行成功后,查看/class5/out目录中是否存在运行结果,使用cat查看结果(温度需要除以10):
3.3.7 通过页面结果(由于实验楼环境是命令行界面,以下仅为说明运行过程和结果可以通过界面进行查看)
1.查看jobtracker.jsp
查看已经完成的作业任务:
任务的详细信息:
2.查看dfshealth.jsp
分别查看HDFS文件系统和日志
4 测试例子2
4.1 测试例子2内容
如果求温度的平均值,能使用combiner吗?有没有变通的方法?
4.2 回答
不能直接使用,因为求平均值和前面求最值存在差异,各局部最值的最值还是等于整体的最值的,但是对于平均值而言,各局部平均值的平均值将不再是整体 的平均值了,所以不能直接用combiner。可以通过变通的办法使用combiner来计算平均值,即在combiner的键值对中不直接存储最后的平 均值,而是存储所有值的和个数,最后在reducer输出时再用和除以个数得到平均值。
4.3 程序代码
4.3.1 AvgTemperature.java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class AvgTemperature {
public static void main(String[] args) throws Exception {
if(args.length != 2) {
System.out.println("Usage: AvgTemperatrue <input path><output path>");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(AvgTemperature.class);
job.setJobName("Avg Temperature");
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(AvgTemperatureMapper.class);
job.setCombinerClass(AvgTemperatureCombiner.class);
job.setReducerClass(AvgTemperatureReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
4.3.2 AvgTemperatureMapper.java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class AvgTemperatureMapper extends Mapper<LongWritable, Text, Text, Text> {
private static final int MISSING = 9999;
@Overridepublic void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if(line.charAt(87) == ‘+‘) {
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if(airTemperature != MISSING && !quality.matches("[01459]")) {
context.write(new Text(year), new Text(String.valueOf(airTemperature)));
}
}
}
4.3.3 AvgTemperatureCombiner.java
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class AvgTemperatureCombiner extends Reducer<Text, Text, Text, Text>{
@Overridepublic void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
double sumValue = 0;
long numValue = 0;
for(Text value : values) {
sumValue += Double.parseDouble(value.toString());
numValue ++;
}
context.write(key, new Text(String.valueOf(sumValue) + ‘,‘ + String.valueOf(numValue)));
}
}
**4.3.4 AvgTemperatureReducer.java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class AvgTemperatureReducer extends Reducer<Text, Text, Text, IntWritable>{
@Overridepublic void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
double sumValue = 0;
long numValue = 0;
int avgValue = 0;
for(Text value : values) {
String[] valueAll = value.toString().split(",");
sumValue += Double.parseDouble(valueAll[0]);
numValue += Integer.parseInt(valueAll[1]);
}
avgValue = (int)(sumValue/numValue);
context.write(key, new IntWritable(avgValue));
}
}
4.4 实现过程
4.4.1 编写代码
进入/app/hadoop-1.1.2/myclass目录,在该目录中建立AvgTemperature.java、 AvgTemperatureMapper.java、AvgTemperatureCombiner.java和 AvgTemperatureReducer.java代码文件,代码内容为4.3所示,执行命令如下:
AvgTemperature.java:
AvgTemperatureMapper.java:
AvgTemperatureCombiner.java:
AvgTemperatureReducer.java:
4.4.2 编译代码
在/app/hadoop-1.1.2/myclass目录中,使用如下命令对java代码进行编译,为保证编译成功,加入classpath变量,引入hadoop-core-1.1.2.jar包:
4.4.3 打包编译文件
把编译好class文件打包,否则在执行过程会发生错误。把打好的包移动到上级目录并删除编译好的class文件:
4.4.4 运行程序
数据使用作业2求每年最低温度的气象数据,数据在HDFS位置为/class5/in/temperature.txt,以jar的方式启动MapReduce任务,执行输出目录为/class5/out2:
4.4.5 查看结果
执行成功后,查看/class5/out2目录中是否存在运行结果,使用cat查看结果(温度需要除以10):
标签:bin 测试 sub 使用 操作 中间 use 概念 .text
原文地址:http://www.cnblogs.com/charlist/p/7063961.html