Flink流处理-Source之Mysql

MysqlElectricFenceResultSource

package pers.aishuang.flink.streaming.source.mysql;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;


/**
 * Flink source that polls the MySQL electric-fence result table.
 *
 * <p>Each poll runs a fixed query that, per {@code vin}, selects the smallest {@code id}
 * among rows having an {@code inTime} but no {@code outTime}. A non-empty result is emitted
 * as a single {@code HashMap<String, Integer>} (vin -&gt; id); an empty result is only logged.
 *
 * <p>JDBC settings ({@code jdbc.driver}, {@code jdbc.url}, {@code jdbc.user},
 * {@code jdbc.password}) are read from the {@code conf.properties} classpath resource.
 *
 * <p>NOTE(review): the raw {@code RichSourceFunction} supertype is kept so existing callers
 * keep compiling; consider parameterising it with {@code HashMap<String, Integer>} once the
 * call sites have been checked.
 */
public class MysqlElectricFenceResultSource extends RichSourceFunction {
    private static final Logger logger = LoggerFactory.getLogger(MysqlElectricFenceResultSource.class);

    /** Classpath resource that holds the JDBC connection settings. */
    private static final String CONFIG_RESOURCE = "conf.properties";

    /** Per vin: smallest result-table id of a fence that was entered but not yet left. */
    private static final String QUERY_SQL =
            "select vin, min(id) id from vehicle_networking.electric_fence where inTime is not null and outTime is null GROUP BY vin";

    /**
     * Pause between two polls. Must stay below the downstream window trigger interval
     * (90 s according to the original author's note).
     */
    private static final long POLL_INTERVAL_SECONDS = 1L;

    // JDBC handles are per-instance (not static) so parallel instances in one JVM do not
    // overwrite each other's connection; transient because they are created in open().
    private transient Connection conn;
    private transient PreparedStatement pstmt;

    // Written by cancel() from a different thread than run(), hence volatile.
    private volatile boolean running = true;

    /**
     * Loads the JDBC settings, opens the connection and prepares the polling statement.
     *
     * @param parameters Flink configuration (not used)
     * @throws Exception if the configuration resource or a required key is missing, the
     *                   driver class cannot be loaded, or the connection cannot be opened
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        ParameterTool parameterTool;
        try (java.io.InputStream in = MysqlElectricFenceResultSource.class
                .getClassLoader()
                .getResourceAsStream(CONFIG_RESOURCE)) {
            if (in == null) {
                throw new IllegalStateException("Classpath resource not found: " + CONFIG_RESOURCE);
            }
            parameterTool = ParameterTool.fromPropertiesFile(in);
        }

        String driver = parameterTool.getRequired("jdbc.driver");
        String url = parameterTool.getRequired("jdbc.url");
        String user = parameterTool.getRequired("jdbc.user");
        String password = parameterTool.getRequired("jdbc.password");

        Class.forName(driver);
        conn = DriverManager.getConnection(url, user, password);
        pstmt = conn.prepareStatement(QUERY_SQL);
    }

    /**
     * Polls MySQL until {@link #cancel()} is called, emitting one vin -&gt; id map per
     * non-empty poll.
     *
     * @param ctx context used to emit the map
     * @throws Exception if the query fails or the sleeping thread is interrupted
     */
    @Override
    @SuppressWarnings("unchecked") // raw SourceContext, see class-level note
    public void run(SourceContext ctx) throws Exception {
        while (running) {
            HashMap<String, Integer> vehInfoMap = new HashMap<>();
            // try-with-resources closes the ResultSet even when reading a row fails
            try (ResultSet rs = pstmt.executeQuery()) {
                while (rs.next()) {
                    vehInfoMap.put(rs.getString("vin"), rs.getInt("id"));
                }
            }

            if (vehInfoMap.isEmpty()) {
                logger.warn("从mysql中electronic_fence相关表的数据为空");
            } else {
                ctx.collect(vehInfoMap);
                // SLF4J substitutes "{}" placeholders; "%s" would be printed literally
                logger.info("查询电子围栏分析结果表中数据,存在记录数据为:{} 条", vehInfoMap.size());
            }

            // The original slept one MICROSECOND, i.e. effectively a busy loop against MySQL
            TimeUnit.SECONDS.sleep(POLL_INTERVAL_SECONDS);
        }
    }

    /** Asks the polling loop in {@code run} to stop after the current iteration. */
    @Override
    public void cancel() {
        running = false;
    }

    /**
     * Releases the statement and the connection; safe to call when {@code open} failed
     * before either was created.
     *
     * @throws Exception if closing a JDBC resource fails
     */
    @Override
    public void close() throws Exception {
        super.close();
        try {
            if (pstmt != null && !pstmt.isClosed()) {
                pstmt.close();
            }
        } finally {
            // always attempt to close the connection, even if closing the statement threw
            if (conn != null && !conn.isClosed()) {
                conn.close();
            }
        }
    }
}

MySQLElectricFenceSource

package pers.aishuang.flink.streaming.source.mysql;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pers.aishuang.flink.streaming.entity.ElectricFenceResultTmp;


import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;

/**
 * Flink source that polls the active electric-fence rules from MySQL.
 *
 * <p>Each poll joins {@code electronic_fence_setting} with {@code electronic_fence_vins},
 * keeps settings whose {@code status} is 1, and emits one map of vin -&gt;
 * {@link ElectricFenceResultTmp} per poll (also when the map is empty).
 *
 * <p>JDBC settings and the poll interval ({@code elerules.millionseconds}, in milliseconds)
 * come from the job's global parameters.
 *
 * <p>NOTE(review): the map is keyed by vin only, so if one vin is attached to several active
 * settings only the last row read survives. Also, {@code getDate} drops the time-of-day
 * part of {@code start_time}/{@code end_time} — confirm both are intended.
 */
public class MySQLElectricFenceSource extends RichSourceFunction<HashMap<String, ElectricFenceResultTmp>> {
    private static final Logger logger = LoggerFactory.getLogger(MySQLElectricFenceSource.class.getSimpleName());

    /** Rule query; constant, so it is built once instead of on every poll. */
    private static final String QUERY_SQL = "select " +
            "vins.vin,setting.id,setting.name,setting.address,setting.radius," +
            "setting.longitude,setting.latitude,setting.start_time,setting.end_time \n" +
            "from vehicle_networking.electronic_fence_setting setting \n" +
            "inner join vehicle_networking.electronic_fence_vins vins on setting.id=vins.setting_id \n" +
            "where setting.status=1";

    // JDBC handles are per-instance (not static) so parallel instances in one JVM do not
    // overwrite each other's connection; transient because they are created in open().
    private transient Connection conn;
    private transient Statement stmt;

    // Written by cancel() from a different thread than run(), hence volatile.
    private volatile boolean running = true;

    /** Pause between two polls in milliseconds, read in {@code open}. */
    private long pollIntervalMillis;

    /**
     * Reads the JDBC settings and poll interval from the global job parameters and opens
     * the connection.
     *
     * @param parameters Flink configuration (not used)
     * @throws Exception if a required key is missing, the interval is not a number, the
     *                   driver class cannot be loaded, or the connection cannot be opened
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        ParameterTool globalJobParameters = (ParameterTool) getRuntimeContext()
                .getExecutionConfig().getGlobalJobParameters();

        String driver = globalJobParameters.getRequired("jdbc.driver");
        String url = globalJobParameters.getRequired("jdbc.url");
        String user = globalJobParameters.getRequired("jdbc.user");
        String password = globalJobParameters.getRequired("jdbc.password");
        // Parsed once here so a malformed value fails at start-up rather than mid-stream
        pollIntervalMillis = Long.parseLong(globalJobParameters.getRequired("elerules.millionseconds"));

        Class.forName(driver);
        conn = DriverManager.getConnection(url, user, password);
        stmt = conn.createStatement();
    }

    /**
     * Polls MySQL until {@link #cancel()} is called, emitting one rule map per poll.
     *
     * @param ctx context used to emit the map
     * @throws Exception if the query fails or the sleeping thread is interrupted
     */
    @Override
    public void run(SourceContext<HashMap<String, ElectricFenceResultTmp>> ctx) throws Exception {
        while (running) {
            HashMap<String, ElectricFenceResultTmp> map = new HashMap<>();
            // try-with-resources closes the ResultSet even when reading a row fails
            try (ResultSet rs = stmt.executeQuery(QUERY_SQL)) {
                while (rs.next()) {
                    map.put(
                            rs.getString("vin"),
                            new ElectricFenceResultTmp(
                                    rs.getInt("id"),
                                    rs.getString("name"),
                                    rs.getString("address"),
                                    rs.getFloat("radius"),
                                    rs.getDouble("longitude"),
                                    rs.getDouble("latitude"),
                                    rs.getDate("start_time"),
                                    rs.getDate("end_time")
                            )
                    );
                }
            }
            ctx.collect(map);

            // The configured value is in milliseconds; the original slept it as MICROSECONDS,
            // polling 1000x more often than configured.
            TimeUnit.MILLISECONDS.sleep(pollIntervalMillis);
        }
    }

    /** Asks the polling loop in {@code run} to stop after the current iteration. */
    @Override
    public void cancel() {
        running = false;
    }

    /**
     * Releases the statement and the connection; safe to call when {@code open} failed
     * before either was created.
     *
     * @throws Exception if closing a JDBC resource fails
     */
    @Override
    public void close() throws Exception {
        super.close();
        try {
            if (stmt != null && !stmt.isClosed()) {
                stmt.close();
            }
        } finally {
            // always attempt to close the connection, even if closing the statement threw
            if (conn != null && !conn.isClosed()) {
                conn.close();
            }
        }
    }
}

VehicleInfoMysqlSource

package pers.aishuang.flink.streaming.source.mysql;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pers.aishuang.flink.streaming.entity.VehicleInfoModel;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Date;
import java.util.HashMap;

/**
 * Flink source that polls vehicle master data from MySQL.
 *
 * <p>Each poll joins vehicles with their model codes, latest sales date and usage type, and
 * emits one map of vin -&gt; {@link VehicleInfoModel} when at least one row was read.
 * Null text columns are replaced by the placeholder {@code "未知"}.
 *
 * <p>JDBC settings and the poll interval ({@code vehinfo.millionseconds}, in milliseconds)
 * come from the job's global parameters.
 */
public class VehicleInfoMysqlSource extends RichSourceFunction<HashMap<String, VehicleInfoModel>> {
    // static: a logger does not need to be serialised with every function instance
    private static final Logger logger = LoggerFactory.getLogger(VehicleInfoMysqlSource.class);

    /** Placeholder stored for text columns that are null in the database. */
    private static final String UNKNOWN = "未知";

    // JDBC handles are created in open(), so they are never part of the serialised function
    private transient Connection conn = null;
    private transient PreparedStatement pstmt = null;

    // Global job parameters, fetched in open()
    ParameterTool parameterTool;

    // Written by cancel() from a different thread than run(), hence volatile.
    private volatile boolean isRunning = true;

    /**
     * Reads the JDBC settings from the global job parameters, opens the connection and
     * prepares the polling statement.
     *
     * @param parameters Flink configuration (not used)
     * @throws Exception if a required key is missing, the driver class cannot be loaded,
     *                   or the connection cannot be opened
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        parameterTool = (ParameterTool) getRuntimeContext()
                                    .getExecutionConfig().getGlobalJobParameters();
        String driver = parameterTool.getRequired("jdbc.driver");
        String url = parameterTool.getRequired("jdbc.url");
        String user = parameterTool.getRequired("jdbc.user");
        String password = parameterTool.getRequired("jdbc.password");

        Class.forName(driver);
        // The original called getConnection(url) and silently ignored the required
        // user/password it had just read.
        conn = DriverManager.getConnection(url, user, password);
        String sql = "select t12.vin,t12.series_name,t12.model_name,t12.series_code,t12.model_code,t12.nick_name,t3.sales_date,t4.car_type\n" +
                " from (\n" +
                "select t1.vin, t1.series_name, t2.show_name as model_name, t1.series_code,t2.model_code,t2.nick_name,t1.vehicle_id\n" +
                " from vehicle_networking.dcs_vehicles t1 left join vehicle_networking.t_car_type_code t2 on t1.model_code = t2.model_code) t12\n" +
                " left join  (select vehicle_id, max(sales_date) sales_date from vehicle_networking.dcs_sales group by vehicle_id) t3\n" +
                " on t12.vehicle_id = t3.vehicle_id\n" +
                " left join\n" +
                " (select tc.vin,'net_cat' car_type from vehicle_networking.t_net_car tc\n" +
                " union all select tt.vin,'taxi' car_type from vehicle_networking.t_taxi tt\n" +
                " union all select tp.vin,'private_car' car_type from vehicle_networking.t_private_car tp\n" +
                " union all select tm.vin,'model_car' car_type from vehicle_networking.t_model_car tm) t4\n" +
                " on t12.vin = t4.vin";
        pstmt = conn.prepareStatement(sql);
    }

    /**
     * Polls MySQL until {@link #cancel()} is called, emitting one vehicle map per non-empty
     * poll.
     *
     * @param ctx context used to emit the map
     * @throws Exception if the query fails or the sleeping thread is interrupted
     */
    @Override
    public void run(SourceContext<HashMap<String, VehicleInfoModel>> ctx) throws Exception {
        while (isRunning) {
            HashMap<String, VehicleInfoModel> vehicleInfoMap = new HashMap<>();
            // try-with-resources closes the ResultSet even when reading a row fails
            try (ResultSet resultSet = pstmt.executeQuery()) {
                while (resultSet.next()) {
                    // rows without a vin all share the placeholder key
                    String vin = orUnknown(resultSet.getString("vin"));
                    vehicleInfoMap.put(vin, toModel(resultSet));
                }
            }

            if (vehicleInfoMap.isEmpty()) {
                logger.warn("从车辆基础数据表中查询数据为空....");
            } else {
                ctx.collect(vehicleInfoMap);
            }

            // pause between polls, in milliseconds
            Thread.sleep(parameterTool.getInt("vehinfo.millionseconds"));
        }
    }

    /** Builds the model for the row the cursor currently points at. */
    private static VehicleInfoModel toModel(ResultSet row) throws Exception {
        String salesDate = row.getString("sales_date");

        // Age in whole years (365-day years) since the sale; "-1" when never sold
        String liveTime = "-1";
        if (salesDate != null) {
            long ageMillis = new Date().getTime() - row.getDate("sales_date").getTime();
            liveTime = String.valueOf(ageMillis / 1000 / 3600 / 24 / 365);
        }

        VehicleInfoModel model = new VehicleInfoModel();
        model.setSeriesName(orUnknown(row.getString("series_name")));
        model.setSeriesCode(orUnknown(row.getString("series_code")));
        model.setModelName(orUnknown(row.getString("model_name")));
        model.setModelCode(orUnknown(row.getString("model_code")));
        model.setLiveTime(liveTime);
        model.setNickName(orUnknown(row.getString("nick_name")));
        model.setCarType(orUnknown(row.getString("car_type")));
        model.setSalesDate(orUnknown(salesDate));
        return model;
    }

    /** Returns {@code value}, or the "unknown" placeholder when it is null. */
    private static String orUnknown(String value) {
        return value == null ? UNKNOWN : value;
    }

    /** Asks the polling loop in {@code run} to stop after the current iteration. */
    @Override
    public void cancel() {
        isRunning = false;
    }

    /**
     * Releases the statement and the connection.
     *
     * @throws Exception if closing a JDBC resource fails
     */
    @Override
    public void close() throws Exception {
        super.close();
        try {
            if (pstmt != null) {
                pstmt.close();
            }
        } finally {
            // always attempt to close the connection, even if closing the statement threw
            if (conn != null) {
                conn.close();
            }
        }
    }
}
上一篇:一文秒懂串口、COM口、TTL、RS-232、RS-485区别


下一篇:搞懂钩子方法和模板方法,看完这篇就够了