标签:obj driver 测试环境 clean string org 元素 格式 开发
行动算子:触发运算,在 Executor 执行,如果想直接在 Driver 端看到结果可以使用 collect 和 foreach 都可以将数据拉取到 Driver 端。
scala> val rdd1 = sc.makeRDD(1 to 10,2) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[85] at makeRDD at <console>:24
scala> rdd1.reduce(_+_) res50: Int = 55
scala> val rdd2 = sc.makeRDD(Array(("a",1),("a",3),("c",3),("d",5))) rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[86] at makeRDD at <console>:24
scala> rdd2.reduce((x,y)=>(x._1 + y._1,x._2 + y._2)) res51: (String, Int) = (adca,12)
测试:
scala> val rdd = sc.parallelize(1 to 5) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[28] at parallelize at <console>:24 scala> rdd.reduce(_+_) res17: Int = 15
从 Executor 端拉取数据到 Driver 端操作:一般用在测试环境中。
scala> val rdd = sc.parallelize(1 to 10) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd.collect res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> val rdd = sc.parallelize(1 to 10) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd.count res1: Long = 10
scala> val rdd = sc.parallelize(1 to 10) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd.first res2: Int = 1
scala> val rdd = sc.parallelize(Array(2,5,4,6,8,3)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24
scala> rdd.take(3) res10: Array[Int] = Array(2, 5, 4)
返回一个数组,该数组由从数据集中随机采样的 num 个元素组成,可以选择是否随机数
替换不足的部分,seed 用于指定随机数生成器种子。
scala> val rdd = sc.parallelize(Array(2,5,4,6,8,3)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24
scala> rdd.takeOrdered(3) res18: Array[Int] = Array(2, 3, 4)
scala> var rdd1 = sc.makeRDD(1 to 10,2) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[88] at makeRDD at <console>:24
scala> rdd.aggregate(0)(_+_,_+_) res22: Int = 55
scala> var rdd1 = sc.makeRDD(1 to 10,2) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[88] at makeRDD at <console>:24
scala> rdd.fold(0)(_+_) res24: Int = 55
scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3) rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[95] at parallelize at <console>:24
scala> rdd.countByKey res63: scala.collection.Map[Int,Long] = Map(3 -> 2, 1 -> 3, 2 -> 1)
测试:
scala> sc.parallelize(Array(1,2,1,2,3,3,3)).map((_,1)).countByKey() res19: scala.collection.Map[Int,Long] = Map(1 -> 2, 2 -> 2, 3 -> 3)
在开发中可以搭配 Sample 查看数据是否倾斜
def foreach(f: T => Unit): Unit = withScope { val cleanF = sc.clean(f) sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF)) }
scala> var rdd = sc.makeRDD(1 to 5,2) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[107] at makeRDD at <console>:24
scala> rdd.foreach(println(_)) 3 4 5 1 2
测试:
scala> rdd.foreach(println) 2 1 4 5 3
标签:obj driver 测试环境 clean string org 元素 格式 开发
原文地址:https://www.cnblogs.com/LXL616/p/11144946.html