desktop-api --config application.properties application-prod.properties --desktop-api.jar --BOOT-INF --classes application.properties application-prod.properties
public class FutureTask<V> implements RunnableFuture<V> {
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
private Callable<V> callable;
private Object outcome;
private volatile Thread runner;
private volatile WaitNode waiters;
private static final VarHandle STATE;
private static final VarHandle RUNNER;
private static final VarHandle WAITERS;
}
package org.flink.sink; import org.flink.beans.SensorReading; import org.example.SourceTest4_UDF; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; /** * @author 只是甲 * @date 2021-09-15 * @remark Sink之JDBC */ public class SinkTest4_Jdbc { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 从文件读取数据 // DataStream<String> inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt"); // // // 转换成SensorReading类型 // DataStream<SensorReading> dataStream = inputStream.map(line -> { // String[] fields = line.split(","); // return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2])); // }); DataStream<SensorReading> dataStream = env.addSource(new SourceTest4_UDF.MySensorSource()); dataStream.addSink(new MyJdbcSink()); env.execute(); } // 实现自定义的SinkFunction public static class MyJdbcSink extends RichSinkFunction<SensorReading> { // 声明连接和预编译语句 Connection connection = null; PreparedStatement insertStmt = null; PreparedStatement updateStmt = null; @Override public void open(Configuration parameters) throws Exception { connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/flink_test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai", "root", "123456"); insertStmt = connection.prepareStatement("insert into sensor_temp (id, temp) values (?, ?)"); updateStmt = connection.prepareStatement("update sensor_temp set temp = ? where id = ?"); } // 每来一条数据,调用连接,执行sql @Override public void invoke(SensorReading value, Context context) throws Exception { // 直接执行更新语句,如果没有更新那么就插入 updateStmt.setDouble(1, value.getTemperature()); updateStmt.setString(2, value.getId()); updateStmt.execute(); if( updateStmt.getUpdateCount() == 0 ){ insertStmt.setString(1, value.getId()); insertStmt.setDouble(2, value.getTemperature()); insertStmt.execute(); } } @Override public void close() throws Exception { insertStmt.close(); updateStmt.close(); connection.close(); } } }