码迷,mamicode.com
首页 > 移动开发 > 详细

Spark mapPartitions 及mapPartitionsWithIndex算子

时间:2018-10-28 12:50:32      阅读:303      评论:0      收藏:0      [点我收藏+]

标签:连接   apach   map   partition   art   array   比较   元素   初始化   

mapPartitions

 与map类似,map函数是应用到每个元素,而mapPartitions的输入函数是每个分区的数据,把每个分区中的内容作为整体来处理的。 当map里面有比较耗时的初始化操作时,比如连接db,可以采用mapPartitions,它对每个partition操作一次,其函数的输入与输出都是iterator类型。

 实例如下:

scala> val rdd1=sc.parallelize(1 to 9,3)

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> def myfunc[T](iter:Iterator[T]):Iterator[(T,T)]={
     | var res=List[(T,T)]()
     | var pre=iter.next
     | while (iter.hasNext) {
     | val cur=iter.next
     | res.::=(pre,cur)
     | pre=cur
     | }
     | res.iterator
     | }
myfunc: [T](iter: Iterator[T])Iterator[(T, T)]
scala> rdd1.mapPartitions(myfunc)
res2: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[2] at mapPartitions at <console>:28
scala> res2.collect()
res3: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8)) 

 

mapPartitionsWithIndex

与 mapPartitions 类似,参数需多传一个分区的index.

实例如下: 

scala> val mapReslut=rdd1.mapPartitionsWithIndex{
     | (index,iterator)=>{
     | val list=iterator.toList
     | list.map(x=>x +"->"+index).iterator
     | }
     | }
mapReslut: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at mapPartitionsWithIndex at <console>:25
scala> mapReslut.collect
res6: Array[String] = Array(1->0, 2->0, 3->0, 4->1, 5->1, 6->1, 7->2, 8->2, 9->2) 

Spark mapPartitions 及mapPartitionsWithIndex算子

标签:连接   apach   map   partition   art   array   比较   元素   初始化   

原文地址:https://www.cnblogs.com/abcdwxc/p/9865071.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!