7、更多自定义数据源(如 Kafka)
例如在收集日志时,Kafka
消息中间件用得比较多,可以通过官方集成的方法 new FlinkKafkaConsumer
进行添加 Kafka
数据源
测试类位置在:
cn.sevenyuan.datasource.custom.DataSourceFromKafka
DataStreamSource<String> dataStreamSource = env.addSource( new FlinkKafkaConsumer<String>( KafkaUtils.TOPIC_STUDENT, new SimpleStringSchema(), props )).setParallelism(1);
7.1、测试场景示意
测试场景如上图,模拟一个 A
应用系统,不断的往 Kafka
发送消息,接着我们的 Flink
监听到 Kafka
的数据变动,搜集在一个时间窗口内(例如 10s)的数据,对窗口内的数据进行转换操作,最后进行存储(简单演示,使用的是 Print
打印)
7.2、前置环境安装和启动
如果在本地测试 Kafka
数据源,需要做这三步前置操作:
1. 安装 ZooKeeper
,启动命令: zkServer start
2. 安装 Kafka
,启动命令:kafka-server-start /usr/local/etc/kafka/server.properties
3. 安装 Flink
,启动单机集群版的命令 /usr/local/Cellar/apache-flink/1.9.0/libexec/bin/start-cluster.sh
7.3、模拟应用系统
在终端中,通过 Kafka
命令创建名字为 student
的 Topic
:
$kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic student
启动以下代码的 main
方法,通过 while
循环,每隔 3s 往 kafka
发送一条消息:
KafkaUtils.java
public class KafkaUtils { public static final String BROKER_LIST = "localhost:9092"; public static final String TOPIC_STUDENT = "student"; public static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; public static final String VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; public static void writeToKafka() throws Exception { Properties props = new Properties(); props.put("bootstrap.servers", BROKER_LIST); props.put("key.serializer", KEY_SERIALIZER); props.put("value.serializer", VALUE_SERIALIZER); KafkaProducer<String, String> producer = new KafkaProducer<>(props); // 制造传递的对象 int randomInt = RandomUtils.nextInt(1, 100); Student stu = new Student(randomInt, "name" + randomInt, randomInt, "=-="); stu.setCheckInTime(new Date()); // 发送数据 ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_STUDENT, null, null, JSON.toJSONString(stu)); producer.send(record); System.out.println("kafka 已发送消息 : " + JSON.toJSONString(stu)); producer.flush(); } public static void main(String[] args) throws Exception { while (true) { Thread.sleep(3000); writeToKafka(); } } }
在该工具类中,设定了很多静态变量,例如主题名字、key
序列化类、value
的序列化类,之后可以在其它类中进行复用。
点击 main
方法后,可以在控制台终端看到每隔三秒(checkInTime
间隔时间),我们的消息成功的发送出去了。
kafka 已发送消息 : {"address":"=-=","age":49,"checkInTime":1571845900050,"id":49,"name":"name49"} kafka 已发送消息 : {"address":"=-=","age":92,"checkInTime":1571845903371,"id":92,"name":"name92"} kafka 已发送消息 : {"address":"=-=","age":72,"checkInTime":1571845906391,"id":72,"name":"name72"} kafka 已发送消息 : {"address":"=-=","age":19,"checkInTime":1571845909413,"id":19,"name":"name19"} kafka 已发送消息 : {"address":"=-=","age":34,"checkInTime":1571845912435,"id":34,"name":"name34"}
7.4、启动 Flink 程序
DataSourceFromKafka.java
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 省略 kafka 的参数配置,具体请看代码 Properties props = new Properties(); DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer<String>( KafkaUtils.TOPIC_STUDENT, new SimpleStringSchema(), props )).setParallelism(1); // 数据转换 & 打印 // 从 kafka 读数据,然后进行 map 映射转换 DataStream<Student> dataStream = dataStreamSource.map(value -> JSONObject.parseObject(value, Student.class)); // 不需要 keyBy 分类,所以使用 windowAll,每 10s 统计接收到的数据,批量插入到数据库中 dataStream .timeWindowAll(Time.seconds(10)) .apply(new AllWindowFunction<Student, List<Student>, TimeWindow>() { @Override public void apply(TimeWindow window, Iterable<Student> values, Collector<List<Student>> out) throws Exception { List<Student> students = Lists.newArrayList(values); if (students.size() > 0) { System.out.println("最近 10 秒汇集到 " + students.size() + " 条数据"); out.collect(students); } } }) .print(); env.execute("test custom kafka datasource"); }
上述代码主要有三个步骤,获取 Kafka
数据源 —> 数据转换(通过 map
映射操作,时间窗口搜集数据) —> 最后的数据存储(简单的 print
)。
点击执行代码后,我们就能在控制台中看到如下输出结果:
可以看到,按照发送消息的速度,我们能够在 10s 内搜集到 3-4 条数据,从输出结果能够验证 Flink
程序的正确性。
安装操作请参考网上资源,更多详细添加 Kafka
数据源的操作可以看项目中的测试类 DataSourceFromKafka
和 zhisheng
写的 Flink 从 0 到 1 学习 —— 如何自定义 Data Source
8、总结
本章总结大致可以用下面这张思维导图概括:
- 集合、文件:读取本地数据,比较适合在本地测试时使用
- 套接字:监听主机地址和端口号,获取数据,比较少用
- 自定义数据源:一般常用的数据源,例如
Kafka
、Hive
和RabbitMQ
,官方都有集成的依赖,通过POM
进行引用即可使用,还有想要自己扩展的话,通过继承RichSourceFunction
,重写里面的方法,就能够获取自定义的数据
本文主要写了 Flink
提供的数据源使用,介绍了集合、文件、套接字和自定义数据源的例子。当然请根据自己的用途,选择使用合适的数据源,如有疑惑或不对之处请与我讨论~
项目地址
https://github.com/Vip-Augus/flink-learning-note
git clone https://github.com/Vip-Augus/flink-learning-note