6、自定义 DataSource
从前面介绍中看到,Flink
提供了一个 addSource(SourceFunction<OUT>)
的方法,其中 SourceFunction
是实现自定义数据源的关键接口,而我们常用来扩展的是它的抽象子类 RichSourceFunction
6.1、RichSourceFunction
进行自定义扩展数据源前,来看下这个类的继承体系:
下面是我测试的一个场景:
- 启动
Redis
,手动不断设置某个key
的值,模拟应用不断对它的修改 -
Flink
读取Redis
数据源,进行数据加工 - 存储加工后的数据(例如放入数据库或者简单打印出来)
于是乎,创建了一个自定义的 Redis
数据源,重写上面图中提到的方法
MyRedisDataSourceFunction.java
public class MyRedisDataSourceFunction extends RichSourceFunction<String> { @Override public void open(Configuration parameters) throws Exception { super.open(parameters); // noop } @Override public void run(SourceContext<String> ctx) throws Exception { while (true) { String maxNumber = RedisUtils.get("maxNumber", String.class); ctx.collect(StringUtils.isBlank(maxNumber) ? "0" : maxNumber); // 隔 1 s 执行程序 Thread.sleep(1000); } } @Override public void cancel() { // noop } @Override public void close() throws Exception { super.close(); RedisUtils.close(); } }
从上面代码可以看出,我在 run
方法中,通过 while
循环,不断从 Redis
中获取数据,关于缓存的相关操作,封装到了 RedisUtils
,感兴趣的可以下载项目来看看。
由于偷懒,open
、cancel
是没有做操作,在关闭方法中,也只是简单释放了 jedis
连接。
6.2、验证自定义数据源结果
DataSourceFromRedis.java
public class DataSourceFromRedis { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> customSource = env.addSource(new MyRedisDataSourceFunction()); SingleOutputStreamOperator<String> operator = customSource .map((MapFunction<String, String>) value -> "当前最大值为 : " + value); operator.print(); env.execute("test custom redis datasource function"); } }
上面代码,主要核心在于 env.addSource(new MyRedisDataSourceFunction())
,从我们自定义的 Redis
数据源中获取数据,编写好代码后,进行打包并通过 flink run
执行。
为了方便,我直接在本地 IDEA
中,点击了绿色执行按钮,进行本地调试,接着来修改数据源和查看输出结果。
一、修改 Redis
中的数据
$ redis-cli -h localhost -p 6379 > set maxNumber 100 > set maxNumber 200 > set maxNumber 300 > set maxNumber 400
二、查看控制台输出结果
3> 当前最大值为 : 100 4> 当前最大值为 : 100 6> 当前最大值为 : 200 7> 当前最大值为 : 200 1> 当前最大值为 : 200 2> 当前最大值为 : 300 ....
可以看到数据源的修改,我们的程序能够正常接收到并进行处理。当然这个 Demo
只是用来演示,用来演示我们可以基于变动的数据源进行更多复杂的操作,从而来达到数据处理想要的目的。