一、简介
为了方便的统计和管理一些共同信息,Spark中定义了两种共享变量——Broadcast(广播变量)和 Accumulator(累加器),可以方便的把一些变量或数据共享给集群各个节点,今天来看看Accumulator。
Accumulator是由Driver端总体维护的,读取当前值也是在Driver端,各个Task在其所在的Executor上也维护了Accumulator变量,但只是局部性累加操作,运行完会到Driver端去合并累加结果。Accumulator有两个性质:
1、只可累加,合并即累加;
2、不改变Spark作业Lazy执行的特点,也即没有action操作触发job的情况下累加器的值有可能是初始值。
二、Accumulator的分类(Spark2.x):
1、Spark自带类型的累加器
(1)LongAccumulator(long类型的累加器累加整型)
(2)DoubleAccumulator(double类型的累加器累加浮点型)
(3)CollectionAccumulator(集合类型累加器累加集合元素)
创建方式如下:
LongAccumulator longAccumulator = sc.sc().longAccumulator("longAccumulator");//其中longAccumulator为该累加器在web UI上的名称
2、自定义累加器 ——累加器类需继承AccumulatorV2抽象类
需要实现其中add()方法、Merge()方法、value()方法等必要和非必要方法;
以下我实现了一个字符串拼接的自定义累加器:
package com.renyang.sparkproject.spark.session; import com.renyang.sparkproject.constant.Constants; import com.renyang.sparkproject.util.StringUtils; import org.apache.spark.util.AccumulatorV2; public class SessionAggrStatAccumulatorV2 extends AccumulatorV2<String, String> { private static final long serialVersionUID = 6311074555136039130L; private String data = "session_count=0|1s_3s=0|4s_6s=3|7s_9s=0|10s_30s=0|30s_60s=0|1m_3m=0|3m_10m=0|10m_30m=0|30m=0|1_3=0|4_6=1|7_9=0|10_30=0|30_60=0|60=0"; private String zero = data; @Override public boolean isZero() { return data.equals(zero); } @Override public AccumulatorV2<String, String> copy() { return new SessionAggrStatAccumulatorV2(); } @Override public void reset() { data = zero; } public void add(String v) { data = add(data, v); } @Override public void merge(AccumulatorV2<String, String> other) { SessionAggrStatAccumulatorV2 o =(SessionAggrStatAccumulatorV2)other; String[] words = data.split("\\|"); String[] owords = o.data.split("\\|"); for (int i = 0; i < words.length; i++) { for (int j = 0; j < owords.length; j++) { if (words[i].split("=")[0].equals(owords[j].split("=")[0])){ int value = Integer.valueOf(words[i].split("=")[1]) +Integer.valueOf(owords[j].split("=")[1]); String ns = StringUtils.setFieldInConcatString(data, "\\|", owords[j].split("=")[0], String.valueOf(value)); //每次合并完,更新str data = ns; } } } } @Override public String value() { return data; } /** * session统计计算逻辑 * @param v1 连接串 * @param v2 范围区间 * @return 更新以后的连接串 */ private String add(String v1, String v2) { // 校验:v1为空的话,直接返回v2 if(StringUtils.isEmpty(v1)) { return v2; } // 使用StringUtils工具类,从v1中,提取v2对应的值,并累加1 String oldValue = StringUtils.getFieldFromConcatString(v1, "\\|", v2); if(oldValue != null) { // 将范围区间原有的值,累加1 int newValue = Integer.valueOf(oldValue) + 1; // 使用StringUtils工具类,将v1中,v2对应的值,设置成新的累加后的值 return StringUtils.setFieldInConcatString(v1, "\\|", v2, String.valueOf(newValue)); } return v1; } }
三、Accumulator的运行逻辑
1、Driver端负责定义和注册累加器
累加器在Driver端被定义并初始化,同时需要注册入SparkContext,这样才能将累加器变量分发到集群各个节点,等到各个Task运行完之后会回收累加器结果进行Driver端合并,这个合并的过程是根据Task执行情况而定,只要有完成的Task就会更新累加器变量。
2、Executor端
Executor接收到Task之后,不但会反序列化RDD和Function,还会反序列化Accumulator,当Executor执行完Task之后,会将结果随同Accumulator一起返回给Driver端。