码迷,mamicode.com
首页 > Web开发 > 详细

ERROR actor.OneForOneStrategy: org.apache.spark.SparkContext

时间:2015-10-20 19:20:24      阅读:382      评论:0      收藏:0      [点我收藏+]

标签:

今天在用Spark把Kafka的数据往ES写的时候,代码一直报错,错误信息如下:

15/10/20 17:28:56 ERROR actor.OneForOneStrategy: org.apache.spark.SparkContext
java.io.NotSerializableException: org.apache.spark.SparkContext
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:177)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1138)
at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:172)
at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)

  

代码如下:

val lines = KafkaUtils.createStream(ssc, zookep_address, "aaaa", topicMap).map(_._2)

    var messageRdd = lines.flatMap(_.split("\n"))

    messageRdd.foreachRDD(rdd => {

      val array = rdd.collect()

      val data12 = array.map(i => {
        var msg = i.replace(">", ">").replace("<", "<").replace(" ", " ").replace("\"", """).replace("\‘", "‘").replace("\\", "\\\\").replace("\n", "\\n").replace("\r", "\\r").replace("[", "&#91").replace("]", "&#93").replace("{", "{").replace("}", "}")
        val json1 = """{"msg" : """" + i + """"}"""
        println("json:" + json1)
        json1
      })

      if (data12 != null && data12.length > 0) {
        EsSpark.saveJsonToEs(sc.parallelize(data12), "spark/sys_log")
        println("----------end-------------")
      }
    })
    ssc.start()
    ssc.awaitTermination()

 

在网上找了好久一直没找到解决方案,后来在看官方的example的时候,发现又类似的代码,做了调整解决了次问题:

 val lines = KafkaUtils.createStream(ssc, zookep_address, "aaaa", topicMap).map(_._2)

    var messageRdd = lines.flatMap(_.split("\n"))

    messageRdd.foreachRDD(rdd => {

      val array = rdd.collect()

      val data12 = array.map(i => {
        var msg = i.replace(">", ">").replace("<", "<").replace(" ", " ").replace("\"", """).replace("\‘", "‘").replace("\\", "\\\\").replace("\n", "\\n").replace("\r", "\\r").replace("[", "&#91").replace("]", "&#93").replace("{", "{").replace("}", "}")
        val json1 = """{"msg" : """" + i + """"}"""
        println("json:" + json1)
        json1
      })

      if (data12 != null && data12.length > 0) {
        EsSpark.saveJsonToEs(rdd.sparkContext.parallelize(data12), "spark/sys_log")
        println("----------end-------------")
      }
    })
    ssc.start()
    ssc.awaitTermination()

  

 

ERROR actor.OneForOneStrategy: org.apache.spark.SparkContext

标签:

原文地址:http://www.cnblogs.com/qq27271609/p/4895503.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!