最近项目遇到报错序列化相关问题,于是把这三个拿出来分析一下,先来看下foreachRDD、foreachPartition和foreach的不同之处。
不同主要在于它们的作用范围不同,foreachRDD作用于DStream中每一个时间间隔的RDD,foreachPartition作用于每一个时间间隔的RDD中的每一个partition,foreach作用于每一个时间间隔的RDD中的每一个元素。
而在执行上也有区分,foreachRdd运行在driver节点,foreachPartition和foreach运行在work节点,这点需要注意,也就是说,如果在worker上用driver上的东西,就有可能报序列化错误问题,例如foreachRDD向外部系统输出数据时,通常要创建一个连接对象,如果像上面的代码中创建在driver上就是错误的,因为foreach在每个节点上执行时节点上并没有连接对象。通常会报序列化错误或者初始化错误。
dstream.foreachRDD { rdd =>
val connection = createNewConnection()
rdd.foreach { record =>
connection.send(record) // executed at the worker
}
}
正确代码应为
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
}
}
所以driver与worker之间通信必须要经过序列化,然后并不是所用对象都能被序列化,所以报序列化的问题基本都是这个问题。