ElectricFenceTask
package pers.aishuang.flink.streaming.task;
import com.mysql.jdbc.StringUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import pers.aishuang.flink.streaming.entity.ElectricFenceModel;
import pers.aishuang.flink.streaming.entity.ElectricFenceResultTmp;
import pers.aishuang.flink.streaming.entity.ItcastDataObj;
import pers.aishuang.flink.streaming.function.window.ElectricFenceModelFunction;
import pers.aishuang.flink.streaming.function.flatmap.ElectricFenceRulesFunction;
import pers.aishuang.flink.streaming.function.window.ElectricFenceWindowFunction;
import pers.aishuang.flink.streaming.sink.mysql.ElectricFenceMysqlSink;
import pers.aishuang.flink.streaming.source.mysql.MySQLElectricFenceSource;
import pers.aishuang.flink.streaming.source.mysql.MysqlElectricFenceResultSource;
import pers.aishuang.flink.streaming.utils.JsonParseUtil;
import java.util.HashMap;
/**
* 电子围栏,用于判断实时车辆上报数据和电子围栏的规则静态数据进行关联,
* 分析车辆在指定的电子栅栏的信息和进入栅栏和出栅栏的时间,并将分析结果入库
* 分析步骤:
* 1、电子围栏分析任务设置、原始数据json解析、过滤异常数据
* 2、读取已存在电子围栏中的车辆与电子围栏信息(广播流临时结果数据)
* 3、原始车辆数据与电子围栏广播流进行合并,生成电子围栏规则模型流数据(DStream<ElectricFenceModel>)
* 4、创建90s滚动窗口,计算电子围栏信息(ElectricFenceModel中的值根据车辆是否在围栏内进行设置)
* 5、读取电子围栏分析结果表数据并广播
* 6、滚动窗口电子围栏对象模型流数据与电子围栏分析结果数据广播流进行connect
* 7、对电子围栏对象模型,添加uuid和inMysql(车辆是否已存在musql表中)
* 8、电子围栏分析结果数据落地mysql
* ===最终在数据库中的结果是一条被记录在册的车辆出入电子围栏信息
*/
public class ElectricFenceTask extends BaseTask{
public static void main(String[] args) {
//1、电子围栏分析任务设置、原始数据json解析、过滤异常数据
StreamExecutionEnvironment env = getEnv(ElectricFenceTask.class.getSimpleName());
DataStreamSource<String> kafkaStream = getKafkaStream(
env,
"__consumer_electricFence_",
SimpleStringSchema.class);
//json解析:原始json数据 --> ItcastDataObj对象
//异常数据过滤:无vin,无终端时间,数据格式不是完整json格式,满足其一都是异常数据,而异常数据在解析时,errorData字段有数据。
DataStream<ItcastDataObj> source = kafkaStream.map(JsonParseUtil::parseJsonToObject)
.filter(obj -> StringUtils.isNullOrEmpty(obj.getErrorData()));
//2、读取已存在电子围栏中的车辆与电子围栏信息(广播流临时结果数据)
DataStream<HashMap<String, ElectricFenceResultTmp>> broadcastStream = env
.addSource(new MySQLElectricFenceSource()).broadcast();
//3、原始车辆数据与电子围栏广播流进行合并,生成电子围栏规则模型流数据(DStream<ElectricFenceModel>)
//Javabean对象 <---> HashMap集合(每个元素组成是 String:Javabean对象)
DataStream<ElectricFenceModel> electricFenceWidthStream = source.connect(broadcastStream)
.flatMap(new ElectricFenceRulesFunction());
//4、创建90s滚动窗口,计算电子围栏信息(ElectricFenceModel中的值根据车辆是否在围栏内进行设置)
SingleOutputStreamOperator<ElectricFenceModel> electricFenceWindowStream = electricFenceWidthStream
//指定哪个成员是事件事件和添加水印(允许乱序时间3秒)
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<ElectricFenceModel>(Time.seconds(3)) {
@Override
public long extractTimestamp(ElectricFenceModel element) {
return element.getTerminalTimestamp();
}
}
)
//指定分组字段
.keyBy(obj -> obj.getVin())
//指定窗口类型
.window(TumblingEventTimeWindows.of(Time.seconds(90)))
//应用窗口函数(使用了MapState状态保存)
//-- 经过这个窗口后 ,一个窗口输出数据,要么没有,要么只有一条进栅栏数据,要是只有一条出栅栏数据
.apply(new ElectricFenceWindowFunction());
//5、读取电子围栏分析结果表数据并广播 (每1s来一条结果数据)
//HashMap里有多条数据,每个vin只有一条数据
DataStream<HashMap<String, Integer>> electricFenceResultStream = env
.addSource(new MysqlElectricFenceResultSource()).broadcast();
//6、滚动窗口电子围栏对象模型流数据与电子围栏分析结果数据广播流进行connect
electricFenceWindowStream.connect(electricFenceResultStream)
//7、对电子围栏对象模型,添加uuid和inMysql(车辆是否已存在musql表中)
.flatMap(new ElectricFenceModelFunction())
//8、电子围栏分析结果数据落地mysql,也可以选择落地mong0
.addSink(new ElectricFenceMysqlSink());
//9. 触发执行
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}