package com.ysservice.dataStreamApi.sink;
import com.ysservice.dataStreamApi.utils.GreenplumUtil;
import com.ysservice.dataStreamApi.utils.RegexUtils;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.types.Row;
import java.lang.reflect.Field;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
/**
 * Flink sink that applies batches of insert/delete records to Greenplum over JDBC.
 *
 * @author WuBo
 * @date 2021/11/8 14:09
 */
public class GreenPlumSink extends RichSinkFunction<List<Tuple3<Boolean, String, Row>>> {

    /** Parameterized statement used to record SQL text that could not be executed. */
    private static final String ERROR_LOG_SQL = "insert into cdc_log.cdc_error_sql(error_sql) values (?)";

    // JDBC handles are created in open() and released in close(); they are marked transient
    // because they only exist on the running task and must never be serialized with the function.
    transient Connection connection;
    transient Statement statement;

    /**
     * Runs once when the task starts: obtains the Greenplum connection and a reusable statement.
     *
     * @param parameters configuration handed over by the runtime
     * @throws Exception if the connection or the statement cannot be created
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = GreenplumUtil.getConnection();
        statement = connection.createStatement();
    }

    /**
     * Writes one batch of change records.
     *
     * <p>Each element is {@code (f0, f1, f2)}: {@code f0 == true} means insert, {@code false} means
     * delete; {@code f1} is the table name; {@code f2} is the row. Consecutive inserts are collected
     * into one multi-row {@code insert ... values (...),(...)} statement, which is flushed before every
     * delete and at the end of the batch so the original ordering of inserts and deletes is preserved.
     * The insert target table is taken from the first element of the batch.
     *
     * <p>If a multi-row insert fails, its rows are replayed one by one through
     * {@link #executeErrorSql(ArrayList)}. A failing delete is written to the error-log table.
     *
     * @param value   the batch of records; a {@code null} or empty batch is ignored
     * @param context sink context (unused)
     * @throws Exception if building a statement fails or the error-log write itself fails
     */
    @Override
    public void invoke(List<Tuple3<Boolean, String, Row>> value, SinkFunction.Context context) throws Exception {
        if (value == null || value.isEmpty()) {
            return;
        }
        String insertPrefix = "insert into " + value.get(0).f1 + " values";
        StringBuilder pendingInsert = new StringBuilder(insertPrefix);
        ArrayList<Tuple3<Boolean, String, Row>> pendingRows = new ArrayList<>();

        for (Tuple3<Boolean, String, Row> tuple3 : value) {
            if (tuple3.f0) {
                pendingRows.add(tuple3);
                pendingInsert.append(insertValue(tuple3.f2.getFieldNames(true), tuple3));
            } else {
                // Inserts queued before this delete must reach the database first.
                flushPendingInserts(pendingInsert, insertPrefix.length(), pendingRows);
                executeOrLog(deleteSql(tuple3.f2.getFieldNames(true), tuple3));
            }
        }
        flushPendingInserts(pendingInsert, insertPrefix.length(), pendingRows);
    }

    /**
     * Executes the accumulated multi-row insert, falling back to row-by-row replay on failure.
     * The pending state is always reset afterwards, so rows that were already written are never
     * replayed by a later failure in the same batch.
     */
    private void flushPendingInserts(StringBuilder pendingInsert, int prefixLength,
                                     ArrayList<Tuple3<Boolean, String, Row>> pendingRows) throws Exception {
        if (pendingRows.isEmpty()) {
            return;
        }
        // Every fragment from insertValue() ends with ','; drop the last one.
        String batchSql = pendingInsert.substring(0, pendingInsert.length() - 1);
        try {
            statement.execute(batchSql);
        } catch (SQLException batchFailure) {
            // Deliberate best effort: the batch may have failed on a single row (for example an
            // existing primary key), so retry each row individually as delete-then-insert.
            executeErrorSql(pendingRows);
        } finally {
            pendingRows.clear();
            pendingInsert.setLength(prefixLength);
        }
    }

    /**
     * Replays rows one at a time: deletes the row by primary key, then inserts it again.
     * Any statement that still fails is recorded in the error-log table.
     *
     * @param errorTuple3List rows of a failed multi-row insert; {@code null} is treated as empty
     * @throws Exception if building a statement fails or the error-log write itself fails
     */
    public void executeErrorSql(ArrayList<Tuple3<Boolean, String, Row>> errorTuple3List) throws Exception {
        if (errorTuple3List == null) {
            return;
        }
        for (Tuple3<Boolean, String, Row> tuple3 : errorTuple3List) {
            Set<String> fieldNames = tuple3.f2.getFieldNames(true);
            executeOrLog(deleteSql(fieldNames, tuple3));
            executeOrLog(insertSql(fieldNames, tuple3));
        }
    }

    /** Executes {@code sql}; on failure records the statement text in the error-log table. */
    private void executeOrLog(String sql) throws SQLException {
        try {
            statement.execute(sql);
        } catch (SQLException executionFailure) {
            logErrorSql(sql);
        }
    }

    /**
     * Stores a failed statement in {@code cdc_log.cdc_error_sql}. The text is bound as a parameter
     * because it normally contains single quotes, which would break a concatenated literal.
     */
    private void logErrorSql(String errorSql) throws SQLException {
        try (PreparedStatement logStatement = connection.prepareStatement(ERROR_LOG_SQL)) {
            logStatement.setString(1, errorSql);
            logStatement.executeUpdate();
        }
    }

    /**
     * Renders a field value as a SQL literal: {@code null} for a missing value, the bare text when
     * {@code RegexUtils.numberRegex} accepts it, otherwise a single-quoted string whose embedded
     * single quotes are doubled.
     */
    private static String toSqlLiteral(Object data) {
        if (data == null) {
            return "null";
        }
        String text = data.toString();
        // NOTE(review): the decision is made on the text, not the column type, so a text column
        // holding digits is emitted unquoted - confirm this is acceptable for all target tables.
        if (RegexUtils.numberRegex(text)) {
            return text;
        }
        return "'" + RegexUtils.timeStampRegex(text).replace("'", "''") + "'";
    }

    /**
     * Builds one {@code (v1,v2,...),} fragment for a multi-row insert. All fields are emitted in
     * the iteration order of {@code names}; null values become the SQL keyword {@code null}.
     *
     * @param names field names of the row
     * @param value the record whose row ({@code f2}) supplies the values
     * @return the value tuple followed by a trailing comma
     * @throws IllegalArgumentException if {@code names} is null or empty
     */
    public String insertValue(Set<String> names, Tuple3<Boolean, String, Row> value) {
        if (names == null || names.isEmpty()) {
            throw new IllegalArgumentException("Row for table " + value.f1 + " has no named fields");
        }
        StringBuilder tuple = new StringBuilder("(");
        for (String name : names) {
            if (tuple.length() > 1) {
                tuple.append(',');
            }
            tuple.append(toSqlLiteral(value.f2.getField(name)));
        }
        return tuple.append("),").toString();
    }

    /**
     * Builds a single-row {@code insert into t(c1,c2) values(v1,v2);} statement. Fields whose value
     * is null are left out of both the column list and the value list.
     *
     * @param names field names of the row
     * @param value the record; {@code f1} is the table name and {@code f2} the row
     * @return the complete insert statement
     * @throws IllegalArgumentException if {@code names} is null or every field is null
     */
    public String insertSql(Set<String> names, Tuple3<Boolean, String, Row> value) {
        if (names == null) {
            throw new IllegalArgumentException("Row for table " + value.f1 + " has no named fields");
        }
        StringBuilder fields = new StringBuilder();
        StringBuilder datas = new StringBuilder();
        for (String name : names) {
            Object data = value.f2.getField(name);
            if (data == null) {
                continue;
            }
            if (fields.length() > 0) {
                fields.append(',');
                datas.append(',');
            }
            fields.append(name);
            datas.append(toSqlLiteral(data));
        }
        if (fields.length() == 0) {
            throw new IllegalArgumentException("Row for table " + value.f1 + " has no non-null fields to insert");
        }
        return "insert into " + value.f1 + "(" + fields + ") values(" + datas + ");";
    }

    /**
     * Builds a {@code delete from t where k1=v1 and k2=v2;} statement keyed on the table's primary key.
     * The key columns come from the static {@code primaryKey} field (a comma-separated list) of the
     * class {@code com.ysservice.dataStreamApi.source.<table name>}, read via reflection.
     *
     * @param names field names of the row (not used; kept for signature compatibility)
     * @param value the record; {@code f1} is the table name and {@code f2} the row
     * @return the complete delete statement
     * @throws Exception             if the source class or its {@code primaryKey} field cannot be read
     * @throws IllegalStateException if every primary-key column of the row is null
     */
    public String deleteSql(Set<String> names, Tuple3<Boolean, String, Row> value) throws Exception {
        Class<?> sourceClass = Class.forName("com.ysservice.dataStreamApi.source." + value.f1);
        Field primaryKey = sourceClass.getDeclaredField("primaryKey");
        String keys = (String) primaryKey.get(sourceClass);

        StringBuilder wheres = new StringBuilder();
        for (String name : keys.split(",")) {
            Object data = value.f2.getField(name);
            // NOTE(review): a null key column is skipped, which widens the delete when only part
            // of a composite key is present - confirm key columns can never be null upstream.
            if (data == null) {
                continue;
            }
            if (wheres.length() > 0) {
                wheres.append(" and ");
            }
            wheres.append(name).append('=').append(toSqlLiteral(data));
        }
        if (wheres.length() == 0) {
            throw new IllegalStateException("Cannot build delete for table " + value.f1
                    + ": all primary key columns (" + keys + ") are null");
        }
        return "delete from " + value.f1 + " where " + wheres + ";";
    }

    /**
     * Releases the JDBC statement and connection. Each resource is closed even if closing an
     * earlier one (or the superclass) throws.
     *
     * @throws Exception if any of the close operations fails
     */
    @Override
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            try {
                if (statement != null) {
                    statement.close();
                }
            } finally {
                if (connection != null) {
                    connection.close();
                }
            }
        }
    }
}