标签:
MapReduce理论
Hadoop Map/Reduce是一个使用简单的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错的方式并行处理上T级别的数据集。
一个Map/Reduce作业(job)通常会把输入的数据集切分为若干独立的数据块,由map任务(task)以完全并行的方式处理它们。框架会对map的输出先进行排序,然后把结果输入给reduce任务。通常作业的输入和输出都会被存储在文件系统中。整个框架负责任务的调度和监控,以及重新执行已经失败的的任务。
通常,Map/Reduce框架和分布式文件系统是运行在一组相同的节点上的,也就是说,计算节点和存储节点通常在一起。这种配置允许框架在那些已经存好数据的节点上高效地调度任务,这可以使整个集群的网络带宽被非常高效的利用。
Map/Reduce框架由一个单独的master JobTracker和每个集群节点一个slave TaskTracker共同组成。Master负责调度构成一个作业的所有的任务,这些任务分布在不同的slave上,master监控它们的执行,重新执行已经失败的任务。而slave仅负责执行由master指派的任务。
应用程序至少应该指明输入/输出的位置,并通过实现合适的接口或抽象类提供map和reduce函数。再加上其它作业的参数,就构成了作业配置(job configuration)。然后,Hadoop的job client提交作业(jar包/可执行程序)等和配置信息给JobTracker,后者负责分发这些软件和配置信息给slave、调度任务并监控它们的执行,同时提供状态和诊断信息给job-client。
虽然Hadoop框架是javaTM实现的,但Map/Reduce应用程序则不一定要用java来写。
? Hadoop Streaming是一种运行作业的使用工具,它允许用户创建和运行任务可执行程序(例如:Shell)来作为mapper和reducer。
? Hadoop Pipes是一个与SWIG兼容的C++ API(没有基于JNITM技术),它也可用于实现Map/Reduce应用程序
Map/Reduce框架运转在<key, value>键值对上,也就是说,框架把作业的输入看为是一组<key, value>键值对,同样也产出一组<key, value>键值对作为作业的输出,这2组键值对的类型可能不同。
框架需要对key和value的类进行序列化操作,因此这些类需要实现Writable接口。另外,为了方便框架执行排序操作,key类必须实现WritableComparable接口。
一个Map/Reduce作业的输入和输出类型如下所示:
(input)<k1,v1>->map-><k2,v2>->combine-><k2,v2>->reduce-><k3,v3>(output)
在深入细节之前,让我们先看一个Map/Reduce的应用示例,以便对它们的工作方式有一个初步的认识。WordCount是一个简单的应用,它可以计算出指定数据集中每一个单词出现的次数。这个应用适用于单机模式,伪分布式模式或完全分布式模式三种Hadoop安装方式。
package org.myorg;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class WordCount{
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>
{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value, OutputCollector<Text,IntWritable>output,Reporter reporter) throws IOException
{
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while(tokenizer.hasMoreTokens())
{
word.set(tokenizer.nextToken());
output.collect(word, one);
}
}
}
public static class Reduce extends MapReduceBase implements Reducer<Text,IntWritable,Text,IntWritable>
{
public void reduce(Text key,Iterator<IntWritable> values,OutputCollector<Text,IntWritable> output,Reporter reporter) throws IOException
{
int sum = 0;
while(values.hasNext())
{
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}
public static void main(String[] args) throws Exception
{
JobConf conf = new JobConf(WordCount.class);
conf.setJobName("wordcount");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
}
假设环境变量HADOOP_HOME对应安装时的根目录,HADOOP_VERSION对应Hadoop的当前安装版本,编译WordCount.java来创建jar包,可如下操作:
$ mkdir wordcount_classes
$ javac -classpath ${HADOOP_HOME}/hadoop-${HADOOP_VERSION}-core.jar -d wordcount_classes WordCount.java
$ jar -cvf /usr/joe/wordcount.jar -C wordcount_classes/ .
假设:
/usr/joe/wordcount/input —— 是HDFS中的输入路径
/usr/joe/wordcount/output—— 是HDFS中的输出路径
用示例文本文件作为输入:
$ bin/hadoop dfs -ls /usr/joe/wordcount/input/
/usr/joe/wordcount/input/file01
/usr/joe/wordcount/input/file02
$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file01
Hello World Bye World
$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file02
Hello Hadoop Goodbye Hadoop
运行应用程序:
$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount /usr/joe/wordcount/input/ /us/joe/wordcount/output/
输出是:
$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
Bye 1
GoodBye 1
Hadoop 1
Hello 2
World 2
应用程序能够使用-files选项来指定一个由逗号分隔的路径列表,这些路径是task的当前工作目录。使用选项-libjars可以向map和reduce的classpath中添加jar包。使用-archives选项程序可以传递档案文件作为参数,这些档案文件会被解压并且在task的当前工作目录下会创建一个指向解压生成的目录的符号链接(以压缩包的名字命名)。
使用-libjars和-files运行wordcount例子:
$ hadoop jar hadoop-examples.jar wordcount –files cachefile.txt –libjars mylib.jar input output
待续。
标签:
原文地址:http://www.cnblogs.com/liwei0526vip/p/5637890.html