Flink 数据源 DataSource是这个样子的?(三)

7、更多自定义数据源(如 Kafka)

例如在收集日志时,Kafka 消息中间件用得比较多,可以通过官方集成的方法 new FlinkKafkaConsumer 进行添加 Kafka 数据源

测试类位置在:

cn.sevenyuan.datasource.custom.DataSourceFromKafka

DataStreamSource<String> dataStreamSource = env.addSource(
    new FlinkKafkaConsumer<String>(
            KafkaUtils.TOPIC_STUDENT,
            new SimpleStringSchema(),
            props
    )).setParallelism(1);

7.1、测试场景示意Flink 数据源 DataSource是这个样子的?(三)


测试场景如上图,模拟一个 A 应用系统,不断的往 Kafka 发送消息,接着我们的 Flink 监听到 Kafka 的数据变动,搜集在一个时间窗口内(例如 10s)的数据,对窗口内的数据进行转换操作,最后进行存储(简单演示,使用的是 Print 打印)



7.2、前置环境安装和启动


如果在本地测试 Kafka 数据源,需要做这三步前置操作:

1. 安装 ZooKeeper,启动命令: zkServer start 2. 安装 Kafka,启动命令:kafka-server-start /usr/local/etc/kafka/server.properties 3. 安装 Flink,启动单机集群版的命令 /usr/local/Cellar/apache-flink/1.9.0/libexec/bin/start-cluster.sh

7.3、模拟应用系统

在终端中,通过 Kafka 命令创建名字为 studentTopic


$kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic student

启动以下代码的 main 方法,通过 while 循环,每隔 3s 往 kafka 发送一条消息:

KafkaUtils.java


public class KafkaUtils {

    public static final String BROKER_LIST = "localhost:9092";
    public static final String TOPIC_STUDENT = "student";
    public static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
    public static final String VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
    public static void writeToKafka() throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", BROKER_LIST);
        props.put("key.serializer", KEY_SERIALIZER);
        props.put("value.serializer", VALUE_SERIALIZER);
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // 制造传递的对象
        int randomInt = RandomUtils.nextInt(1, 100);
        Student stu = new Student(randomInt, "name" + randomInt, randomInt, "=-=");
        stu.setCheckInTime(new Date());
        // 发送数据
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_STUDENT, null, null, JSON.toJSONString(stu));
        producer.send(record);
        System.out.println("kafka 已发送消息 : " + JSON.toJSONString(stu));
        producer.flush();
    }
    public static void main(String[] args) throws Exception {
        while (true) {
            Thread.sleep(3000);
            writeToKafka();
        }
    }
}

在该工具类中,设定了很多静态变量,例如主题名字、key 序列化类、value的序列化类,之后可以在其它类中进行复用。

点击 main 方法后,可以在控制台终端看到每隔三秒(checkInTime 间隔时间),我们的消息成功的发送出去了。

kafka 已发送消息 : {"address":"=-=","age":49,"checkInTime":1571845900050,"id":49,"name":"name49"}
kafka 已发送消息 : {"address":"=-=","age":92,"checkInTime":1571845903371,"id":92,"name":"name92"}
kafka 已发送消息 : {"address":"=-=","age":72,"checkInTime":1571845906391,"id":72,"name":"name72"}
kafka 已发送消息 : {"address":"=-=","age":19,"checkInTime":1571845909413,"id":19,"name":"name19"}
kafka 已发送消息 : {"address":"=-=","age":34,"checkInTime":1571845912435,"id":34,"name":"name34"}

7.4、启动 Flink 程序

DataSourceFromKafka.java

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // 省略 kafka 的参数配置,具体请看代码
    Properties props = new Properties();
    DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer<String>(
            KafkaUtils.TOPIC_STUDENT,
            new SimpleStringSchema(),
            props
    )).setParallelism(1);

    // 数据转换 & 打印
    // 从 kafka 读数据,然后进行 map 映射转换
    DataStream<Student> dataStream = dataStreamSource.map(value -> JSONObject.parseObject(value, Student.class));
    // 不需要 keyBy 分类,所以使用 windowAll,每 10s 统计接收到的数据,批量插入到数据库中
    dataStream
            .timeWindowAll(Time.seconds(10))
            .apply(new AllWindowFunction<Student, List<Student>, TimeWindow>() {
                @Override
                public void apply(TimeWindow window, Iterable<Student> values, Collector<List<Student>> out) throws Exception {
                    List<Student> students = Lists.newArrayList(values);
                    if (students.size() > 0) {
                        System.out.println("最近 10 秒汇集到 " + students.size() + " 条数据");
                        out.collect(students);
                    }
                }
            })
            .print();
        env.execute("test custom kafka datasource");
    }

上述代码主要有三个步骤,获取 Kafka数据源 —> 数据转换(通过 map 映射操作,时间窗口搜集数据) —> 最后的数据存储(简单的 print)。

点击执行代码后,我们就能在控制台中看到如下输出结果:

Flink 数据源 DataSource是这个样子的?(三)

可以看到,按照发送消息的速度,我们能够在 10s 内搜集到 3-4 条数据,从输出结果能够验证 Flink 程序的正确性。

安装操作请参考网上资源,更多详细添加 Kafka 数据源的操作可以看项目中的测试类 DataSourceFromKafkazhisheng 写的 Flink 从 0 到 1 学习 —— 如何自定义 Data Source


8、总结

本章总结大致可以用下面这张思维导图概括:

Flink 数据源 DataSource是这个样子的?(三)

  • 集合、文件:读取本地数据,比较适合在本地测试时使用
  • 套接字:监听主机地址和端口号,获取数据,比较少用
  • 自定义数据源:一般常用的数据源,例如 KafkaHiveRabbitMQ,官方都有集成的依赖,通过 POM 进行引用即可使用,还有想要自己扩展的话,通过继承 RichSourceFunction,重写里面的方法,就能够获取自定义的数据

本文主要写了 Flink 提供的数据源使用,介绍了集合、文件、套接字和自定义数据源的例子。当然请根据自己的用途,选择使用合适的数据源,如有疑惑或不对之处请与我讨论~


项目地址

https://github.com/Vip-Augus/flink-learning-note


git clone https://github.com/Vip-Augus/flink-learning-note
上一篇:WEB安全新玩法 [8] 阻止订单重复提交


下一篇:微信支付,使用证书时出现58错误