本文的基础环境可以参考《Flink 1.10.1 Java 版本 WordCount 演示（nc + socket）》，并在此基础上实现通过 JDBC 从 MySQL 读取数据。
1. 添加依赖
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.18</version>
</dependency>
由于测试环境是mysql8,所以这里选择的是mysql8.0.18版本的驱动。
2. 实体类
package com.demo.source;
/**
 * One row of the word-count table: a row id, the word itself, and its occurrence count.
 *
 * <p>Bean-style class: it has a public no-argument constructor and a getter/setter pair for
 * every field. All fields are boxed, so an instance created with the no-argument constructor
 * holds {@code null} everywhere.
 */
public class WordCount {
    private Long id;
    private String wordName;
    private Long wordcount;

    /** Creates an empty instance whose fields are all {@code null}. */
    public WordCount() {
    }

    /**
     * Creates a fully populated instance.
     *
     * @param id        row identifier
     * @param wordName  the word
     * @param wordcount how many times the word occurred
     */
    public WordCount(Long id, String wordName, Long wordcount) {
        this.id = id;
        this.wordName = wordName;
        this.wordcount = wordcount;
    }

    /** @return the row identifier, possibly {@code null} */
    public Long getId() {
        return id;
    }

    /** @return the word, possibly {@code null} */
    public String getWordName() {
        return wordName;
    }

    /** @return the occurrence count, possibly {@code null} */
    public Long getWordcount() {
        return wordcount;
    }

    /** @param id the new row identifier */
    public void setId(Long id) {
        this.id = id;
    }

    /** @param wordName the new word */
    public void setWordName(String wordName) {
        this.wordName = wordName;
    }

    /** @param wordcount the new occurrence count */
    public void setWordcount(Long wordcount) {
        this.wordcount = wordcount;
    }

    /**
     * Renders the instance as {@code WordCount{id=.., wordName='..', wordcount=..}}.
     * A {@code null} field is rendered as the text {@code null}.
     */
    @Override
    public String toString() {
        return String.format("WordCount{id=%s, wordName='%s', wordcount=%s}",
                id, wordName, wordcount);
    }
}
实体类定义了一个无参构造函数和一个全参数构造函数，并重写了 toString 方法。
3. 定义mysql source
package com.demo.source;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
/**
 * Flink source that reads every row of {@code tbl_flink_wordcount} over JDBC and emits each
 * row as a {@link WordCount}, pausing one second between rows. It finishes once the result set
 * is exhausted or the source is cancelled.
 *
 * <p>The connection and prepared statement are created in {@link #open(Configuration)} and
 * released in {@link #close()}. They are {@code transient} because the function object itself
 * is serialized when the job is submitted, and the JDBC handles are not serializable.
 */
public class MySqlSource extends RichSourceFunction<WordCount> {
    private transient PreparedStatement ps = null;
    private transient Connection connection = null;

    /** Set to {@code false} by {@link #cancel()}; checked by {@link #run} on every row. */
    private volatile boolean running = true;

    // Connector/J 8.x driver class; "com.mysql.jdbc.Driver" is the deprecated pre-8 name.
    // Not passed to Class.forName: JDBC 4 drivers on the classpath register themselves.
    String driver = "com.mysql.cj.jdbc.Driver";
    // serverTimezone is set explicitly for the MySQL 8 driver (GMT+8, '+' URL-encoded as %2b).
    String url = "jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2b8";
    // NOTE(review): placeholder credentials; prefer the three-argument constructor.
    String username = "username";
    String password = "password";

    /** Creates a source that uses the built-in default URL and placeholder credentials. */
    public MySqlSource() {
    }

    /**
     * Creates a source for the given JDBC URL and credentials.
     *
     * @param url      JDBC URL, e.g. {@code jdbc:mysql://host:3306/db?serverTimezone=GMT%2b8}
     * @param username database user
     * @param password database password
     */
    public MySqlSource(String url, String username, String password) {
        this.url = url;
        this.username = username;
        this.password = password;
    }

    /**
     * Opens the JDBC connection and prepares the select statement.
     *
     * @throws IllegalStateException if the connection cannot be established
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        // A trailing ';' is unnecessary in a JDBC statement, so it is omitted.
        String sql = "select id, word_name, word_count from tbl_flink_wordcount";
        ps = connection.prepareStatement(sql);
    }

    /**
     * Executes the query and emits one {@link WordCount} per row, sleeping one second after each
     * row. Stops early when {@link #cancel()} has been called.
     */
    @Override
    public void run(SourceContext<WordCount> sourceContext) throws Exception {
        // try-with-resources so the ResultSet is always released.
        try (ResultSet resultSet = ps.executeQuery()) {
            while (running && resultSet.next()) {
                String wordName = resultSet.getString("word_name");
                WordCount wordCount = new WordCount(
                        resultSet.getLong("id"),
                        // word_name may be NULL (see the table definition); guard the trim().
                        wordName == null ? null : wordName.trim(),
                        // getLong maps SQL NULL to 0.
                        resultSet.getLong("word_count"));
                sourceContext.collect(wordCount);
                // Throttle emission to one row per second for the demo.
                Thread.sleep(1000);
            }
        }
    }

    /** Signals {@link #run} to stop after the current row. */
    @Override
    public void cancel() {
        running = false;
    }

    /**
     * Releases the prepared statement and then its connection. The connection is closed even if
     * closing the statement fails.
     */
    @Override
    public void close() throws Exception {
        super.close();
        try {
            if (ps != null) {
                ps.close();
            }
        } finally {
            ps = null;
            if (connection != null) {
                try {
                    connection.close();
                } finally {
                    connection = null;
                }
            }
        }
    }

    /**
     * Opens a JDBC connection with {@link #url}, {@link #username} and {@link #password}.
     * The connection is also stored in the {@code connection} field.
     *
     * @return the newly opened connection, never {@code null}
     * @throws IllegalStateException if the connection cannot be established; the original
     *     {@link SQLException} is attached as the cause
     */
    public Connection getConnection() {
        try {
            connection = DriverManager.getConnection(url, username, password);
            return connection;
        } catch (SQLException e) {
            // Previously the error was only printed and null returned, which surfaced later as a
            // NullPointerException in open(). Fail fast with the real cause instead.
            throw new IllegalStateException(
                    "mysql get connection occur exception, url = " + url, e);
        }
    }
}
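采用 MySQL 8 驱动连接 MySQL 8 时，需要在 JDBC URL 中通过 serverTimezone 参数指定时区（如示例中的 serverTimezone=GMT%2b8，即 GMT+8，其中 “+” 需要编码为 %2b），否则在某些服务器时区配置下会报时区无法识别的错误。另外，MySQL 8 驱动的类名为 com.mysql.cj.jdbc.Driver，旧的 com.mysql.jdbc.Driver 已被弃用；由于 JDBC 4 驱动会自动注册，示例中无需调用 Class.forName。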
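如果数据量较大，还可以考虑分页读取等方式进行处理。注意 MySQL 驱动默认会把整个结果集一次性读入内存，也可以通过设置 fetchSize 来降低内存占用：例如设置 fetchSize 为 Integer.MIN_VALUE 开启流式读取，或在 URL 中加上 useCursorFetch=true 后设置正数 fetchSize。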
由于数据读取和数据处理是同时进行的，读取过程既可以持续读取，也可以像示例中那样每读一条数据后延时一段时间（Thread.sleep）。即使 source 一直处于读取状态，也不会影响下游的数据处理。
4. 测试代码
package com.demo.source;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Demo entry point: builds a stream from {@link MySqlSource}, prints every {@link WordCount}
 * it emits, and runs the job under the name "Flink Mysql Source".
 */
public class FlinkMySqlSourceDemo {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();

        final DataStreamSource<WordCount> rows = environment.addSource(new MySqlSource());

        // The parallelism of 1 applies to the print sink, so a single task writes the output.
        rows.print()
                .setParallelism(1);

        environment.execute("Flink Mysql Source");
    }
}
5. 启动程序执行测试
启动程序后，可以在控制台看到从 MySQL 读取并打印出来的数据。
6. 参考表定义
-- Reference schema for the table read by the demo source.
-- Charset is utf8mb4: in MySQL 8 "utf8" is the deprecated 3-byte utf8mb3 alias and cannot
-- store 4-byte characters such as emoji.
-- Integer display widths (int(11), bigint(20)) are deprecated since MySQL 8.0.17 and do not
-- affect storage, so they are omitted.
-- word_name and word_count are nullable; readers must handle NULL.
CREATE TABLE `tbl_flink_wordcount` (
  `id` int NOT NULL AUTO_INCREMENT,
  `word_name` varchar(64) DEFAULT NULL,
  `word_count` bigint DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;