标签:
第一代 Hadoop MapReduce 是一个在计算机集群上分布式处理海量数据集的软件框架,包括一个 JobTracker 和一定数量的 TaskTracker。运行流程图如图 1 所示:
在最上层有 4 个独立的实体,即客户端、jobtracker、tasktracker 和分布式文件系统。客户端提交 MapReduce 作业;jobtracker 协调作业的运行;jobtracker 是一个 Java 应用程序,它的主类是 JobTracker;tasktracker 运行作业划分后的任务,tasktracker 也是一个 Java 应用程序,它的主类是 TaskTracker。Hadoop 运行 MapReduce 作业的步骤主要包括提交作业、初始化作业、分配任务、执行任务、更新进度和状态、完成作业等
6 个步骤。
在导入数据时,模型以 block 形式存在于 worker 上,由 driver 向 worker 分发任务,处理完后 work 向 driver 反馈结果。也可在 work 上对数据模型建立高速缓存 cache,对 cache 的处理过程与 block 类似,也是一个分发、反馈的过程。
Spark 的 RDD 概念能够取得和专有系统同样的性能,还能提供包括容错处理、滞后节点处理等这些专有系统缺乏的特性。
1、迭代算法:这是目前专有系统实现的非常普遍的一种应用场景,比如迭代计算可以用于图处理和机器学习。RDD 能够很好地实现这些模型,包括 Pregel、HaLoop 和 GraphLab 等模型。
2、关系型查询:对于 MapReduce 来说非常重要的需求就是运行 SQL 查询,包括长期运行、数小时的批处理作业和交互式的查询。然而对于 MapReduce 而言,对比并行数据库进行交互式查询,有其内在的缺点,比如由于其容错的模型而导致速度很慢。利用 RDD 模型,可以通过实现许多通用的数据库引擎特性,从而获得很好的性能。
3、MapReduce 批处理:RDD 提供的接口是 MapReduce 的超集,所以 RDD 能够有效地运行利用 MapReduce 实现的应用程序,另外 RDD 还适合更加抽象的基于 DAG 的应用程序。
4、流式处理:目前的流式系统也只提供了有限的容错处理,需要消耗系统非常大的拷贝代码或者非常长的容错时间。特别是在目前的系统中,基本都是基于连续计算的模型,常住的有状态的操作会处理到达的每一条记录。为了恢复失败的节点,它们需要为每一个操作复制两份操作,或者将上游的数据进行代价较大的操作重放,利用 RDD 实现离散数据流,可以克服上述问题。离散数据流将流式计算当作一系列的短小而确定的批处理操作,而不是常驻的有状态的操作,将两个离散流之间的状态保存在 RDD 中。离散流模型能够允许通过 RDD 的继承关系图进行并行性的恢复而不需要进行数据拷贝。
public class LineLengthCountMapper extends Mapper<LongWritable,Text,IntWritable,IntWritable> { @Override protected void map(LongWritable lineNumber, Text line, Context context) throws IOException, InterruptedException { context.write(new IntWritable(line.getLength()), new IntWritable(1)); } }上述所示代码,由于 Mappers 和 Reducers 只处理键值对,所以对于类 LineLengthCountMapper 而言,输入是 TextInputFormat 对象,它的 key 由行数提供,value 就是该行所有字符。换成 Spark 之后的代码如下:
lines.map(line => (line.length, 1))在 Spark 里,输入是弹性分布式数据集 (Resilient Distributed Dataset),Spark 不需要 key-value 键值对,代之的是 Scala 元祖 (tuple),它是通过 (line.length, 1) 这样的 (a,b) 语法创建的。以上代码中 map() 操作是一个 RDD,(line.length, 1) 元祖。当一个 RDD 包含元祖时,它依赖于其他方法,例如 reduceByKey(),该方法对于重新生成 MapReduce 特性具有重要意义。
public class LineLengthReducer extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable> { @Override protected void reduce(IntWritable length, Iterable<IntWritable> counts, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable count : counts) { sum += count.get(); } context.write(length, new IntWritable(sum)); } }Spark 里面的对应代码如下:
val lengthCounts = lines.map(line => (line.length, 1)).reduceByKey(_ + _)Spark 的 RDD API 有一个 reduce() 方法,它会 reduce 所有的 key-value 键值对到一个独立的 value。
public class CountUppercaseMapper extends Mapper<LongWritable,Text,Text,IntWritable> { @Override protected void map(LongWritable lineNumber, Text line, Context context) throws IOException, InterruptedException { for (String word : line.toString().split(" ")) { if (Character.isUpperCase(word.charAt(0))) { context.write(new Text(word), new IntWritable(1)); } } } }在 Spark 里面,对应的代码如下:
lines.flatMap( _.split(" ").filter(word => Character.isUpperCase(word(0))).map(word => (word,1)) )MapReduce 依赖的 Map 方法这里并不适用,因为每一个输入必须对应一个输出,这样的话,每一行可能占用到很多的输出。相反的,Spark 里面的 Map 方法比较简单。Spark 里面的方法是,首先对每一行数据进行汇总后存入一个输出结果物数组,这个数组可能是空的,也可能包含了很多的值,最终这个数组会作为一个 RDD 作为输出物。这就是 flatMap() 方法的功能,它对每一行文本里的单词转换成函数内部的元组后进行了过滤。
public class CountUppercaseReducer extends Reducer<Text,IntWritable,Text,IntWritable> { @Override protected void reduce(Text word, Iterable<IntWritable> counts, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable count : counts) { sum += count.get(); } context.write(new Text(word.toString().toUpperCase()), new IntWritable(sum)); } }在 Spark 里,代码如下:
groupByKey().map { case (word,ones) => (word.toUpperCase, ones.sum) }groupByKey() 方法负责收集一个 key 的所有值,不应用于一个 reduce 方法。本例中,key 被转换成大写字母,值被直接相加算出总和。但这里需要注意,如果一个 key 与很多个 value 相关联,可能会出现 Out Of Memory 错误。
reduceByKey(_ + _).map { case (word,total) => (word.toUpperCase,total) }setup() 方法在 MapReduce 里面主要的作用是在 map 方法开始前对输入进行处理,常用的场景是连接数据库,可以在 cleanup() 方法中释放在 setup() 方法里面占用的资源。
public class SetupCleanupMapper extends Mapper<LongWritable,Text,Text,IntWritable> { private Connection dbConnection; @Override protected void setup(Context context) { dbConnection = ...; } ... @Override protected void cleanup(Context context) { dbConnection.close(); } }在 Spark 里面没有这样的方法!
标签:
原文地址:http://blog.csdn.net/lw_ghy/article/details/51470008