MysqlEletricFenceResultSource
package pers.aishuang.flink.streaming.source.mysql;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* 读取Mysql中电子围栏相关表结合成后的规则
*/
public class MysqlElectricFenceResultSource extends RichSourceFunction {
//新建日志打印器
private static final Logger logger = LoggerFactory.getLogger(MysqlElectricFenceResultSource.class);
//定义JDBC变量
private static Connection conn = null;
private static PreparedStatement pstmt = null;
//设置标识用于记录当前循环读取mysql配置
private static Boolean flag = true;
//定义获取配置文件参数工具
private static ParameterTool parameterTool = null;
private static Map<String, String> parasMap = null;
private static ParameterTool globalJobParameters = null;
@Override
public void open(Configuration parameters) throws Exception {
//方式一:通过ParameterTool自己再获取配置文件参数
parameterTool = ParameterTool.fromPropertiesFile(MysqlElectricFenceResultSource.class
.getClassLoader()
.getResourceAsStream("conf.properties"));
//-- 获取Driver、url、user、password
String driver = parameterTool.getRequired("jdbc.driver");
String url = parameterTool.getRequired("jdbc.url");
String user = parameterTool.getRequired("jdbc.user");
String password = parameterTool.getRequired("jdbc.password");
//方式二:通过执行环境设置的全局任务参数里获取 参数
parasMap = getRuntimeContext()
.getExecutionConfig()
.getGlobalJobParameters()
.toMap();
String driver2 = parasMap.get("jdbc.driver");
String url2 = parasMap.get("jdbc.url");
String user2 = parasMap.get("jdbc.user");
String password2 = parasMap.get("jdbc.password");
//方式三:与方式二本质上一样
ParameterTool globalJobParameters = (ParameterTool) getRuntimeContext()
.getExecutionConfig()
.getGlobalJobParameters();
String driver3 = globalJobParameters.getRequired("jdbc.driver");
String url3 = globalJobParameters.getRequired("jdbc.url");
String user3 = globalJobParameters.getRequired("jdbc.user");
String password3 = globalJobParameters.getRequired("jdbc.password");
//获取MySQL连接
//-- 加载驱动
Class.forName(driver);
//-- 获取连接
conn = DriverManager.getConnection(url, user, password);
//-- 执行SQL
//查出 有进入时间 没有出去时间,按照vin分组,找到目前最小id(电子围栏结果表的主键id)
String sql = "select vin, min(id) id from vehicle_networking.electric_fence where inTime is not null and outTime is null GROUP BY vin";
//-- 创建预编译对象
pstmt = conn.prepareStatement(sql);
}
@Override
public void run(SourceContext ctx) throws Exception {
while(flag) {
HashMap<String, Integer> vehInfoMap = new HashMap<>();
ResultSet rs = pstmt.executeQuery();
while(rs.next()) {
vehInfoMap.put(rs.getString("vin") , rs.getInt("id"));
}
if(vehInfoMap.isEmpty()){
logger.warn("从mysql中electronic_fence相关表的数据为空");
} else {
ctx.collect(vehInfoMap);
logger.info("查询电子围栏分析结果表中数据,存在记录数据为:%s 条",vehInfoMap.size());
}
if(!rs.isClosed()) {rs.close();}
//多久从mysql获取一次数据
//TimeUnit.MICROSECONDS.sleep(parameterTool.getLong("vehinfo.millionseconds"));
//每1秒钟获取一次最新数据,因为窗口每隔90s进行一次计算,因此该时间一定要小于窗口触发计算的频率
TimeUnit.MICROSECONDS.sleep(1);//1ms
}
}
@Override
public void cancel() {
flag = false;
}
@Override
public void close() throws Exception {
super.close();
if(!pstmt.isClosed()){pstmt.close();}
if(!conn.isClosed()) {conn.close();}
}
}
MysqlElectricFenceSource
package pers.aishuang.flink.streaming.source.mysql;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pers.aishuang.flink.streaming.entity.ElectricFenceResultTmp;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
public class MySQLElectricFenceSource extends RichSourceFunction<HashMap<String, ElectricFenceResultTmp>> {
private static final Logger logger = LoggerFactory.getLogger(MySQLElectricFenceSource.class.getSimpleName());
private static Connection conn = null;
private static Statement stmt = null;
//设置标识用于记录当前循环读取mysql配置
private static Boolean flag = true;
private static String elerulesTime = null;
@Override
public void open(Configuration parameters) throws Exception {
//1. 获取上下文中的 parameterTool
ParameterTool globalJobParameters = (ParameterTool) getRuntimeContext()
.getExecutionConfig().getGlobalJobParameters();
//2. 读取配置文件中参数,注册驱动、url、user、passworld
String driver = globalJobParameters.getRequired("jdbc.driver");
String url = globalJobParameters.getRequired("jdbc.url");
String user = globalJobParameters.getRequired("jdbc.user");
String password = globalJobParameters.getRequired("jdbc.password");
//3. 多长时间去查一次mysql数据
elerulesTime = globalJobParameters.getRequired("elerules.millionseconds");
//4. 设置驱动和连接
Class.forName(driver);
conn = DriverManager.getConnection(url,user,password);
stmt = conn.createStatement();
}
@Override
public void run(SourceContext<HashMap<String, ElectricFenceResultTmp>> ctx) throws Exception {
while (flag){
HashMap<String, ElectricFenceResultTmp> map = new HashMap<>();
//1. 查询SQL
String sql = "select " +
"vins.vin,setting.id,setting.name,setting.address,setting.radius," +
"setting.longitude,setting.latitude,setting.start_time,setting.end_time \n" +
"from vehicle_networking.electronic_fence_setting setting \n" +
"inner join vehicle_networking.electronic_fence_vins vins on setting.id=vins.setting_id \n" +
"where setting.status=1";
ResultSet rs = stmt.executeQuery(sql);
while(rs.next()){
map.put(
rs.getString("vin"),
new ElectricFenceResultTmp(
rs.getInt("id"),
rs.getString("name"),
rs.getString("address"),
rs.getFloat("radius"),
rs.getDouble("longitude"),
rs.getDouble("latitude"),
rs.getDate("start_time"),
rs.getDate("end_time")
)
);
}
ctx.collect(map);
//关闭rs
if(!rs.isClosed()) {
rs.close();
}
//收集electricFenceResult 指定休眠时间 ms
TimeUnit.MICROSECONDS.sleep(Long.parseLong(elerulesTime));
}
}
@Override
public void cancel() {
flag = false;
}
@Override
public void close() throws Exception {
super.close();
if(!stmt.isClosed()) stmt.close();
if(!conn.isClosed()) conn.close();
}
}
VehicleInfoMysqlSource
package pers.aishuang.flink.streaming.source.mysql;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pers.aishuang.flink.streaming.entity.VehicleInfoModel;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Date;
import java.util.HashMap;
public class VehicleInfoMysqlSource extends RichSourceFunction<HashMap<String, VehicleInfoModel>> {
//创建日志打印器
private Logger logger = LoggerFactory.getLogger(VehicleInfoMysqlSource.class);
//定义JDBC变量
private Connection conn = null;
private PreparedStatement pstmt = null;
//定义获取配置文件参数工具
ParameterTool parameterTool;
//定义是否运行的标记
private boolean isRunning = true; //flag
@Override
public void open(Configuration parameters) throws Exception {
//通过全局变量获取配置参数
parameterTool = (ParameterTool) getRuntimeContext()
.getExecutionConfig().getGlobalJobParameters();
//获取mysql JDBC的 driver、url、user、password
String driver = parameterTool.getRequired("jdbc.driver");
String url = parameterTool.getRequired("jdbc.url");
String user = parameterTool.getRequired("jdbc.user");
String password = parameterTool.getRequired("jdbc.password");
//加载驱动、获取连接、创建sql字符串、获取预编译对象
Class.forName(driver);
conn = DriverManager.getConnection(url);
String sql = "select t12.vin,t12.series_name,t12.model_name,t12.series_code,t12.model_code,t12.nick_name,t3.sales_date,t4.car_type\n" +
" from (\n" +
"select t1.vin, t1.series_name, t2.show_name as model_name, t1.series_code,t2.model_code,t2.nick_name,t1.vehicle_id\n" +
" from vehicle_networking.dcs_vehicles t1 left join vehicle_networking.t_car_type_code t2 on t1.model_code = t2.model_code) t12\n" +
" left join (select vehicle_id, max(sales_date) sales_date from vehicle_networking.dcs_sales group by vehicle_id) t3\n" +
" on t12.vehicle_id = t3.vehicle_id\n" +
" left join\n" +
" (select tc.vin,'net_cat' car_type from vehicle_networking.t_net_car tc\n" +
" union all select tt.vin,'taxi' car_type from vehicle_networking.t_taxi tt\n" +
" union all select tp.vin,'private_car' car_type from vehicle_networking.t_private_car tp\n" +
" union all select tm.vin,'model_car' car_type from vehicle_networking.t_model_car tm) t4\n" +
" on t12.vin = t4.vin";
pstmt = conn.prepareStatement(sql);
}
@Override
public void run(SourceContext<HashMap<String, VehicleInfoModel>> ctx) throws Exception {
while(isRunning) {
ResultSet resultSet = pstmt.executeQuery();
HashMap<String, VehicleInfoModel> vehicleInfoMap = new HashMap<>();
while(resultSet.next()) {
VehicleInfoModel vehicleInfoModel = new VehicleInfoModel();
//车架号
String vin = resultSet.getString("vin");
//车系
String seriesName = resultSet.getString("series_name");
//车型
String modelName = resultSet.getString("model_name");
//车系编码
String seriesCode = resultSet.getString("series_code");
//车型编码
String modelCode = resultSet.getString("model_code");
//车辆类型简称
String nickName = resultSet.getString("nick_name");
//出售日期
String salesDate = resultSet.getString("sales_date");
//车辆用途
String carType = resultSet.getString("car_type");
//年限
String liveTime = "-1";
if (salesDate != null) {
//当前日期-售出日期=使用年限
liveTime = String.valueOf((new Date().getTime() - resultSet.getDate("sales_date").getTime()) / 1000 / 3600 / 24 / 365);
}
if (null == vin) {
vin = "未知";
}
if (null == seriesName) {
seriesName = "未知";
}
if (null == modelName) {
modelName = "未知";
}
if (null == seriesCode) {
seriesCode = "未知";
}
if (null == modelCode) {
modelCode = "未知";
}
if (null == nickName) {
nickName = "未知";
}
if (null == salesDate) {
salesDate = "未知";
}
if (null == carType) {
carType = "未知";
}
vehicleInfoModel.setSeriesName(seriesName);
vehicleInfoModel.setSeriesCode(seriesCode);
vehicleInfoModel.setModelName(modelName);
vehicleInfoModel.setModelCode(modelCode);
vehicleInfoModel.setLiveTime(liveTime);
vehicleInfoModel.setNickName(nickName);
vehicleInfoModel.setCarType(carType);
vehicleInfoModel.setSalesDate(salesDate);
//将车辆基础数据封装到集合返回
vehicleInfoMap.put(vin, vehicleInfoModel);
}
if(vehicleInfoMap.isEmpty()) {
logger.warn("从车辆基础数据表中查询数据为空....");
}else{
ctx.collect(vehicleInfoMap);
}
resultSet.close();
//设置多久从mysql查询一次数据(及规则变更周期时间)
Thread.sleep(parameterTool.getInt("vehinfo.millionseconds"));
}
}
@Override
public void cancel() {
isRunning = false;
}
/**
* 释放资源
* @throws Exception
*/
@Override
public void close() throws Exception {
super.close();
if(pstmt != null) pstmt.close();
if(conn != null) conn.close();
}
}