标签:
今天在用Spark把Kafka的数据往ES写的时候,代码一直报错,错误信息如下:
15/10/20 17:28:56 ERROR actor.OneForOneStrategy: org.apache.spark.SparkContext java.io.NotSerializableException: org.apache.spark.SparkContext at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440) at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:177) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1138) at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:172) 
at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
代码如下:
/*
 * Failing version, kept unchanged to illustrate the error.
 * Inside the foreachRDD closure the outer `sc` is referenced (`sc.parallelize(data12)`);
 * the run fails with java.io.NotSerializableException: org.apache.spark.SparkContext,
 * raised from DStreamGraph.writeObject according to the stack trace above.
 * NOTE(review): `msg` is computed but never used; `json1` is built from the raw `i`.
 */
val lines = KafkaUtils.createStream(ssc, zookep_address, "aaaa", topicMap).map(_._2) var messageRdd = lines.flatMap(_.split("\n")) messageRdd.foreachRDD(rdd => { val array = rdd.collect() val data12 = array.map(i => { var msg = i.replace(">", ">").replace("<", "<").replace(" ", " ").replace("\"", """).replace("\‘", "‘").replace("\\", "\\\\").replace("\n", "\\n").replace("\r", "\\r").replace("[", "[").replace("]", "]").replace("{", "{").replace("}", "}") val json1 = """{"msg" : """" + i + """"}""" println("json:" + json1) json1 }) if (data12 != null && data12.length > 0) { EsSpark.saveJsonToEs(sc.parallelize(data12), "spark/sys_log") println("----------end-------------") } }) ssc.start() ssc.awaitTermination()
在网上找了好久一直没找到解决方案,后来在看官方的example的时候,发现有类似的代码,做了调整(把 sc.parallelize 改为 rdd.sparkContext.parallelize,避免在 foreachRDD 的闭包中引用外部的 SparkContext)后解决了此问题:
// Read from Kafka and keep only the second element of each (key, message) tuple.
val lines = KafkaUtils.createStream(ssc, zookep_address, "aaaa", topicMap).map(_._2)
// A single Kafka message may contain several lines; emit one record per line.
val messageRdd = lines.flatMap(_.split("\n"))

messageRdd.foreachRDD { rdd =>
  // collect() brings the batch to the driver, where it is turned into JSON documents.
  val docs = rdd.collect().map { line =>
    // Escape the characters that would otherwise terminate or corrupt the JSON string.
    // The backslash is handled first so the escapes added afterwards are not doubled.
    val msg = line
      .replace("\\", "\\\\")
      .replace("\"", "\\\"")
      .replace("\n", "\\n")
      .replace("\r", "\\r")
      .replace("\t", "\\t")
    // Build the document from the escaped text, not from the raw line.
    val json = s"""{"msg" : "$msg"}"""
    println("json:" + json)
    json
  }
  if (docs.nonEmpty) {
    // Use the RDD's own SparkContext rather than an outer `sc`: referencing the outer
    // SparkContext from this closure is what failed with NotSerializableException.
    EsSpark.saveJsonToEs(rdd.sparkContext.parallelize(docs), "spark/sys_log")
    println("----------end-------------")
  }
}
ssc.start()
ssc.awaitTermination()
ERROR actor.OneForOneStrategy: org.apache.spark.SparkContext
标签:
原文地址:http://www.cnblogs.com/qq27271609/p/4895503.html