UserView--第一种方式set去重,基于Spark算子的java代码实现


UserView--第一种方式set去重,基于Spark算子的java代码实现UserView--第一种方式set去重,基于Spark算子的java代码实现

UserView--第一种方式set去重,基于Spark算子的java代码实现

测试数据
UserView--第一种方式set去重,基于Spark算子的java代码实现
java代码
package com.hzf.spark.study;

import java.util.HashSet;
import java.util.Iterator;
import java.util.Set; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast; import scala.Tuple2; public class UVAnalysis {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("UV_ANA").setMaster("local")
.set("spark.testing.memory", "2147480000");
@SuppressWarnings("resource")
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> logRDD = sc.textFile("userLog1");
String str = "View";
final Broadcast<String> broadcast = sc.broadcast(str);
uvAnalyze(logRDD, broadcast);
} private static void uvAnalyze(JavaRDD<String> logRDD, final Broadcast<String> broadcast) {
JavaRDD<String> filteredLogRDD = logRDD.filter(new Function<String, Boolean>() { private static final long serialVersionUID = 1L; @Override
public Boolean call(String v1) throws Exception {
String actionParam = broadcast.value();
String action = v1.split("\t")[5];
return actionParam.equals(action);
}
}); JavaPairRDD<String, String> pairLogRDD = filteredLogRDD
.mapToPair(new PairFunction<String, String, String>() { private static final long serialVersionUID = 1L; @Override
public Tuple2<String, String> call(String val) throws Exception {
String pageId = val.split("\t")[3];
String userId = val.split("\t")[2];
return new Tuple2<String, String>(pageId, userId);
}
}); pairLogRDD.groupByKey().foreach(new VoidFunction<Tuple2<String, Iterable<String>>>() {
private static final long serialVersionUID = 1L; @Override
public void call(Tuple2<String, Iterable<String>> tuple) throws Exception {
String pageId = tuple._1;
Iterator<String> iterator = tuple._2.iterator();
Set<String> userSets = new HashSet<>();
while (iterator.hasNext()) {
String userId = iterator.next();
userSets.add(userId);
}
System.out.println("PAGEID:" + pageId + "\t UV_COUNT:" + userSets.size());
}
});
}
}

  

result
UserView--第一种方式set去重,基于Spark算子的java代码实现
 
 
上一篇:Linux时间子系统之(五):POSIX Clock


下一篇:如何用阿里云ECS搭建云上博客?