我想诊断一些错误.我相信我不应该告诉整个情况来为我的问题找到一个好的解决方案.因此,我想在辅助进程上创建一些调试信息,并在驱动程序上实时显示.
我读到某个地方,在工作程序上发出System.out.println(“ DEBUG:…”)会在执行程序日志中产生一行,但目前在检索这些日志时遇到问题.除此之外,如果我在计算运行时看到驱动程序上有一些调试噪音,那将仍然有用.
(我也想出了一种解决方法,但是我不知道是否应该应用它.在每个工作程序任务的结尾,我都可以将元素追加到序列文件中,并且可以对其进行监视,或者在最后进行检查. )
解决方法:
我想到的一种方法是(ab)使用自定义累加器将消息从工作程序发送到驱动程序.这将从工作人员获得任何String消息给驱动程序.在驱动程序上,您将打印内容以收集信息.它不是实时的,因为它取决于程序的执行.
import org.apache.spark.AccumulatorParam
object LineCummulatorParam extends AccumulatorParam[String] {
def zero(value:String) : String = value
def addInPlace(s1:String, s2:String):String = s1 + "\n" + s2
}
val debugInfo = sparkContext.accumulator("","debug info")(DebugInfoCummulatorParam)
rdd.map{rdd => ...
...
...
//this happens on each worker
debugInfo += "something happened here"
}
//this happens on the driver
println(debugInfo)
不知道为什么您不能访问工作日志-这将是最直接的解决方案BTW.