码迷,mamicode.com
首页 > Web开发 > 详细

spark 写入 redis 和 org.apache.spark.SparkException: Task not serializable

时间:2015-06-23 15:15:13      阅读:105      评论:0      收藏:0      [点我收藏+]

标签:

spark将数据写入redis时调用以下代码会报  org.apache.spark.SparkException: Task not serializable

import com.redis.RedisClient
val r = new RedisClient("192.168.1.101", 6379)
val perhit = rdd.map(x => {
    val arr = x.split(" ")
    val k = arr(0).toInt
    val v = arr(1).toInt
    r.rpush(k, v)
    (k, v)
 })


原因是:在spark,rdd的方法里比如这里的map,方法里的数据会被序列化,并且分发到executors 去执行。这就需要rdd方法里的所有元素是可被序列化的这里的redis连接是不可被序列化的,所以会报Task not serializable异常

解决这个问题的方法是在executors中创建连接对象,这里介绍两种方法

1)rdd.mapPartitions 这个方法允许一次处理整个partitons的数据,在此方法中创建连接:

 val rdd = rdd.mapPartitions{partition => 
    val r = new RedisClient("192.168.1.101", 6379) 
    val res = partition.map{ x =>
        ...
        val refStr = r.rpush(...) 
    }
    r.close 
    res
}

2)用可序列化的单例模式来管理连接,让连接用lazy的方式创建

object RedisConnection extends Serializable {
   lazy val conn: RedisClient = new RedisClient("192.168.1.101", 6379)
}


val rdd = rdd.map{x => 
    ... ...
    val refStr = RedisConnection.conn.rpush(...) 
}

这里主要是给出在处理rdd数据时,获得redis连接的方法,同样的,操作其他数据库道理是一样的,这里是以redis为例

 

spark 写入 redis 和 org.apache.spark.SparkException: Task not serializable

标签:

原文地址:http://www.cnblogs.com/smartcuning/p/4595395.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!