1. Requirements Analysis
News website requirements:
- PV (page views)
- UV (unique visitors)
- number of registered users
- popular sections
Data processing pipeline:
data source -> Kafka -> Spark Streaming
2. Data Preparation
(1) Data format
Website log format:
date timestamp userid pageid section action
Log field descriptions:
date: date, in yyyy-MM-dd format
timestamp: timestamp in milliseconds
userid: user id ("null" for unregistered visitors)
pageid: page id
section: site section
action: user action, one of two kinds: viewing a page (view) or registering (register)
Sample data:
2020-12-20 1608451521565 364 422 fashion view
2020-12-20 1608451521565 38682 708 aviation view
2020-12-20 1608451521565 65444 270 internet view
2020-12-20 1608451521565 4805 250 tv-show view
2020-12-20 1608451521565 1130 743 movie view
2020-12-20 1608451521565 85320 605 carton view
2020-12-20 1608451521565 null 581 movie view
2020-12-20 1608451521565 null null null register
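Each record is a single space-separated line, and missing fields appear as the literal string "null". As a reference, here is a minimal parsing sketch; the AccessLogParser class and its field holder are illustrative names, not part of the project code:

// Illustrative parser for one log line; class and field names are assumptions.
public class AccessLogParser {

    /** Simple holder for the six log fields. */
    public static class AccessLog {
        public final String date;
        public final long timestamp;
        public final Long userid;    // null for unregistered visitors and register records
        public final Long pageid;    // null for register records
        public final String section; // null for register records
        public final String action;  // "view" or "register"

        public AccessLog(String date, long timestamp, Long userid,
                         Long pageid, String section, String action) {
            this.date = date;
            this.timestamp = timestamp;
            this.userid = userid;
            this.pageid = pageid;
            this.section = section;
            this.action = action;
        }
    }

    /** The generator writes the literal string "null" for missing fields. */
    private static Long parseLongOrNull(String s) {
        return "null".equals(s) ? null : Long.valueOf(s);
    }

    public static AccessLog parse(String line) {
        String[] f = line.split(" ");
        return new AccessLog(f[0], Long.parseLong(f[1]), parseLongOrNull(f[2]),
                parseLongOrNull(f[3]), "null".equals(f[4]) ? null : f[4], f[5]);
    }
}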
Start a Kafka console consumer to watch the data arriving on the topic:
bin/kafka-console-consumer.sh --bootstrap-server bigdata-pro-m04:9092 --topic spark
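If the spark topic does not exist yet, it can be created first. This is standard Kafka tooling rather than a command from the original setup; it assumes Kafka 2.2 or newer (older releases take --zookeeper instead of --bootstrap-server) and partition/replication counts suitable for a single-broker test cluster:

bin/kafka-topics.sh --create --bootstrap-server bigdata-pro-m04:9092 --topic spark --partitions 1 --replication-factor 1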
(2) A real-time data generator in Java
The generator below produces random log records and sends them to Kafka. Each pass of the loop emits 1,000 records, pausing for one second after every 100 sends, so the effective rate is roughly 100 records per second.
package com.kfk.spark.news_analysis_project;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.Random;
/**
* Access-log Kafka producer
* @author : 蔡政洁
* @email :caizhengjie888@icloud.com
* @date : 2020/12/12
* @time : 7:51 PM
*/
public class AccessProducer extends Thread{
private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
private static String date;
// site sections
private static String[] sections = new String[] {"country", "international", "sport",
"entertainment", "movie", "carton",
"tv-show", "technology", "internet",
"car", "military", "funny",
"fashion", "aviation", "government"};
private static Random random = new Random();
// used to draw 1-in-10 chances (new vs. old user, register vs. view)
private static int[] newOldUserArr = new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
private Producer<String,String> producer;
private String topic;
/**
* Constructor
* @param topic
*/
public AccessProducer(String topic){
this.topic = topic;
producer = new KafkaProducer<String, String>(createProducerConfig());
date = simpleDateFormat.format(new Date());
}
/**
* createProducerConfig
* @return
*/
public Properties createProducerConfig(){
Properties properties = new Properties();
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("bootstrap.servers", "bigdata-pro-m04:9092");
return properties;
}
@Override
public void run(){
int counter = 0;
while (true){
// generate 1000 log records per pass
for (int i = 0;i < 1000;i++){
String log = null;
// roughly 1 in 10 records is a registration, the rest are page views
if (newOldUserArr[random.nextInt(10)] == 1){
log = getRegisterLog();
} else {
log = getAccessLog();
}
// send the record to Kafka
producer.send(new ProducerRecord<String, String>(topic,log));
counter++;
// pause for one second after every 100 records
if (counter == 100){
counter = 0;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
/**
* Generate a page-view record
* @return log line
*/
private static String getAccessLog(){
StringBuilder stringBuilder = new StringBuilder();
// current timestamp in milliseconds
long timestamp = System.currentTimeMillis();
// random userid (about 1 in 10 visitors is unregistered, in which case userid is null)
Long userid = 0L;
int newOldUser = newOldUserArr[random.nextInt(10)];
if (newOldUser == 1){
userid = null;
} else {
userid = (long)random.nextInt(100000);
}
// random pageid, out of 1000 pages
long pageid = random.nextInt(1000);
// random section
String section = sections[random.nextInt(sections.length)];
// fixed action: view
String action = "view";
return stringBuilder.append(date).append(" ")
.append(timestamp).append(" ")
.append(userid).append(" ")
.append(pageid).append(" ")
.append(section).append(" ")
.append(action).toString();
}
/**
* Generate a registration record
* @return log line
*/
private static String getRegisterLog(){
StringBuilder stringBuilder = new StringBuilder();
// current timestamp in milliseconds
long timestamp = System.currentTimeMillis();
// userid, pageid and section are all null for a registration record
Long userid = null;
Long pageid = null;
String section = null;
// fixed action: register
String action = "register";
return stringBuilder.append(date).append(" ")
.append(timestamp).append(" ")
.append(userid).append(" ")
.append(pageid).append(" ")
.append(section).append(" ")
.append(action).toString();
}
public static void main(String[] args) {
AccessProducer accessProducer = new AccessProducer("spark");
accessProducer.start();
}
}
3. Implementation
Each requirement is implemented in its own method; the processing flow and the results are shown as comments. For the data model, see the earlier article on analyzing this news-site project with Spark SQL:
https://blog.csdn.net/weixin_45366499/article/details/111119234
package com.kfk.spark.common;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
/**
* @author : 蔡政洁
* @email :caizhengjie888@icloud.com
* @date : 2020/12/14
* @time : 8:23 PM
*/
public class CommStreamingContext {
public static JavaStreamingContext getJssc(){
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("CommStreamingContext");
return new JavaStreamingContext(conf, Durations.seconds(5));
}
}
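Two notes on this helper: the batch interval is 5 seconds, and local[2] runs the job with two local threads, the usual recommendation so that tasks and output actions are not starved. If the job is later extended with stateful operators such as updateStateByKey (see the sketch at the end of this article), the context also needs a checkpoint directory. A possible variant, with an assumed local path:

public static JavaStreamingContext getJsscWithCheckpoint(){
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("CommStreamingContext");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
    // required by stateful operators; the directory here is only an example
    jssc.checkpoint("/tmp/spark-streaming-checkpoint");
    return jssc;
}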
package com.kfk.spark.news_analysis_project;
import com.kfk.spark.common.CommStreamingContext;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;
import java.util.*;
/**
* @author : 蔡政洁
* @email :caizhengjie888@icloud.com
* @date : 2020/12/20
* @time : 4:11 PM
*/
public class NewsRealTime {
/**
* input data:
* 2020-12-20 1608451521565 364 422 fashion view
* 2020-12-20 1608451521565 38682 708 aviation view
* ...
* @param args
*/
public static void main(String[] args) throws InterruptedException {
JavaStreamingContext jssc = CommStreamingContext.getJssc();
// connect Spark Streaming to Kafka
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "bigdata-pro-m04:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "streaming_kafka_1");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
// topic to subscribe to
Collection<String> topics = Collections.singletonList("spark");
// Kafka source stream
JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
/**
* stream -> map -> JavaDStream
*/
JavaDStream<String> accessDstream = stream.map(new Function<ConsumerRecord<String, String>, String>() {
@Override
public String call(ConsumerRecord<String, String> v1) throws Exception {
return v1.value();
}
});
/**
* accessDStream -> filter -> action(view)
*/
JavaDStream<String> filterDstream = accessDstream.filter(new Function<String, Boolean>() {
@Override
public Boolean call(String v1) throws Exception {
String[] lines = v1.split(" ");
// keep only page-view records
return "view".equals(lines[5]);
}
});
// page PV
calculatePagePV(filterDstream);
// page UV
calculatePageUV(filterDstream);
// number of registered users
calculateRegistercount(accessDstream);
// popular sections (view records only, so register records' "null" section is excluded)
calculateUserSectionPV(filterDstream);
jssc.start();
jssc.awaitTermination();
}
/**
* Compute page PV
* input data:
* 2020-12-20 1608451521565 364 422 fashion view
*
* Data evolution:
* filterDstream -> mapToPair -> <2020-12-20_422,1> -> reduceByKey -> <2020-12-20_422,5>
*
* @param filterDstream
*/
public static void calculatePagePV(JavaDStream<String> filterDstream){
/**
* filterDstream -> mapToPair -> <2020-12-20_422,1>
*/
JavaPairDStream<String,Integer> pairDstream = filterDstream.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String,Integer> call(String lines) throws Exception {
String[] line = lines.split(" ");
return new Tuple2<String,Integer>(line[0] + "_" + line[3], 1);
}
});
/**
* pairDstream -> reduceByKey -> <2020-12-20_422,5>
*/
JavaPairDStream<String,Integer> pvStream = pairDstream.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1+v2;
}
});
pvStream.print();
/**
* (2020-12-21_16,1)
* (2020-12-21_548,1)
* (2020-12-21_881,1)
* (2020-12-21_27,1)
* (2020-12-21_771,1)
* (2020-12-21_344,2)
* (2020-12-21_313,1)
* (2020-12-21_89,1)
* (2020-12-21_14,1)
* (2020-12-21_366,1)
* ...
*/
}
/**
* Compute page UV
* input data:
* 2020-12-20 1608451521565 364 422 fashion view
* 2020-12-20 1608451521565 364 422 fashion view
* 2020-12-20 1608451521565 365 422 fashion view
* 2020-12-20 1608451521565 366 422 fashion view
* 2020-12-20 1608451521565 367 422 fashion view
* 2020-12-20 1608451521565 367 453 fashion view
*
* Data evolution:
* Step 1: map
* (2020-12-20,364,422)
* (2020-12-20,364,422)
* (2020-12-20,365,422)
* (2020-12-20,366,422)
* (2020-12-20,367,422)
* (2020-12-20,367,453)
*
* Step 2: rdd -> distinct
* (2020-12-20,364,422)
* (2020-12-20,365,422)
* (2020-12-20,366,422)
* (2020-12-20,367,422)
* (2020-12-20,367,453)
*
* Step 3: mapToPair
* <2020-12-20_422,1>
* <2020-12-20_422,1>
* <2020-12-20_422,1>
* <2020-12-20_422,1>
* <2020-12-20_453,1>
*
* Step 4: reduceByKey
* <2020-12-20_422,4>
* <2020-12-20_453,1>
*
* @param filterDstream
*/
public static void calculatePageUV(JavaDStream<String> filterDstream){
/**
* filterDstream -> map -> (2020-12-20,364,422)
*/
JavaDStream<String> mapDstream = filterDstream.map(new Function<String, String>() {
@Override
public String call(String lines) throws Exception {
String[] line = lines.split(" ");
return line[0] + "," + line[2] + "," + line[3];
}
});
/**
* mapDstream -> distinct
*/
JavaDStream<String> distinctDstream = mapDstream.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
@Override
public JavaRDD<String> call(JavaRDD<String> lines) throws Exception {
return lines.distinct();
}
});
/**
* distinctDstream -> mapToPair -> <2020-12-20_422,1>
*/
JavaPairDStream<String,Integer> pairDstream = distinctDstream.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String lines) throws Exception {
String[] line = lines.split(",");
return new Tuple2<>(line[0] + "_" + line[2], 1);
}
});
/**
* pairDstream -> reduceByKey -> <2020-12-20_422,4>
*/
JavaPairDStream<String,Integer> uvStream = pairDstream.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1+v2;
}
});
uvStream.print();
/**
* (2020-12-21_492,1)
* (2020-12-21_85,2)
* (2020-12-21_18,1)
* (2020-12-21_27,2)
* (2020-12-21_825,1)
* (2020-12-21_366,1)
* (2020-12-21_89,1)
* (2020-12-21_14,2)
* (2020-12-21_69,1)
* (2020-12-21_188,1)
* ...
*/
}
/**
* Compute the number of registered users: simply filter for action=register
* input data:
* 2020-12-20 1608451521565 null null null register
*
* Data evolution:
* accessDStream -> filter -> action(register) -> mapToPair -> reduceByKey
*
* @param accessDstream
*/
public static void calculateRegistercount(JavaDStream<String> accessDstream){
/**
* accessDStream -> filter -> action(register)
*/
JavaDStream<String> filterDstream = accessDstream.filter(new Function<String, Boolean>() {
@Override
public Boolean call(String v1) throws Exception {
String[] lines = v1.split(" ");
// keep only registration records
return "register".equals(lines[5]);
}
});
/**
* filterDstream -> mapToPair -> <2020-12-20_register,1>
*/
JavaPairDStream<String,Integer> pairDstream = filterDstream.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String,Integer> call(String lines) throws Exception {
String[] line = lines.split(" ");
return new Tuple2<String,Integer>(line[0] + "_" + line[5], 1);
}
});
/**
* pairDstream -> reduceByKey -> <2020-12-20_register,5>
*/
JavaPairDStream<String,Integer> registerCountStream = pairDstream.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1+v2;
}
});
registerCountStream.print();
/**
* (2020-12-21_register,11)
*/
}
/**
* Compute popular sections
* input data:
* 2020-12-20 1608451521565 364 422 fashion view
*
* Data evolution:
* filterDstream -> mapToPair -> <2020-12-20_fashion,1> -> reduceByKey -> <2020-12-20_fashion,5>
*
* @param filterDstream
*/
public static void calculateUserSectionPV(JavaDStream<String> filterDstream){
/**
* filterDstream -> mapToPair -> <2020-12-20_fashion,1>
*/
JavaPairDStream<String,Integer> pairDstream = filterDstream.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String,Integer> call(String lines) throws Exception {
String[] line = lines.split(" ");
return new Tuple2<String,Integer>(line[0] + "_" + line[4], 1);
}
});
/**
* pairDstream -> reduceByKey -> <2020-12-20_fashion,5>
*/
JavaPairDStream<String,Integer> pvStream = pairDstream.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1+v2;
}
});
pvStream.print();
/**
* (2020-12-21_internet,16)
* (2020-12-21_military,24)
* (2020-12-21_aviation,21)
* (2020-12-21_carton,19)
* (2020-12-21_government,25)
* (2020-12-21_tv-show,19)
* (2020-12-21_country,14)
* (2020-12-21_movie,13)
* (2020-12-21_international,16)
* ...
*/
}
}
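One caveat worth noting: reduceByKey aggregates only within each 5-second batch, so although the keys carry a date prefix, every printed count is a per-batch figure rather than a running daily total. To accumulate counts across batches you can apply a stateful operator such as updateStateByKey. A sketch (not part of the original code; it assumes a checkpoint directory has been set on the streaming context, as in the CommStreamingContext variant shown earlier):

import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import java.util.List;

public class RunningTotals {
    /** Fold each batch's counts into a running total per key. */
    public static JavaPairDStream<String, Integer> toRunningTotal(JavaPairDStream<String, Integer> batchCounts) {
        return batchCounts.updateStateByKey(
                new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
                    @Override
                    public Optional<Integer> call(List<Integer> newValues, Optional<Integer> state) {
                        // start from the previously accumulated total, if any
                        int sum = state.isPresent() ? state.get() : 0;
                        // add the counts that arrived in the current batch
                        for (Integer v : newValues) {
                            sum += v;
                        }
                        return Optional.of(sum);
                    }
                });
    }
}

For example, calculatePagePV could print toRunningTotal(pvStream) instead of pvStream to watch the daily totals grow across batches.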
No deeper business analysis is done here; you can extend this with the knowledge covered earlier by flexibly combining the operators!