Spark2.0 自定义累加器
在2.0中使用自定义累加器需要继承AccumulatorV2这个抽象类,同时必须对以下6个方法进行实现:
1.reset 方法: 将累加器进行重置;
abstract defreset(): Unit
Resets this accumulator, which is zero value.
2.add 方法: 向累加器中添加另一个值;
abstract defadd(v: IN): Unit
3.merge方法: 合并另一个类型相同的累加器;
abstract defmerge(other: AccumulatorV2[IN, OUT]): Unit
Merges another same-type accumulator into this one and update its state, i.e.
4.value 取值
abstract defvalue: OUT
Defines the current value of this accumulator
5.复制:Creates a new copy of this accumulator.
abstract defcopy(): AccumulatorV2[IN, OUT]
6.
abstract defisZero: Boolean
Returns if this accumulator is zero value or not.
需要注意的是,对累加器的更新只有在action中生效,spark对累加器的每个task的更新只会应用一次,即重新启动的任务不会更新累加器的值.而在transform中需要注意,每个任务可能会多次进行更新,如果task或者job被重复执行.同时累加器不会改变spark的lazy策略.
由于业务需求经常要构造若干Dataframe间数据的映射关系,而使用collectionAccumulator又要有一定量的重复性的Map操作, 故写了这个生成Map的自定义累加器,IN为代表key和value的String 类型的tuple,最后生成Map, 如果累加器中已经含有了要添加的key且 key->value不重复则以字符串||对value进行分隔,并更新累加器的值;
代码如下:
/**
* Created by Namhwik on 2016/12/27.
*/
class MapAccumulator extends AccumulatorV2[(String,String),mutable.Map[String, String]] {
private val mapAccumulator = mutable.Map[String,String]()
def add(keyAndValue:((String,String))): Unit ={
val key = keyAndValue._1
val value = keyAndValue._2
if (!mapAccumulator.contains(key))
mapAccumulator += key->value
else if(mapAccumulator.get(key).get!=value) {
mapAccumulator += key->(mapAccumulator.get(key).get+"||"+value)
}
}
def isZero: Boolean = {
mapAccumulator.isEmpty
}
def copy(): AccumulatorV2[((String,String)),mutable.Map[String, String]] ={
val newMapAccumulator = new MapAccumulator()
mapAccumulator.foreach(x=>newMapAccumulator.add(x))
newMapAccumulator
}
def value: mutable.Map[String,String] = {
mapAccumulator
}
def merge(other:AccumulatorV2[((String,String)),mutable.Map[String, String]]) = other match
{
case map:MapAccumulator => {
other.value.foreach(x =>
if (!this.value.contains(x._1))
this.add(x)
else
x._2.split("\\|\\|").foreach(
y => {
if (!this.value.get(x._1).get.split("\\|\\|").contains(y))
this.add(x._1, y)
}
)
)
}
case _ =>
throw new UnsupportedOperationException(
s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
}
def reset(): Unit ={
mapAccumulator.clear()
}
}
参考 <http://spark.apache.org/docs/latest/programming-guide.html>
ps:使用的时候需要register.