案例准备:
1、启动MySQL,在mysql中创建数据库flinkdb,并创建表sensor_temp
CREATE TABLE sensor_temp (
id varchar(32),
temp double
)
代码实现:
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream: DataStream[SensorReading] = env.addSource(new MyDefSource)
dataStream.addSink(new MyJdbcSinkFunction())
env.execute()
}
class MyJdbcSinkFunction extends RichSinkFunction[SensorReading]{
var connection: Connection =_
var insertStmt: PreparedStatement=_
var updateStmt: PreparedStatement=_
override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
updateStmt.setDouble(1,value.temperature)
updateStmt.setString(2,value.id)
updateStmt.execute()
if(updateStmt.getUpdateCount == 0){
insertStmt.setString(1,value.id)
insertStmt.setDouble(2,value.temperature)
insertStmt.execute()
}
}
override def open(parameters: Configuration): Unit = {
connection = DriverManager.getConnection("jdbc:mysql://192.168.91.180:3306/flinkdb?useSSL=false", "root", "123123")
insertStmt = connection.prepareStatement("insert into sensor_temp(id,temp) value(?,?)")
updateStmt = connection.prepareStatement("update sensor_temp set temp=? where id=?")
}
override def close(): Unit = {
insertStmt.close()
updateStmt.close()
connection.close()
}
运行结果:
查询数据select * from sensor_temp;