Flink 必知必会经典课程7:Flink Ecosystems

作者:李锐

本文主要介绍了Flink SQL连接外部系统的原因和原理,介绍了常用的Flink SQL Connector,包括Kafka Connector、Elasticsearch Connector、FileSystem Connector、Hive Connector等等。本文主要分为2个部分:

  1. Flink SQL连接外部系统的实现原理
  2. Flink SQL常用的Connector

Flink SQL连接外部系统的实现原理

在讲原理之前,我们先回答为什么要使用Flink SQL?SQL是一个标准化的数据查询语言,而在Flink SQL中,我们可以通过Catalog与各种系统集成,同时我们也开发了很丰富的内置操作符和函数,而且Flink SQL还可以同时处理批数据和流数据,能极大地提高数据分析的工作效率。

那么Flink SQL为什么又要对接外部系统呢?Flink SQL本身是一个流计算的引擎,它本身不维护任何数据,所以对Flink SQL而言,所有的数据都存储在外部系统当中,也就是所有的表都是在外部系统中,我们只有对接这些外部系统,才能够对数据进行实际的读写。

Flink 必知必会经典课程7:Flink Ecosystems

在讲解Flink SQL如何与外部系统对接之前,我们先看一下Flink内部DataStream和Table是如何做转换的?假设已经有一个DataStream程序了,那么我们可以把它转换成Table的方式来使用,用Flink SQL的一些强大功能对它进行查询,可以通过下列例子理解,类似于Flink SQL内部的对接。

Flink 必知必会经典课程7:Flink Ecosystems

Connector

对于Flink SQL而言,对接外部系统的组件被称作Connector。下面这张表里列出了Flink SQL所支持的几个比较常用的Connector,比如Filesystem对接的是文件系统,JDBC对接的是外部的关系型数据库等等。每一个Connector主要负责实现一个source和一个sink, source负责从外部系统中读数据,sink负责把数据写入到外部系统中。

Flink 必知必会经典课程7:Flink Ecosystems

Format

Format指定了数据在外部系统中的格式,比如一个Kafka的表,它里面的数据可能是CSV格式存储的,也有可能是JSON格式存储的,所以我们在指定一个Connector连接外部表的时候,通常也需要指定Format是什么,这样Flink 才能正确地去读写这个数据。

Flink 必知必会经典课程7:Flink Ecosystems

Catalog

Catalog可以连接外部系统的元数据,然后把元数据信息提供给Flink,这样Flink可以直接去访问外部系统中已经创建好的表或者database等等。比如Hive的元数据是存储在Hive Metastore中的,那么Flink如果想访问Hive表的话,就有一个HiveCatalog来对接元数据。除此之外,它还可以帮助Flink 来持久化它自身的元数据。比如说HiveCatalog既可以帮Flink 来访问Hive,也可以帮Flink来存储一些Flink所创建的表的信息,这样就不需要每次启动Session的时候重新建表了,直接去读取Hive Metastore中建好的表就可以了。

Flink 必知必会经典课程7:Flink Ecosystems

如何创建一张表来指定外部的 connector?下面的例子是通过DDL来创建的一张表,这是一个比较标准的Create Table语句,其中所有跟Connector相关的参数都在with语句当中指定,比如这里的Connector等于Kafka等等。

Flink 必知必会经典课程7:Flink Ecosystems

当通过DDL创建了一张表后,这个表是如何在Flink当中被使用的?这里有一个很关键的概念就是Table Factory。在这个黄色的框里面,我们可以通过DDL建表,或者可以通过Catalog从外部系统中拿到,然后被转化成Catalog Table对象。当我们在SQL语句中引用Catalog Table时,Flink会为这张表创建对应的source或者是sink,创建source和sink的这个模块儿就叫做Table Factory。

获取Table Factory的方式有两种,一个是Catalog本身绑定了一个Table Factory,另一种是通过Java的SPI来确定Table Factory,但是它查找的时候要正好有一个配对才不会报错。

Flink 必知必会经典课程7:Flink Ecosystems

Flink SQL常用的Connector

Kafka Connector

Kafka Connector是用得最多的,因为Flink是一个流计算的引擎,而Kafka又是最流行的消息队列,所以用Flink的用户大部分也都在用Kafka。如果我们要创建Kafka的表,就需要指定一些特定的参数,比如将Connector字段指定成Kafka,还有Kafka对应的topic等,我们可以在下图看到这些参数及其所代表的的含义。

Flink 必知必会经典课程7:Flink Ecosystems

要使用Kafka Connector,就需要添加Kafka一些依赖的Jar包,根据所使用的Kafka版本不一样,添加的Jar包也不太一样,这些Jar包都可以在官网上下载到。

Flink 必知必会经典课程7:Flink Ecosystems

Elasticsearch Connector

Elasticsearch Connector只实现了Sink,所以只能往ES里去写,而不能从里面读。它的Connector类型可以指定成ES6或者ES7;Hosts就是指定的ES的各个节点,通过域名加端口号的形式;Index是指定写ES的index,类似于传统数据库当中的一张表;Document Type类似于传统数据库的表里面的某一行,不过在ES7里不需要指定。

Flink 必知必会经典课程7:Flink Ecosystems

ES的Sink支持append和upsert两种模式,如果这张ES表在定义的时候指定了PK,那么Sink就会以upsert模式工作,如果没有指定PK,就以append模式来工作,但是像ROW和MAP等类型是不能作为PK的。

Flink 必知必会经典课程7:Flink Ecosystems

同样,使用ES也需要指定额外的依赖,针对不同的ES版本添加ES Connector。

Flink 必知必会经典课程7:Flink Ecosystems

FileSystem Connector

这个Connector对接的是一个文件系统,它读写的是这个文件系统上的文件。这里所说的FileSystem指的是Flink的FileSystem抽象,它支持很多种不同的实现,比如支持本地文件系统、Hadoop、S3、OSS等不同的实现。同时它还支持分区,采取与Hive相似的分区目录结构,但分区信息不需要注册到Catalog中。

Flink 必知必会经典课程7:Flink Ecosystems

Hive Connector

Hive应该是最早的SQL引擎,在批处理场景中大部分用户都在使用。Hive Connector可以分为两个层面,首先在元数据上,我们通过HiveCatalog来对接Hive元数据,同时我们提供HiveTableSource、HiveTableSink来读写 Hive的表数据。

Flink 必知必会经典课程7:Flink Ecosystems

使用Hive Connector需要指定Hive Catalog,这里是一个例子,展示如何指定Hive Catalog。

Flink 必知必会经典课程7:Flink Ecosystems

使用Hive Connector也需要添加一些额外的依赖,大家可以根据所使用的Hive版本来选择对应的Jar包。

Flink 必知必会经典课程7:Flink Ecosystems

除了连接外部系统外,我们也有内置的Connector,它们一方面是帮助新的用户能够尽快地上手,体验Flink SQL强大的功能,另一方面也能帮助Flink的开发人员做一些代码的调试。

DataGen Connector

DataGen Connector是一个数据生成器。比如这里创建了一个DataGen的表,指定了几个字段。把Connector的类型指定成DataGen,这个时候去读这张表,Connector会负责生成数据,也就是说数据是生成出来的,并不是事先要存储在某个地方。然后用户可以对DataGen Connector做一些比较细粒度的控制,比如可以指定每秒钟生成多少行数据,然后某个字段可以指定它通过sequence也就是从小到大来创建,也可以指定通过random的方式来创建等等。

Flink 必知必会经典课程7:Flink Ecosystems

Print Connector

Print Connector提供Sink功能,负责把所有的数据打印到标准输出或者标准错误输出上,打印的格式是前面会带一个row kind。创建 print的表的时候只需要把Connector类型指定成print就可以了。

Flink 必知必会经典课程7:Flink Ecosystems

BlackHole Connector

BlackHole Connector也是一个Sink,它会丢弃掉所有的数据,也就是说数据写过来它什么都不做就丢掉了,主要是可以用来做性能的测试。创建BlackHole你只需要把Connector类型指定成BlackHole就可以了。

Flink 必知必会经典课程7:Flink Ecosystems

Demo可以参考https://github.com/flink-china/sql-training/wiki/%E7%94%9F%E6%80%81%E4%B8%8E%E5%86%99%E5%85%A5%E5%A4%96%E9%83%A8%E8%A1%A8

Flink 必知必会经典课程7:Flink Ecosystems

上一篇:Flink 必知必会经典课程8:Flink Connector 详解


下一篇:SpringCloud微服务日志经kafka缓冲写入到ELK