前言:接上一篇
1.需求描述:识别新老用户
本身客户端业务有新老用户的标识,但是不够准确,需要用实时计算再次确认(不涉及业务操作,只是单纯的做个状态确认)
2.利用侧输出流实现数据拆分
根据日志数据内容,将日志数据分成3类,页面日志、启动日志和曝光日志。页面日志输出到主流,启动日志输出到启动侧输出流,曝光日志输出到曝光日志侧输出流。
3.将不同流的数据推送下游的Kafka的不同topic中
代码如下:
1.在MyKafkaUtil中添加【获取Kafka消费者的方法】(读)
注意:此方法是在上一篇的MyKafkaUtil中添加
/** * 获取KafkaSource的方法 * * @param topic 主题 * @param groupId 消费者组 */ public static FlinkKafkaConsumer<String> getKafkaSource(String topic, String groupId) { //给配置信息对象添加配置项 properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); //获取KafkaSource return new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), properties); } }
2.Flink调用工具类读取数据的主程序
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.lxz.gamll20210909.util.MyKafkaUtil; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; import java.text.SimpleDateFormat; public class BaseLogApp { public static void main(String[] args) throws Exception { //1.获取执行环境,设置并行度,开启CK,设置状态后端(HDFS) StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //为Kafka主题的分区数 env.setParallelism(1); //1.1 设置状态后端 // env.setStateBackend(new FsStateBackend("hdfs://hadoop201:8020/gmall/dwd_log/ck")); // //1.2 开启CK // env.enableCheckpointing(10000L, CheckpointingMode.EXACTLY_ONCE); // env.getCheckpointConfig().setCheckpointTimeout(60000L); //修改用户名 System.setProperty("HADOOP_USER_NAME", "root"); //2.读取Kafka ods_base_log 主题数据 String topic = "ods_base_log"; String groupId = "ods_dwd_base_log_app"; FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, groupId); DataStreamSource<String> kafkaDS = env.addSource(kafkaSource); //3.将每行数据转换为JsonObject // SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSONObject::parseObject); OutputTag<String> dirty = new OutputTag<String>("DirtyData") { }; SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.process(new ProcessFunction<String, JSONObject>() { @Override public void processElement(String value, Context ctx, Collector<JSONObject> out) throws Exception { try { JSONObject jsonObject = JSON.parseObject(value); out.collect(jsonObject); } catch (Exception e) { ctx.output(dirty, value); } } }); // //打印测试 // jsonObjDS.print(">>>>>>>>>"); //4.按照Mid分组 // KeyedStream<JSONObject, String> keyedStream = jsonObjDS.keyBy(data -> data.getJSONObject("common").getString("mid")); SingleOutputStreamOperator<JSONObject> jsonObjWithNewFlag = jsonObjDS.keyBy(json -> json.getJSONObject("common").getString("mid")) .process(new KeyedProcessFunction<String, JSONObject, JSONObject>() { // 定义状态 private ValueState<String> isNewState; // 初始化状态 @Override public void open(Configuration parameters) throws Exception { isNewState = getRuntimeContext().getState(new ValueStateDescriptor<String>("isNew-state", String.class)); } @Override public void processElement(JSONObject jsonObject, Context ctx, Collector<JSONObject> out) throws Exception { // 取出数据中“is_new”字段 String isNew = jsonObject.getJSONObject("common").getString("is_new"); // 如果isNew为1,则需要继续校验 if ("1".equals(isNew)) { //取出状态中的数据,并判断是否为null if (isNewState.value() != null) { //说明当前mid不是新用户,修改is_new的值 jsonObject.getJSONObject("common").put("is_new", "0"); } else { // 说明为真正的新用户 isNewState.update("0"); } } // 输出数据 out.collect(jsonObject); } }); jsonObjWithNewFlag.print(">>>>>>>>>"); jsonObjDS.getSideOutput(dirty).print("Dirty>>>>>>>>>>"); //执行任务 env.execute(); } }
3.在服务器上开启生产者假造数据,便于一会查看IDEA客户端的数据输出
bin/kafka-console-producer.sh --broker-list hadoop201:9092 --topic ods_base_log
4.启动IDEA主程序并在Kafka生产者中插入数据(最好插入3次,模拟用户二次进入和脏数据进入场景)
{"common":{"ar":"310000","ba":"iPhone","ch":"Appstore","is_new":"1","md":"iPhone X","mid":"mid_16","os":"iOS 13.3.1","uid":"12","vc":"v2.1.132"},"start":{"entry":"icon","loading_time":16516,"open_ad_id":15,"open_ad_ms":2419,"open_ad_skip_ms":0},"ts":1631197168000}
5.观察IDEA客户端的数据输出
6.结果可知,第一次is_new=1,数据插入模拟用户第一次访问,当再次插入数据模拟用户第二次访问,则主程序is_new变成0,标识用户此次访问不作为新用户记录,当第三次插入数据并删除“}”,伪造脏数据时,IDEA客户端识别数据时“dirty”并打印输出。