RDD抽象通过语言集成API公开。这简化了编程的复杂性,因为应用程序的处理RDDS方式类似于操纵的本地集合数据。
Spark Shell
Spark提供了一个交互的shell ? 一个强大的工具,以交互方式分析数据。 这是在 Scala或Python语言。Spark主要抽象称为弹性分布式数据集(RDD)项目的分布式采集。RDDS可以从Hadoop的输入格式来创建(如HDFS文件)或通过转化其他RDDS。
打开 Spark Shell
$ spark-shell
创建简单RDD
scala> val inputfile = sc.textFile(“input.txt”)
inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at <console>:12
RDD 转换
RDD转换返回指向新的RDD,并允许创建RDDS之间的依赖关系。 在依赖关系链中的每个RDD(依赖关系的字串)具有这样的功能,用于计算其数据并具有一个指针(依赖性)到其父RDD。
Spark是懒惰的,所以什么都不会被执行,除非调用一些改造或行动将触发作业创建和执行。看单词计数示例,如下面的代码片段。
S.No |
转换&含义
|
---|---|
1 |
map(func) 返回一个新的分布式数据集,传递源的每个元素形成通过一个函数 func |
2 |
filter(func) 返回由选择在func返回true,源元素组成了一个新的数据集
|
3 |
flatMap(func) 类似映射,但每个输入项目可以被映射到0以上输出项目(所以func应返回seq而不是单一的项目)
|
4 |
mapPartitions(func) 类似映射,只不过是单独的每个分区(块)上运行RDD,因此 func 的类型必须是Iterator<T> ? Iterator<U> 对类型T在RDD上运行时 |
5 |
mapPartitionsWithIndex(func) 类似映射分区,而且还提供func 来表示分区的索引的整数值,因此 func 必须是类型 (Int, Iterator<T>) ? Iterator<U> 当类型T在RDD上运行时 |
6 |
sample(withReplacement, fraction, seed) 采样数据的一小部分,有或没有更换,利用给定的随机数发生器的种子
|
7 |
union(otherDataset) 返回一个新的数据集,其中包含源数据和参数元素的结合
|
8 |
intersection(otherDataset) 返回包含在源数据和参数元素的新RDD交集
|
9 |
distinct([numTasks]) 返回一个新的数据集包含源数据集的不同元素
|
10 |
groupByKey([numTasks]) 当调用(K,V)数据集,返回(K, Iterable<V>) 对数据集 |
11 |
reduceByKey(func, [numTasks]) |
12 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) |
13 |
sortByKey([ascending], [numTasks]) |
14 |
join(otherDataset, [numTasks]) |
15 |
cogroup(otherDataset, [numTasks]) |
16 |
cartesian(otherDataset) 当上调用类型T和U的数据集,返回(T,U)对数据集(所有元素对)
|
17 |
pipe(command, [envVars]) RDD通过shell命令每个分区,例如:一个Perl或bash脚本。RDD元素被写入到进程的标准输入和线路输出,标准输出形式返回一个字符串RDD |
18 |
coalesce(numPartitions) 减少RDD到numPartitions分区的数量。过滤大型数据集后,更高效地运行的操作
|
19 |
repartition(numPartitions) 打乱RDD数据随机创造更多或更少的分区,并在它们之间平衡。这总是打乱的所有数据在网络上
|
20 |
repartitionAndSortWithinPartitions(partitioner) 根据给定的分区重新分区RDD及在每个结果分区,排序键记录。这是调用重新分配排序在每个分区内,因为它可以推动分拣向下进入混洗机制效率更高。 |
动作
S.No | 操作 & 含义 |
---|---|
1 |
reduce(func) 合计数据集的元素,使用函数 func (其中有两个参数和返回一行). 该函数应该是可交换和可结合,以便它可以正确地在并行计算。 |
2 |
collect() 返回数据集的所有作为数组在驱动程序的元素。这是一个过滤器或其它操作之后返回数据的一个足够小的子集,通常是有用的 |
3 |
count() 返回该数据集的元素数
|
4 |
first() 返回的数据集的第一个元素(类似于使用(1))
|
5 |
take(n) 返回与该数据集的前n个元素的阵列。
|
6 |
takeSample (withReplacement,num, [seed]) 返回数组的数据集num个元素,有或没有更换随机抽样,预指定的随机数发生器的种子可选 |
7 |
takeOrdered(n, [ordering]) 返回RDD使用或者按其自然顺序或自定义比较的前第n个元素
|
8 |
saveAsTextFile(path) 写入数据集是一个文本文件中的元素(或一组文本文件),在给定的目录的本地文件系统,HDFS或任何其他的Hadoop支持的文件系统。Spark调用每个元素的 toString,将其转换为文件中的文本行 |
9 |
saveAsSequenceFile(path) (Java and Scala) 写入数据集,为Hadoop SequenceFile元素在给定的路径写入在本地文件系统,HDFS或任何其他Hadoop支持的文件系统。 这是适用于实现Hadoop可写接口RDDS的键 - 值对。在Scala中,它也可以在属于隐式转换为可写(Spark包括转换为基本类型,如 Int, Double, String 等等)类型。 |
10 |
saveAsObjectFile(path) (Java and Scala) 写入数据集的内容使用Java序列化为一个简单的格式,然后可以使用SparkContext.objectFile()加载。 |
11 |
countByKey() 仅适用于RDDS的类型 (K, V). 返回(K, Int)对与每个键的次数的一个HashMap。 |
12 |
foreach(func) 数据集的每个元素上运行函数func。这通常对于不良反应,例如更新累加器或与外部存储系统进行交互进行。 注 ? 在 foreach()以外修改变量,其他累加器可能会导致不确定的行为。请参阅了解闭包的更多细节。 |
示例
考虑一个单词计数的例子 ? 它计算出现在文档中的每个单词。请看下面的文字为输入并保存在主目录中的 input.txt 文件。
input.txt ? 作为输入文件
people are not as beautiful as they look, as they walk or as they talk. they are only as beautiful as they love, as they care as they share.
打开Spark-Shell
下面的命令用来打开spark shell. 通常情况下,spark 使用Scala构建。因此,Spark 程序需要在 Scala 环境中运行。
$ spark-shell
如果Spark shell 成功打开,会发现下面的输出。看看输出“Spark 上下文可作为sc” 的最后一行表示Spark容器会自动创建Spark 上下文对象名为sc。启动程序的第一步骤之前,SparkContext 对象应该被创建。
Spark assembly has been built with Hive, including Datanucleus jars on classpath Using Spark‘s default log4j profile: org/apache/spark/log4j-defaults.properties 15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop 15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop 15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop) 15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server 15/06/04 15:25:23 INFO Utils: Successfully started service ‘HTTP class server‘ on port 43292. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ ‘_/ /___/ .__/\_,_/_/ /_/\_\ version 1.4.0 /_/ Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71) Type in expressions to have them evaluated. Spark context available as sc scala>
创建一个RDD
下面的命令被用于从给定位置读出的文件。这里,新的 RDD 使用输入文件名创建。这是在 textFile(“”)方法的参数字符串是用于输入文件名的绝对路径。然而,如果仅给出文件名,那么它输入文件则在当前位置。
scala> val inputfile = sc.textFile("input.txt")
我们的目标是计算一个文件中的字数。分裂每一行成词创建一个平面地图(flatMap(line ? line.split(“ ”)).
接下来,读每个词作为一个键和值 ‘1’ (<key, value> = <word,1>) 使用映射函数 (map(word ? (word, 1)).
最后,加入类似的键值降低这些键 (reduceByKey(_+_)).
下面的命令用于执行字数统计逻辑。执行此操作后,不会有任何输出,因为这不是一个动作,这是一个转换; 指向一个新的RDD或告诉spark,用给定的数据来做什么)。
scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);
同时用RDD工作,如果想了解当前的RDD,那么可使用下面的命令。 它会告诉你关于当前RDD及其依赖调试的描述。
scala> counts.toDebugString
可以使用 persist() 或 cache() 方法标记一个RDD。在第一次计算的操作,这将被保存在存储器中的节点上。使用下面的命令来存储中间转换在内存中。
scala> counts.cache()
应用动作(操作),比如存储所有的转换结果到一个文本文件中。saveAsTextFile(“”)方法字符串参数是输出文件夹的绝对路径。试试下面的命令来保存输出文本文件。在下面的例子中, ‘output’ 的文件夹为当前位置。
scala> counts.saveAsTextFile("output")
打开另一个终端进入主目录(其中spark 在其他终端中执行)。下面的命令用于检查输出目录。
[hadoop@localhost ~]$ cd output/ [hadoop@localhost output]$ ls -1 part-00000 part-00001 _SUCCESS
下面的命令是用来查看输出的 Part-00001 文件。
[hadoop@localhost output]$ cat part-00000
输出
(people,1) (are,2) (not,1) (as,8) (beautiful,2) (they, 7) (look,1)
[hadoop@localhost output]$ cat part-00001
输出
(walk, 1) (or, 1) (talk, 1) (only, 1) (love, 1) (care, 1) (share, 1)
http://localhost:4040
Scala> counts.unpersist()
将看到如下输出 ?
15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list 15/06/27 00:57:33 INFO BlockManager: Removing RDD 9 15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1 15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810) 15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0 15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106) res7: cou.type = ShuffledRDD[9] at reduceByKey at <console>:14
http://localhost:4040/
Spark部署
示例
让我们同样以计算字数为例子,在使用之前,使用shell命令。 在这里,我们考虑同样 spark 应用程序的例子。
简单输入
下面的文字是输入数据,并命名该文件为 in.txt.
people are not as beautiful as they look, as they walk or as they talk. they are only as beautiful as they love, as they care as they share.
请看下面的程序 ?
SparkWordCount.scala
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark._ object SparkWordCount { def main(args: Array[String]) { val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map()) /* local = master URL; Word Count = application name; */ /* /usr/local/spark = Spark Home; Nil = jars; Map = environment */ /* Map = variables to work nodes */ /*creating an inputRDD to read text file (in.txt) through Spark context*/ val input = sc.textFile("in.txt") /* Transform the inputRDD into countRDD */ valcount = input.flatMap(line ? line.split(" ")) .map(word ? (word, 1)) .reduceByKey(_ + _) /* saveAsTextFile method is an action that effects on the RDD */ count.saveAsTextFile("outfile") System.out.println("OK"); } }
保存上述程序到指定的文件 SparkWordCount.scala 并将其放置在一个用户定义的目录名为 spark-application.
注 ? 虽然转化 inputRDD 成 countRDD 我们使用 flatMap() 用于标记化(从文本文件),行成单词, map() 方法统计词频和 reduceByKey() 方法计算每个单词的重复。
使用以下步骤来提交应用程序。通过终端在 spark-application目录中执行所有步骤。
第1步:下载 Spark Jar
Spark需要核心 jar 来编译,因此,从下面的链接下载spark-core_2.10-1.3.0.jar 移动下载 jar 的文件到 spark-application 应用程序目录。
使用下面给出的命令编译上述程序。这个命令应该在spark-application应用程序目录下执行。这里,/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar ,Spark 采用了 Hadoop 的 jar 支持程序。
$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala
第3步:创建 JAR
jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar
使用以下命令提交 spark 应用 ?
spark-submit --class SparkWordCount --master local wordcount.jar
如果成功执行,那么会发现有下面给出的输出。在下面输出的正常用户识别,这是程序的最后一行。如果仔细阅读下面的输出,会发现不同的东西,比如 ?
-
在端口 42954 成功启动服务 “sparkDriver”
-
MemoryStore 启动使用容量267.3 MB
- 启动SparkUI在 http://192.168.1.217:4040
- 添加JAR文件:/home/hadoop/piapplication/count.jar
- ResultStage 1 (saveAsTextFile 在 SparkPi.scala:11) finished in 0.566 s
- 停止 Spark web用户界面在 http://192.168.1.217:4040
- MemoryStore 清理
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started 15/07/08 13:56:04 INFO Utils: Successfully started service ‘sparkDriver‘ on port 42954. 15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.217:42954] 15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB 15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server 15/07/08 13:56:05 INFO Utils: Successfully started service ‘HTTP file server‘ on port 56707. 15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040 15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029 15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader 15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54 15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable 15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11) 15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s 15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s OK 15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook 15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040 15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler 15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion. 15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared 15/07/08 13:56:14 INFO BlockManager: BlockManager stopped 15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped 15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext 15/07/08 13:56:14 INFO Utils: Shutdown hook called 15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af 15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
$ cd outfile $ ls Part-00000 part-00001 _SUCCESS
part-00000 文件检查输出命令 ?
$ cat part-00000 (people,1) (are,2) (not,1) (as,8) (beautiful,2) (they, 7) (look,1)
part-00001文件查看输出命令 ?
$ cat part-00001 (walk, 1) (or, 1) (talk, 1) (only, 1) (love, 1) (care, 1) (share, 1)
Spark-submit 语法
spark-submit [options] <app jar | python file> [app arguments]
选项
S.No | 选项 | 描述 |
---|---|---|
1 | --master | spark://host:port, mesos://host:port, yarn, 或 local. |
2 | --deploy-mode | 无论是在本地启动驱动程序(client),或在工作人员的机器中的一个集群内 ("cluster") (默认: client) |
3 | --class |
应用程序的主类(适用于 Java/Scala 的应用程序)
|
4 | --name |
应用程序的名称
|
5 | --jars |
以逗号分隔本地 jar 列表包括驱动器和执行者类路径
|
6 | --packages |
逗号分隔 jar 的 Maven 坐标系列表,包括驱动器和执行者类路径
|
7 | --repositories | 逗号分隔额外远程存储库列表搜索Maven给定的坐标,使用 --packages |
8 | --py-files |
用逗号分隔 .zip,.egg 或.py文件的列表放在Python路径中的 Python 应用程序
|
9 | --files |
逗号分隔放置在每一个执行者的工作目录中的文件的列表
|
10 | --conf (prop=val) |
任意 Spark 配置属性
|
11 | --properties-file |
路径从一个文件来加载额外属性。如果没有指定,这将在 conf/spark-defaults 寻找默认值
|
12 | --driver-memory | 存储驱动程序 (e.g. 1000M, 2G) (默认: 512M) |
13 | --driver-java-options |
额外的Java选项传递给驱动程序
|
14 | --driver-library-path |
额外的库路径条目传递给驱动程序
|
15 | --driver-class-path |
额外的类路径条目传递给驱动程序
需要注意的是使用 --jars 添加 jar 会自动包含在类路径中
|
16 | --executor-memory | 每个执行者的内存(e.g. 1000M, 2G) (默认: 1G) |
17 | --proxy-user |
用户在提交申请时模仿
|
18 | --help, -h |
显示此帮助信息并退出
|
19 | --verbose, -v |
打印额外的调试输出
|
20 | --version |
打印当前 Spark 版本
|
21 | --driver-cores NUM |
核心驱动程序(默认值:1)
|
22 | --supervise |
如果给定,重新启动对故障的驱动程序
|
23 | --kill |
如果给定,杀死指定的驱动程序
|
24 | --status |
如果给定,请求指定的驱动程序的状态
|
25 | --total-executor-cores |
为所有执行者的核心总数
|
26 | --executor-cores |
每执行者内核的数量。 (默认值:1是YARN模式,或在独立模式下,工人利用多内核)
|
Spark编程
-
广播变量 ? 采用高效,分发大值
-
累加器 ? 用于聚集特定集合的信息
广播变量允许程序员保持每台机器上一个只读变量缓存,而不是运输它的一个副本任务。它们可用于,例如,给每一个节点,一个大的输入数据集的副本,以有效的方式。Spark 也尝试分发广播变量来使用高效的广播算法来降低通信成本。
Spark 操作通过一组阶段执行,通过分布式“洗牌”作业分开。Spark 会自动广播各阶段任务所需的通用数据。
广播数据缓存到序列化的形式,在运行每个任务之前,反序列化。这意味着显式地创建广播变量,当仅是在多个阶段的任务需要相同的数据或在反序列化形式缓存数据时非常重要的。
广播变量从一个变量v通过调用 SparkContext.broadcast(v)来创建。广播变量是围绕 v 封装,其值可以通过调用值的方法来访问。下面给出的代码显示了这一点 -
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
输出 ?
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
创建广播变量之后,它应该被用来代替任何函数的值 v 的集群上运行, v 不运到节点不止一次。此外,对象 v 不应在它的广播后修饰,以确保所有节点获得广播变量的相同的值。
蓄电池仅是“补充”到通过关联操作变量,因此可以,可以并行有效的支持。它们可以被用来实现计数器(如在MapReduce)或求和。Spark原生支持累加器的数字类型,程序员可以添加支持新类型。如果累加器使用自定义的一个名称创建,它将显示在 Spark 的 UI 中。这对于了解运行阶段和进度很有用(注 - 这还不支持在Python)。
累加器从初始值v的值是通过调用 SparkContext.accumulator(v) 创建. 在集群上运行任务可以使用 add 方法或 += 运算符(在 Scala 和Python)来添加它。 然而无法读取它的值。只有驱动程序可以读取累加器的值,使用 value 方法。
下面给出的代码显示一个累加器,用来相加数组的元素 ?
scala> val accum = sc.accumulator(0) scala> sc.parallelize(Array(1,2,3,4)).foreach(x => accum += x)
如果想看到的上面的代码的输出,可以使用下面的命令 ?
scala> accum.value
输出
res2: Int = 10
允许使用预定义的API方法之一做不同数字数据的操作。 Spark 数字运算是与流传输算法,允许构建模型,一次一个元素实现。
S.No | 方法 & 含义 |
---|---|
1 |
count() 在RDD元素的数量
|
2 |
Mean() 在RDD元素的平均值
|
3 |
Sum() 在RDD中元素的总和
|
4 |
Max() 在RDD中所有元素的最大值
|
5 |
Min() 在RDD中所有元素的最小值 |
6 |
Variance() 元素的差异
|
7 |
Stdev() 元素的标准差
|