今天在用Spark把Kafka的数据往ES写的时候,代码一直报错,错误信息如下:
15/10/20 17:28:56 ERROR actor.OneForOneStrategy: org.apache.spark.SparkContext
java.io.NotSerializableException: org.apache.spark.SparkContext
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:177)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1138)
at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:172)
at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
代码如下:
val lines = KafkaUtils.createStream(ssc, zookep_address, "aaaa", topicMap).map(_._2) var messageRdd = lines.flatMap(_.split("\n")) messageRdd.foreachRDD(rdd => { val array = rdd.collect() val data12 = array.map(i => {
var msg = i.replace(">", ">").replace("<", "<").replace(" ", " ").replace("\"", """).replace("\'", "'").replace("\\", "\\\\").replace("\n", "\\n").replace("\r", "\\r").replace("[", "[").replace("]", "]").replace("{", "{").replace("}", "}")
val json1 = """{"msg" : """" + i + """"}"""
println("json:" + json1)
json1
}) if (data12 != null && data12.length > 0) {
EsSpark.saveJsonToEs(sc.parallelize(data12), "spark/sys_log")
println("----------end-------------")
}
})
ssc.start()
ssc.awaitTermination()
在网上找了好久一直没找到解决方案,后来在看官方的example的时候,发现又类似的代码,做了调整解决了次问题:
val lines = KafkaUtils.createStream(ssc, zookep_address, "aaaa", topicMap).map(_._2) var messageRdd = lines.flatMap(_.split("\n")) messageRdd.foreachRDD(rdd => { val array = rdd.collect() val data12 = array.map(i => {
var msg = i.replace(">", ">").replace("<", "<").replace(" ", " ").replace("\"", """).replace("\'", "'").replace("\\", "\\\\").replace("\n", "\\n").replace("\r", "\\r").replace("[", "[").replace("]", "]").replace("{", "{").replace("}", "}")
val json1 = """{"msg" : """" + i + """"}"""
println("json:" + json1)
json1
}) if (data12 != null && data12.length > 0) {
EsSpark.saveJsonToEs(rdd.sparkContext.parallelize(data12), "spark/sys_log")
println("----------end-------------")
}
})
ssc.start()
ssc.awaitTermination()