Task之TripDriveTask
package pers.aishuang.flink.streaming.task;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.OutputTag;
import pers.aishuang.flink.streaming.entity.ItcastDataObj;
import pers.aishuang.flink.streaming.entity.TripModel;
import pers.aishuang.flink.streaming.function.window.DriveSampleWindowFunction;
import pers.aishuang.flink.streaming.function.window.DriveTripWindowFunction;
import pers.aishuang.flink.streaming.sink.hbase.TripDriveToHBaseSink;
import pers.aishuang.flink.streaming.sink.hbase.TripSampleToHBaseSink;
import pers.aishuang.flink.streaming.utils.JsonParseUtil;
/**
-
驾驶行程采样分析 驾驶行程分析
-
开发步骤:
-
1、创建流执行环境
-
2、获取Kafka中的数据
-
3、将json字符串解析成车辆数据对象
-
4、过滤出正确的数据并且是行程数据 chargeStatus=2 或者 chargeStatus=3
-
0x01:停车充电 0x02:行驶充电 0x03:未充电状态 0x04:充电完成 0xFE:异常 0xFF:无效
-
5、分配水印机制,设置最大延迟时间 30s
-
6、超出3分钟的数据,保存到侧输出流,分析一下数据为什么会延迟
-
7、对车辆数据进行分组,创建会话窗口
-
8、数据的采样分析
-
-- 应用窗口,数据的采样分析
-
-- 将分析的采样数据封装成数组,并将其保存到HBase中
-
9、数据的行程分析
-
-- 应用窗口数据,分析低速、中速、高速车辆的soc、行驶里程、油耗、速度、速度切换的次数等数据封装成对象
-
-- 将这个对象保存到HBase中
-
10、执行流环境任务
*/
public class TripDriveTask extends BaseTask{
public static void main(String[] args) {
//1、创建流执行环境(已设置好checkpoint、重启策略)
StreamExecutionEnvironment env = getEnv(TripDriveTask.class.getSimpleName());
//2、获取Kafka中的数据
DataStreamSourcekafkaStream = getKafkaStream(env, "_tripDrive_consumer", SimpleStringSchema.class);
//3、将json字符串解析成车辆数据对象
DataStreamtripDriveStream = kafkaStream.map(JsonParseUtil::parseJsonToObject)
//4、 过滤出正确的数据并且是行程数据 chargeStatus=2或者chargeStatus=3
.filter(obj -> StringUtils.isEmpty(obj.getErrorData()))
.filter(obj -> (obj.getChargeStatus()2) || (obj.getChargeStatus()3));
//5、分配水印机制,设置最大延迟时间30s
SingleOutputStreamOperatoritcastDataObjWatermark = tripDriveStream
//分配水印机制,并指定事件时间字段
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(30)) {
@Override
public long extractTimestamp(ItcastDataObj element) {
return element.getTerminalTimeStamp();
}
}
);
//6、超过3分钟的数据,保存到侧输出流,分析一下数据为什么会延迟
OutputTagmaxLatestData = new OutputTag<>("maxLatestData", TypeInformation.of(ItcastDataObj.class));
//7、对车辆数据进行分组,创建会话窗口。
WindowedStream<ItcastDataObj, String, TimeWindow> itcastDataObjWindowStream = itcastDataObjWatermark
//指定分组字段
.keyBy(obj -> obj.getVin())
//指定窗口类型为会话窗口,时间间隔是15min
.window(EventTimeSessionWindows.withGap(Time.minutes(15L)))
//允许延迟时间
.allowedLateness(Time.minutes(3L))
//侧边流输出延迟数据
.sideOutputLateData(maxLatestData);
//8、数据的采样分析
//-- 应用窗口,数据的采样分析
SingleOutputStreamOperator<String[]> sampleTripDriveStream = itcastDataObjWindowStream
.apply(new DriveSampleWindowFunction());
//-- 将分析的采样数据封装成数组,并将其保存到HBase中
sampleTripDriveStream.addSink(new TripSampleToHBaseSink("TRIPDB:trip_sample"));
//9、数据的行程分析
//-- 应用窗口数据,分析低速、中速、高速车辆的soc、行驶里程、油耗、速度、速度切换的次数等数据封装成对象
SingleOutputStreamOperatortripModelStream = itcastDataObjWindowStream
.apply(new DriveTripWindowFunction());
//-- 将这个对象保存到Hbase中
tripModelStream.addSink(new TripDriveToHBaseSink("TRIPDB:trip_division"));
//10、执行流环境任务
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}}
}