Flink 数据源 DataSource是这个样子的?(二)

6、自定义 DataSource

从前面介绍中看到,Flink 提供了一个 addSource(SourceFunction<OUT>) 的方法,其中 SourceFunction 是实现自定义数据源的关键接口,而我们常用来扩展的是它的抽象子类 RichSourceFunction


6.1、RichSourceFunction

进行自定义扩展数据源前,来看下这个类的继承体系:Flink 数据源 DataSource是这个样子的?(二)

下面是我测试的一个场景:

  1. 启动 Redis,手动不断设置某个 key的值,模拟应用不断对它的修改
  2. Flink 读取 Redis 数据源,进行数据加工
  3. 存储加工后的数据(例如放入数据库或者简单打印出来)

于是乎,创建了一个自定义的 Redis 数据源,重写上面图中提到的方法

MyRedisDataSourceFunction.java


public class MyRedisDataSourceFunction extends RichSourceFunction<String> {

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // noop
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (true) {
            String maxNumber = RedisUtils.get("maxNumber", String.class);
            ctx.collect(StringUtils.isBlank(maxNumber) ? "0" : maxNumber);
            // 隔 1 s 执行程序
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        // noop
    }

    @Override
    public void close() throws Exception {
        super.close();
        RedisUtils.close();
    }
}

从上面代码可以看出,我在 run 方法中,通过 while 循环,不断从 Redis中获取数据,关于缓存的相关操作,封装到了 RedisUtils,感兴趣的可以下载项目来看看。

由于偷懒,opencancel 是没有做操作,在关闭方法中,也只是简单释放了 jedis 连接。


6.2、验证自定义数据源结果

DataSourceFromRedis.java

public class DataSourceFromRedis {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> customSource = 
            env.addSource(new MyRedisDataSourceFunction());
        SingleOutputStreamOperator<String> operator = customSource
                .map((MapFunction<String, String>) value -> "当前最大值为 : " + value);
        operator.print();
        env.execute("test custom redis datasource function");
    }
}

上面代码,主要核心在于 env.addSource(new MyRedisDataSourceFunction()),从我们自定义的 Redis 数据源中获取数据,编写好代码后,进行打包并通过 flink run 执行。

为了方便,我直接在本地 IDEA 中,点击了绿色执行按钮,进行本地调试,接着来修改数据源和查看输出结果。

一、修改 Redis 中的数据

$ redis-cli -h localhost -p 6379
> set maxNumber 100
> set maxNumber 200
> set maxNumber 300
> set maxNumber 400

二、查看控制台输出结果

3> 当前最大值为 : 100
4> 当前最大值为 : 100
6> 当前最大值为 : 200
7> 当前最大值为 : 200
1> 当前最大值为 : 200
2> 当前最大值为 : 300
....

可以看到数据源的修改,我们的程序能够正常接收到并进行处理。当然这个 Demo 只是用来演示,用来演示我们可以基于变动的数据源进行更多复杂的操作,从而来达到数据处理想要的目的。

上一篇:查找已连接过的wifi密码


下一篇:OC9_字符串的内存管理