Flink流处理-Task之TripDriveTask

Task之TripDriveTask

package pers.aishuang.flink.streaming.task;

import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.OutputTag;
import pers.aishuang.flink.streaming.entity.ItcastDataObj;
import pers.aishuang.flink.streaming.entity.TripModel;
import pers.aishuang.flink.streaming.function.window.DriveSampleWindowFunction;
import pers.aishuang.flink.streaming.function.window.DriveTripWindowFunction;
import pers.aishuang.flink.streaming.sink.hbase.TripDriveToHBaseSink;
import pers.aishuang.flink.streaming.sink.hbase.TripSampleToHBaseSink;
import pers.aishuang.flink.streaming.utils.JsonParseUtil;

/**

  • 驾驶行程采样分析 驾驶行程分析

  • 开发步骤:

  • 1、创建流执行环境

  • 2、获取Kafka中的数据

  • 3、将json字符串解析成车辆数据对象

  • 4、过滤出正确的数据并且是行程数据 chargeStatus=2 或者 chargeStatus=3

  • 0x01:停车充电 0x02:行驶充电 0x03:未充电状态 0x04:充电完成 0xFE:异常 0xFF:无效

  • 5、分配水印机制,设置最大延迟时间 30s

  • 6、超出3分钟的数据,保存到侧输出流,分析一下数据为什么会延迟

  • 7、对车辆数据进行分组,创建会话窗口

  • 8、数据的采样分析

  • -- 应用窗口,数据的采样分析

  • -- 将分析的采样数据封装成数组,并将其保存到HBase中

  • 9、数据的行程分析

  • -- 应用窗口数据,分析低速、中速、高速车辆的soc、行驶里程、油耗、速度、速度切换的次数等数据封装成对象

  • -- 将这个对象保存到HBase中

  • 10、执行流环境任务
    */
    public class TripDriveTask extends BaseTask{
    public static void main(String[] args) {
    //1、创建流执行环境(已设置好checkpoint、重启策略)
    StreamExecutionEnvironment env = getEnv(TripDriveTask.class.getSimpleName());
    //2、获取Kafka中的数据
    DataStreamSource kafkaStream = getKafkaStream(env, "_tripDrive_consumer", SimpleStringSchema.class);
    //3、将json字符串解析成车辆数据对象
    DataStream tripDriveStream = kafkaStream.map(JsonParseUtil::parseJsonToObject)
    //4、 过滤出正确的数据并且是行程数据 chargeStatus=2或者chargeStatus=3
    .filter(obj -> StringUtils.isEmpty(obj.getErrorData()))
    .filter(obj -> (obj.getChargeStatus()2) || (obj.getChargeStatus()3));
    //5、分配水印机制,设置最大延迟时间30s
    SingleOutputStreamOperator itcastDataObjWatermark = tripDriveStream
    //分配水印机制,并指定事件时间字段
    .assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(30)) {
    @Override
    public long extractTimestamp(ItcastDataObj element) {
    return element.getTerminalTimeStamp();
    }
    }
    );
    //6、超过3分钟的数据,保存到侧输出流,分析一下数据为什么会延迟
    OutputTag maxLatestData = new OutputTag<>("maxLatestData", TypeInformation.of(ItcastDataObj.class));
    //7、对车辆数据进行分组,创建会话窗口。
    WindowedStream<ItcastDataObj, String, TimeWindow> itcastDataObjWindowStream = itcastDataObjWatermark
    //指定分组字段
    .keyBy(obj -> obj.getVin())
    //指定窗口类型为会话窗口,时间间隔是15min
    .window(EventTimeSessionWindows.withGap(Time.minutes(15L)))
    //允许延迟时间
    .allowedLateness(Time.minutes(3L))
    //侧边流输出延迟数据
    .sideOutputLateData(maxLatestData);
    //8、数据的采样分析
    //-- 应用窗口,数据的采样分析
    SingleOutputStreamOperator<String[]> sampleTripDriveStream = itcastDataObjWindowStream
    .apply(new DriveSampleWindowFunction());
    //-- 将分析的采样数据封装成数组,并将其保存到HBase中
    sampleTripDriveStream.addSink(new TripSampleToHBaseSink("TRIPDB:trip_sample"));
    //9、数据的行程分析
    //-- 应用窗口数据,分析低速、中速、高速车辆的soc、行驶里程、油耗、速度、速度切换的次数等数据封装成对象
    SingleOutputStreamOperator tripModelStream = itcastDataObjWindowStream
    .apply(new DriveTripWindowFunction());
    //-- 将这个对象保存到Hbase中
    tripModelStream.addSink(new TripDriveToHBaseSink("TRIPDB:trip_division"));
    //10、执行流环境任务
    try {
    env.execute();
    } catch (Exception e) {
    e.printStackTrace();
    }

    }
    }

上一篇:大数据Hadoop之——Spark Streaming原理


下一篇:Flink流处理-Task之ElectricFenceTask