标签:.text spark 相同 read tac not 调用 flow pre
1.StackOverflowError
问题:简单代码记录 :
for (day <- days){
rdd = rdd.union(sc.textFile(/path/to/day) .... )
}
大概场景就是我想把数量比较多的文件合并成一个大rdd,从而导致了栈溢出;
解决:很明显是方法递归调用太多,我之后改成了几个小任务进行了合并;这里union也可能会造成最终rdd分区数过多
2.java.io.FileNotFoundException: /tmp/spark-90507c1d-e98 ..... temp_shuffle_98deadd9-f7c3-4a12(No such file or directory) 类似这种
报错:Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 76.0 failed 4 times, most recent failure: Lost task 0.3 in stage 76.0 (TID 341, 10.5.0.90): java.io.FileNotFoundException: /tmp/spark-90507c1d-e983-422d-9e01-74ff0a5a2806/executor-360151d5-6b83-4e3e-a0c6-6ddc955cb16c/blockmgr-bca2bde9-212f-4219-af8b-ef0415d60bfa/26/temp_shuffle_98deadd9-f7c3-4a12-9a30-7749f097b5c8 (No such file or directory)
场景:大概代码和上面差不多:
for (day <- days){
rdd = rdd.union(sc.textFile(/path/to/day) .... )
}
rdd.map( ... )
解决:简单的map都会报错,怀疑是临时文件过多;查看一下rdd.partitions.length 果然有4k多个;基本思路就是减少分区数
可以在union的时候就进行重分区:
for (day <- days){
rdd = rdd.union(sc.textFile(/path/to/day,numPartitions) .... )
} //这里因为默认哈希分区,并且分区数相同;所有最终union的rdd的分区数不会增多,贴一下源码以防说错
/** Build the union of a list of RDDs. */ def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = withScope { val partitioners = rdds.flatMap(_.partitioner).toSet if (rdds.forall(_.partitioner.isDefined) && partitioners.size == 1) { /*这里如果rdd的分区函数都相同则会构建一个PartitionerAwareUnionRDD:m RDDs with p partitions each * will be unified to a single RDD with p partitions*/ new PartitionerAwareUnionRDD(this, rdds) } else { new UnionRDD(this, rdds) } }
或者最后在重分区
for (day <- days){
rdd = rdd.union(sc.textFile(/path/to/day) .... )
}
rdd.repartition(numPartitions)
标签:.text spark 相同 read tac not 调用 flow pre
原文地址:http://www.cnblogs.com/arachis/p/Spark_prog.html