Flink 流处理 API
Flink 流处理 API 的调用过程主要分为以下几个步骤:
所以 Flink API 也是围绕以下几个环节进行介绍。
1. Environment
Environment,即 Flink 程序的执行环境。
getExecutionEnvironment
创建一个执行环境,表示当前执行程序的上下文。
如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境。即 getExecutionEnvironment
会根据查询执行的方式返回什么样的运行环境,最常用的一种创建执行环境的方式。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
如果没有设置并行度,会以flink-conf.yaml
中的配置为准,默认为 1。
可以通过 setParallelism()
方法更改并行度。
env.setParallelism(2);
createLocalEnvironment
返回本地执行环境,可以在调用时指定默认的并行度。
LocalStreamEnvironment localEnvironment = StreamExecutionEnvironment.createLocalEnvironment();
// 指定并行度为 2
LocalStreamEnvironment localEnvironment2 = StreamExecutionEnvironment.createLocalEnvironment(2);
createRemoteEnvironment
返回集群执行环境,将 Jar 提交到远程服务器。
需要在调用时指定JobManager 的 IP 和 Port,并指定要在集群中运行到 Jar 包。
StreamExecutionEnvironment.createRemoteEnvironment("127.0.0.1", 8081, "WordCount.jar");
2. Source
Source,即数据源,通过数据源读入数据至 Flink 程序中执行,Source 支持多种方式读取数据。
存在如下一个类,用于记录用户日志。
import lombok.Data;
@Data
public class UserLog {
private String id;
private Long timeStamp;
private String log;
public UserLog() {
}
public UserLog(String id, Long timeStamp, String log) {
this.id = id;
this.timeStamp = timeStamp;
this.log = log;
}
}
从集合读取数据
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
List<UserLog> userLogs = Arrays.asList(
new UserLog("user_1", 1547718199L, "aaa"),
new UserLog("user_2", 1547718199L, "bbb"),
new UserLog("user_3", 1547718199L, "ccc")
);
// 从集合中读取数据
DataStream<UserLog> dataStream = env.fromCollection(userLogs);
// 打印输出
dataStream.print("userLogs");
// 执行
env.execute();
}
从文件读取数据
# userLog.txt
user_1, 1547718212, aaa
user_1, 1547718213, bbb
user_1, 1547718214, ccc
user_2, 1547718212, ddd
user_2, 1547718213, eee
user_3, 1547718214, fff
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 从文件中读取数据
DataStream<String> dataStream = env.readTextFile("userLog.txt");
// 打印输出
dataStream.print();
// 执行
env.execute();
}
从 kafka 读取数据
需要引入 kafka 连接器到依赖。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
<version>1.10.1</version>
</dependency>
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "consumer-group");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");
// 从文件中读取数据
DataStream<String> dataStream = env.addSource(new FlinkKafkaConsumer011<String>(
"user_log", new SimpleStringSchema(), properties));
// 打印输出
dataStream.print();
// 执行
env.execute();
}
自定义 Source
除了以上的source
数据来源,我们还可以自定义source
。只需要传入一个SourceFunction
就可以。具体调用如下:
DataStream<UserLog> dataStream = env.addSource(new SourceFunction<UserLog>() {
@Override
public void run(SourceContext<UserLog> ctx) throws Exception {
}
@Override
public void cancel() {
}
});
我们通过重写run
方法控制数据的生成,通过cancel
方法控制不再读入数据。
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 从自定义读取数据
final UserLogSource userLogSource = new UserLogSource();
DataStream<UserLog> dataStream = env.addSource(userLogSource);
// 打印输出
dataStream.print();
// 执行
env.execute();
}
// 实现自定义的 SourceFunction
public static class UserLogSource implements SourceFunction<UserLog> {
// 定义标识位,用来控制数据的产生
private volatile boolean running = true;
@Override
public void run(SourceContext<UserLog> ctx) throws Exception {
int num = 0;
while (running) {
UserLog userLog = new UserLog();
userLog.setId(num++ + "");
userLog.setTimeStamp(System.currentTimeMillis());
userLog.setLog("log:" + num);
ctx.collect(userLog);
}
}
@Override
public void cancel() {
running = false;
}
}
可以看到,自定义source
的方式非常灵活,可以实现任意形式的数据读入。如从 kafka、文件、mysql、redis、es 等都可以通过自定义source
等的方式进行读入。
3. Transform
在将数据读入后,我们可以需要进行大量的转换操作,而转换操作就是通过如下 API 实现的。
map
将数据元素转换为另一种格式的数据元素。
// 将元素由 string 转换为对应 string 的长度
DataStream<Integer> mapStream = inputStream.map(new MapFunction<String, Integer>() {
@Override
public Integer map(String value) {
return value.length();
}
});
test.txt
a
bb
ccc
dddd
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 从文件中读取数据
DataStream<String> inputStream =
env.readTextFile("test.txt");
// map 把 String 转换成长度输出
DataStream<Integer> mapStream = inputStream.map(new MapFunction<String, Integer>() {
@Override
public Integer map(String value) {
return value.length();
}
});
// 打印输出
mapStream.print("map");
env.execute();
}
flatMap
map
只支持一对一的转换,如果需要进行一对多转换,则需要通过flatMap
实现。
test.txt
a,1
bb,2
ccc,3
dddd,4
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 从文件中读取数据
DataStream<String> inputStream = env.readTextFile("test.txt");
// flatMap,按逗号分字段
DataStream<String> flatMapStream = inputStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> collector) {
String[] fields = value.split(",");
for (String field : fields) {
collector.collect(field);
}
}
});
// 打印输出
flatMapStream.print("flatMap");
env.execute();
}
Fliter
过滤掉数据流中的元素。
test.txt
a1
b1
c1
d1
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 从文件中读取数据
DataStream<String> inputStream =env.readTextFile("test.txt");
// filter,筛选 a 开头的数据
DataStream<String> filterStream = inputStream.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) {
return value.startsWith("a");
}
});
// 打印输出
filterStream.print("filter");
env.execute();
}
keyBy
逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同的key
的元素,在内部以hash
的形式实现。
调用keyBy
,数据类型由DataStream
转换为KeyedStream
。
滚动聚合算子
这些算子可以针对KeyedStream
的每一个支流做聚合。
- sum()
- min()
- max()
- minBy()
- maxBy()
test.txt
user_1,100000,log_1
user_1,300000,log_3
user_1,200000,log_2
user_2,200000,log_5
user_2,100000,log_4
user_3,300000,log_6
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 从文件中读取数据
DataStream<String> inputStream = env.readTextFile("test.txt");
// 转换成UserLog类型
DataStream<UserLog> dataStream = inputStream.map(new MapFunction<String, UserLog>() {
public UserLog map(String value) throws Exception {
String[] array = value.split(",");
return new UserLog(array[0], Long.parseLong(array[1]), array[2]);
}
});
// 分组
KeyedStream<UserLog, Tuple> keyedStream = dataStream.keyBy("id");
// 滚动聚合,取当前最大的时间戳
DataStream<UserLog> resultStream = keyedStream.max("timeStamp");
resultStream.print();
env.execute();
}
通过输出可以看到,随着数据流的到来,成功获得最大的数据。
Reduce
一个分组数据流的聚合操作,合并当前元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
有如下一个需求,我们不仅要知道各科目最大的成绩,还要知道最大成绩的所属学生。如果只通过 max
的话,是无法满足我们的需求的,此时,就需要通过 reduce
来实现该需求。
@Data
public class Score {
private String course;
private String student;
private int score;
public Score() {
}
public Score(String course, String student, int score) {
this.course = course;
this.student = student;
this.score = score;
}
}
test.txt
语文,小明,80
语文,小王,70
语文,小张,90
语文,小李,100
数学,小明,100
数学,小王,90
数学,小张,80
数学,小李,70
英语,小明,70
英语,小王,80
英语,小张,90
英语,小李,100
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 从文件中读取数据
DataStream<String> inputStream = env.readTextFile("test.txt");
// 转换成Score类型
DataStream<Score> dataStream = inputStream.map(new MapFunction<String, Score>() {
@Override
public Score map(String value) throws Exception {
String[] array = value.split(",");
return new Score(array[0], array[1], Integer.parseInt(array[2]));
}
});
// 分组
KeyedStream<Score, Tuple> keyedStream = dataStream.keyBy("course");
// reduce 聚合,取最大成绩
DataStream<Score> reduceStream = keyedStream.reduce(new ReduceFunction<Score>() {
@Override
public Score reduce(Score value1, Score value2) throws Exception {
String student = value1.getStudent();
if (value1.getScore() < value2.getScore()) {
student = value2.getStudent();
}
return new Score(value1.getCourse(), student, Math.max(value1.getScore(), value2.getScore()));
}
});
reduceStream.print();
env.execute();
}
split 和 select
split
:根据某些特征把一个DataStream
拆分成两个或者多个DataStream
。
select
:从一个 SplitStream
中获取一个或者多个DataStream
。
test.txt
语文,小明,80
语文,小王,30
语文,小张,90
语文,小李,100
数学,小明,100
数学,小王,40
数学,小张,80
数学,小李,70
英语,小明,70
英语,小王,80
英语,小张,50
英语,小李,100
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 从文件中读取数据
DataStream<String> inputStream = env.readTextFile("test.txt");
// 转换成SensorReading类型
DataStream<Score> dataStream = inputStream.map(new MapFunction<String, Score>() {
@Override
public Score map(String value) throws Exception {
String[] array = value.split(",");
return new Score(array[0], array[1], Integer.parseInt(array[2]));
}
});
// 分流,按照分数60分为界,分成两条流
SplitStream<Score> splitStream = dataStream.split(new OutputSelector<Score>() {
@Override
public Iterable<String> select(Score score) {
return score.getScore() > 60 ?
Collections.singletonList("pass") : Collections.singletonList("unPass");
}
});
DataStream<Score> passStream = splitStream.select("pass");
DataStream<Score> unPassStream = splitStream.select("unPass");
passStream.print("pass");
unPassStream.print("unPass");
env.execute();
}
connect 和 coMap、coFlatMap
connect
:连接两个保持他们类型的数据流,两个数据流被 connect 之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。
coMap
、coFlatMap
:作用于ConnectedStreams
上,功能与map
和flatMap
一样,对 ConnectedStreams 中的每一个 Stream 分别进行 map 和 flatMap 处理。
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 从文件中读取数据
DataStream<String> inputStream = env.readTextFile("test.txt");
// 转换成SensorReading类型
DataStream<Score> dataStream = inputStream.map(new MapFunction<String, Score>() {
@Override
public Score map(String value) throws Exception {
String[] array = value.split(",");
return new Score(array[0], array[1], Integer.parseInt(array[2]));
}
});
// 1、分流,按照分数60分为界,分成两条流
SplitStream<Score> splitStream = dataStream.split(new OutputSelector<Score>() {
@Override
public Iterable<String> select(Score score) {
return score.getScore() > 60 ?
Collections.singletonList("pass") : Collections.singletonList("unPass");
}
});
DataStream<Score> passStream = splitStream.select("pass");
DataStream<Score> unPassStream = splitStream.select("unPass");
// 2、合流,将及格对数据流转换成二元组类型
DataStream<Tuple2<String, Integer>> warningStream = passStream.map(new MapFunction<Score, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(Score score) throws Exception {
return new Tuple2<String, Integer>(score.getCourse(), score.getScore());
}
});
// 3、与不及格对数据流连接合并之后,输出状态信息
ConnectedStreams<Tuple2<String, Integer>, Score> connectedStream = warningStream.connect(unPassStream);
DataStream<Object> resultStream = connectedStream.map(new CoMapFunction<Tuple2<String, Integer>, Score, Object>() {
@Override
public Object map1(Tuple2<String, Integer> value) throws Exception {
return new Tuple3<>(value.f0, value.f1, "pass");
}
@Override
public Object map2(Score value) throws Exception {
return new Tuple3<>(value.getCourse(), value.getScore(), "unpass");
}
});
resultStream.print("resultStream");
}
union
union
:对两个或者两个以上的DataStream
进行union
操作,产生一个包含所有DataStream
元素的新 DataStream
。
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 从文件中读取数据
DataStream<String> inputStream = env.readTextFile("text.txt");
// 转换成SensorReading类型
DataStream<Score> dataStream = inputStream.map(new MapFunction<String, Score>() {
@Override
public Score map(String value) throws Exception {
String[] array = value.split(",");
return new Score(array[0], array[1], Integer.parseInt(array[2]));
}
});
// 1、分流,按照分数60分为界,分成两条流
SplitStream<Score> splitStream = dataStream.split(new OutputSelector<Score>() {
@Override
public Iterable<String> select(Score score) {
return score.getScore() > 60 ?
Collections.singletonList("pass") : Collections.singletonList("unPass");
}
});
DataStream<Score> passStream = splitStream.select("pass");
DataStream<Score> unPassStream = splitStream.select("unPass");
// 2、union 联合多条流
DataStream<Score> allScoreStream = passStream.union(unPassStream);
allScoreStream.print("allScoreStream");
env.execute();
}
支持的数据类型
Flink
流应用程序处理的是以数据对象表示的事件流。在Flink
内部,我们需要能够处理这些对象。它们需要被序列化和反序列化,以便通过网络传送他们;或者从状态后端、检查点和保存点读取它们。为了有效地做到这一点,Flink 需要明确知道应用程序所处理的数据类型。Flink 使用类型信息的概念来表示数据类型,并为每个数据类型生成特定的序列化器、反序列化器和比较器。
Flink 还具有一个类型提取系统,该系统分析函数的输入和返回类型,以自动获取类型信息,从而获得序列化器和反序列化器。但是,在某些情况下,例如 lambda
函数或范型类型,需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。
Flink 支持 Java 和 Scala 中常见的数据类型,使用最广泛的类型有以下几种。
- 基础数据类型
Flink
支持所有的Java
和Scala
基础数据类型,Int、Double、Long、String。
- Java 和 Scala 元组(
Tuples
) - Scala 样例类(case classes)
- Java 简单对象(POJOs)
- 其他(Arrays、List、Maps、Enums)等
UDF 函数(更细粒度的控制流)
函数类(Function Classes)
Flink 暴露了所有 udf 函数的接口(实现方式为接口或抽象类)。例如MapFunction
,FilterFunction
,ProcessFunction
等等。
下面例子实现了FilterFunction
接口:
DataStream<String> filterStream = dataStream.filter(new FlinkFilter());
public static class FlinkFilter implements FilterFunction<String> {
@Override
public boolean filter(String value) throws Exception {
return value.contains("flink");
}
}
也可以将函数实现为匿名类:
DataStream<String> filterStream = dataStream.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
return value.contains("flink");
}
});
匿名函数(Lambda Functions)
DataStream<String> filterStream = dataStream.filter(value -> value.contains("flink"));
富函数(Rich Functions)
“富函数”是DataStream API
提供的一个函数类的接口,所有 Flink 函数类都有其Rich
版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
- RichMapFunction
- RichFlatMapFunction
- RichFilterFunction
- …
Rich Function
有一个生命周期的概念。典型的生命周期方法有:
-
open()
方法是 rich function 的初始化方法,当一个算子例如 map 或者 filter 被调用之前open
会被调用。 -
close()
方法是生命周期中的最后一个调用的方法,做一些清理工作。 -
getRuntimeContext()
方法提供了函数的RuntimeContext
的一些信息,例如函数执行的并行度,任务的名字,以及 state 状态。
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
// 从文件中读取数据
DataStream<String> inputStream = env.readTextFile("test.txt");
// 转换成 Score 类型
DataStream<Score> dataStream = inputStream.map(new MapFunction<String, Score>() {
@Override
public Score map(String value) throws Exception {
String[] array = value.split(",");
return new Score(array[0], array[1], Integer.parseInt(array[2]));
}
});
DataStream<Tuple2<String, Integer>> resultStream = dataStream.map(new MyMapper());
resultStream.print();
env.execute();
}
public static class MyMapper extends RichMapFunction<Score, Tuple2<String, Integer>> {
@Override
public Tuple2<String, Integer> map(Score score) throws Exception {
return new Tuple2<>(score.getCourse(), getRuntimeContext().getIndexOfThisSubtask());
}
@Override
public void open(Configuration parameters) throws Exception {
// 初始化工作,一般是定义状态,或者建立数据库连接
System.out.println("open");
}
@Override
public void close() throws Exception {
// 一般是关闭连接
System.out.println("close");
}
}
数据重分区
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
// 从文件中读取数据
DataStream<String> inputStream = env.readTextFile("test.txt");
inputStream.print("input");
// 1. shuffle
DataStream<String> shuffleStream = inputStream.shuffle();
shuffleStream.print("shuffleStream");
// 2. keyBy
DataStream<Score> dataStream = inputStream.map(new MapFunction<String, Score>() {
@Override
public Score map(String value) throws Exception {
String[] array = value.split(",");
return new Score(array[0], array[1], Integer.parseInt(array[2]));
}
});
dataStream.keyBy("course").print("keyBy");
// 3、global
inputStream.global().print("global");
env.execute();
}
4. Sink
Flink
没有类似于 spark 中 foreach 方法,让用户进行迭代的操作。所有对外的输出操作都要利用 Sink 完成。最后通过类似如下方式完成整个任务最终输出操作。
stream.addSink(new MySink(xxx))
官方提供了一部分的框架的 sink。除此以外,需要用户自定义 sink。
kafka
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
<version>1.10.1</version>
</dependency>
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "consumer-group");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");
// 从kafka中读取数据
DataStream<String> inputStream = env.addSource(new FlinkKafkaConsumer011<String>(
"score_consume", new SimpleStringSchema(), properties));
// 转换成SensorReading类型
DataStream<String> dataStream = inputStream.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
String[] array = value.split(",");
System.out.println(new Score(array[0], array[1], Integer.parseInt(array[2])).toString());
return new Score(array[0], array[1], Integer.parseInt(array[2])).toString();
}
});
// 输出到 kafka
dataStream.addSink(new FlinkKafkaProducer011<String>(
"localhost:9092", "score_produce", new SimpleStringSchema()
));
env.execute();
}
Redis
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 从文件中读取数据
DataStream<String> inputStream = env.readTextFile("test.txt");
// 转换成 Score 类型
DataStream<Score> dataStream = inputStream.map(new MapFunction<String, Score>() {
public Score map(String value) throws Exception {
String[] array = value.split(",");
return new Score(array[0], array[1], Integer.parseInt(array[2]));
}
});
// 定义 Jedis 连接配置
FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build();
dataStream.addSink(new RedisSink<Score>(config, new MyRedisMapper()));
env.execute();
}
// 自定义 RedisMapper
public static class MyRedisMapper implements RedisMapper<Score> {
// 定义保存数据到 Redis 的命令,存成Hash表,hset scores course student-score
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(
RedisCommand.HSET, "scores"
);
}
@Override
public String getKeyFromData(Score score) {
return score.getCourse();
}
@Override
public String getValueFromData(Score score) {
return score.getStudent() + "-" + score.getScore();
}
}
JDBC 自定义 Sink
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.29</version>
</dependency>
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 从文件中读取数据
DataStream<String> inputStream = env.readTextFile("test.txt");
// 转换成 Score 类型
DataStream<Score> dataStream = inputStream.map(new MapFunction<String, Score>() {
@Override
public Score map(String value) throws Exception {
String[] array = value.split(",");
return new Score(array[0], array[1], Integer.parseInt(array[2]));
}
});
dataStream.addSink(new MyJdbcSink());
env.execute();
}
// 实现自定义的SinkFunction
public static class MyJdbcSink extends RichSinkFunction<Score> {
Connection connection = null;
PreparedStatement insertStmt = null;
@Override
public void open(Configuration parameters) throws Exception {
connection = DriverManager.getConnection(
"jdbc:mysql://localhost:3306/test", "root", "111111");
insertStmt = connection.prepareStatement("insert into t_score (course, student, score) value (?, ?, ?)");
}
// 每来一条数据,调用连接,执行sql
@Override
public void invoke(Score score, Context context) throws Exception {
insertStmt.setString(1, score.getCourse());
insertStmt.setString(2, score.getStudent());
insertStmt.setInt(3, score.getScore());
insertStmt.execute();
}
@Override
public void close() throws Exception {
insertStmt = null;
connection.close();
}
}