Here I start spark-shell with the executor-memory parameter specified.
Start the Hadoop cluster
spark@SparkSingleNode:/usr/local/hadoop/hadoop-2.6.0$ jps
8457 Jps
spark@SparkSingleNode:/usr/local/hadoop/hadoop-2.6.0$ sbin/start-dfs.sh
Start the Spark cluster
spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6$ sbin/start-all.sh
spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6/bin$ ./spark-shell --master spark://SparkSingleNode:7077 --executor-memory 1g
On the command line, I set the memory available to each executor for this spark-shell session to 1 GB.
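For a standalone application rather than spark-shell, the same setting can be expressed through SparkConf. This is a minimal sketch, assuming the `spark.executor.memory` property (the configuration key behind `--executor-memory`); the object name and application name are made up for illustration.

```scala
import org.apache.spark.SparkConf

object ExecutorMemoryConf {
  // Build a configuration equivalent to:
  //   --master spark://SparkSingleNode:7077 --executor-memory 1g
  def build(): SparkConf =
    new SparkConf()
      .setAppName("ExecutorMemoryConf")           // hypothetical application name
      .setMaster("spark://SparkSingleNode:7077")  // master URL used in the command above
      .set("spark.executor.memory", "1g")         // memory per executor

  def main(args: Array[String]): Unit = {
    // Print the effective settings; pass the conf to `new SparkContext(conf)` to use it.
    println(build().toDebugString)
  }
}
```

A value given on the command line only applies to that session, which is why it suits quick experiments in spark-shell.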
Read the file from HDFS
scala> val rdd1 = sc.textFile("/README.md")
Or, with the full HDFS URI:
scala> val rdd1 = sc.textFile("hdfs://SparkSingleNode:9000/README.md")
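The short form `/README.md` is resolved against Hadoop's default file system, assumed here to be `hdfs://SparkSingleNode:9000`. In the full form, the `//` after the scheme is what lets the host and port be parsed as the authority. The sketch below uses `java.net.URI` only to illustrate the URI structure; it is not the code path Spark or Hadoop uses, and the object name is hypothetical.

```scala
import java.net.URI

object HdfsUriCheck {
  // Split a fully qualified URI into (scheme, host, port, path).
  def parts(uri: String): (String, String, Int, String) = {
    val u = new URI(uri)
    (u.getScheme, u.getHost, u.getPort, u.getPath)
  }

  def main(args: Array[String]): Unit = {
    // prints (hdfs,SparkSingleNode,9000,/README.md)
    println(parts("hdfs://SparkSingleNode:9000/README.md"))
  }
}
```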
This returns a MapPartitionsRDD.
Call toDebugString to view its lineage.
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21
scala> rdd1.toDebugString
16/09/26 22:47:01 INFO mapred.FileInputFormat: Total input paths to process : 1
res0: String =
(2) MapPartitionsRDD[1] at textFile at <console>:21 []
| /README.md HadoopRDD[0] at textFile at <console>:21 []
scala>
As the output shows, the MapPartitionsRDD is derived from the HadoopRDD.
The hadoopFile method produces the HadoopRDD.
The map method produces the MapPartitionsRDD.
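The two steps above can be reproduced with public SparkContext calls. This is a sketch of roughly what textFile does, not a copy of the Spark source; the object name `TextFileSketch` is hypothetical, and it assumes the old `mapred` TextInputFormat with LongWritable keys (byte offsets) and Text values (lines).

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object TextFileSketch {
  def textFile(sc: SparkContext, path: String): RDD[String] = {
    // Step 1: hadoopFile yields an RDD of (byte offset, line) pairs backed by a HadoopRDD.
    val pairs: RDD[(LongWritable, Text)] =
      sc.hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
    // Step 2: map keeps only the line text, which wraps the parent in a MapPartitionsRDD.
    pairs.map(pair => pair._2.toString)
  }
}
```

In spark-shell, after pasting the object with `:paste`, `TextFileSketch.textFile(sc, "/README.md").toDebugString` should print a two-level lineage similar to the one above.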
Analyzing the process from the source code
scala> val result = rdd1.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
le>:23, took 15.095588 s
result: Array[(String, Int)] = Array((package,1), (this,1), (Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version),1), (Because,1), (Python,2), (cluster.,1), (its,1), ([run,1), (general,2), (have,1), (pre-built,1), (locally.,1), (locally,2), (changed,1), (sc.parallelize(1,1), (only,1), (several,1), (This,2), (basic,1), (Configuration,1), (learning,,1), (documentation,3), (YARN,,1), (graph,1), (Hive,2), (first,1), (["Specifying,1), ("yarn-client",1), (page](http://spark.apache.org/documentation.html),1), ([params]`.,1), (application,1), ([project,2), (prefer,1), (SparkPi,2), (<http://spark.apache.org/>,1), (engine,1), (version,1), (file,1), (documentation,,1), (MASTER,1), (example,3), (distribution.,1), (are,1), (params,1), (scala>,1), (DataFram...
scala>
toDebugString cannot be called like this: collect returns a local Array, not an RDD.
scala> result.toDebugString
<console>:26: error: value toDebugString is not a member of Array[(String, Int)]
result.toDebugString
scala> val wordcount = rdd1.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
wordcount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[10] at reduceByKey at <console>:23
scala> wordcount.toDebugString
res3: String =
(2) ShuffledRDD[10] at reduceByKey at <console>:23 []
+-(2) MapPartitionsRDD[9] at map at <console>:23 []
| MapPartitionsRDD[8] at flatMap at <console>:23 []
| MapPartitionsRDD[1] at textFile at <console>:21 []
| /README.md HadoopRDD[0] at textFile at <console>:21 []
scala>
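The same lineage can be inspected programmatically through `RDD.dependencies`. This is a minimal sketch with a hypothetical helper, `LineageKinds.walk`, that follows the first parent of each RDD and labels the dependency as shuffle or narrow. It assumes each RDD in the chain has at most one parent, which holds for the word count above.

```scala
import org.apache.spark.ShuffleDependency
import org.apache.spark.rdd.RDD

object LineageKinds {
  // Returns (RDD class name, kind of dependency on its parent) from the given RDD
  // back to the source RDD, which has no parent and is labelled "none".
  def walk(rdd: RDD[_]): List[(String, String)] = {
    val name = rdd.getClass.getSimpleName
    rdd.dependencies.headOption match {
      case None => List((name, "none"))
      case Some(dep) =>
        val kind = dep match {
          case _: ShuffleDependency[_, _, _] => "shuffle" // data is repartitioned by key
          case _                             => "narrow"  // parent partitions are used as-is
        }
        (name, kind) :: walk(dep.rdd)
    }
  }
}
```

In spark-shell, `LineageKinds.walk(wordcount).foreach(println)` should tag only the ShuffledRDD entry as shuffle, which corresponds to the `+-` indentation in the toDebugString output.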
Question: why is there no MappedRDD? Could it be a version difference?
Spark API in practice: hands-on work and debugging of Spark file operations, hands-on work with Sogou log files, and a deeper look at Sogou log files
Original article: http://www.cnblogs.com/zlslch/p/5911131.html