Table API&SQL的基本概念及使用介绍
浪尖 浪尖聊大数据
Table API和SQL集成在共同API中。这个API的中心概念是一个用作查询的输入和输出的表。本文档显示了具有表API和SQL查询的程序的常见结构,如何注册表,如何查询表以及如何发出表。
Table API和SQL捆绑在flink-table Maven工程中。 为了使用Table API和SQL,必须将以下依赖项添加到您的项目中:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.10</artifactId>
<version>1.3.2</version>
</dependency>
此外,您需要为Flink的Scala批处理或流式API添加依赖关系。 对于批量查询,您需要添加:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.10</artifactId>
<version>1.3.2</version>
</dependency>
对于流式查询,您需要添加:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.10</artifactId>
<version>1.3.2</version>
</dependency>
注意:由于Apache Calcite中的一个问题,阻止用户类加载器被垃圾回收,我们不建议构建一个包含flink-table依赖项的fat-jar。相反,我们建议将Flink配置为在系统类加载器中包含flink-table依赖关系。这可以通过将./opt文件夹中的flink-table.jar文件复制到./lib文件夹来完成。
一,Table API&Sql项目的结构
用于批处理和流式处理的所有Table API和SQL程序都遵循相同的模式。以下代码示例显示了Table API和SQL程序的通用结构。
// for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// register a Table
tableEnv.registerTable("table1", ...) // or
tableEnv.registerTableSource("table2", ...) // or
tableEnv.registerExternalCatalog("extCat", ...)
// create a Table from a Table API query
val tapiResult = tableEnv.scan("table1").select(...)
// Create a Table from a SQL query
val sqlResult = tableEnv.sql("SELECT ... FROM table2 ...")
// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.writeToSink(...)
// execute
env.execute()
注意:Table API和SQL查询可以轻松集成到DataStream或DataSet程序中。
二,创建一个TableEnvironment
TableEnvironment是Table API和SQL集成的核心概念。 它负责:
A),在内部catalog中注册表
B),注册外部catalog
C),执行SQL查询
D),注册用户定义(标量,表或聚合)函数
E),将DataStream或DataSet转换为表
F),持有对ExecutionEnvironment或StreamExecutionEnvironment的引用
表总是绑定到一个特定的TableEnvironment。不可能在同一个查询中组合不同TableEnvironments的表,例如,join或union它们。通过使用StreamExecutionEnvironment或ExecutionEnvironment和可选的TableConfig调用静态TableEnvironment.getTableEnvironment()方法创建TableEnvironment。TableConfig可用于配置TableEnvironment或自定义查询优化和翻译过程(请参阅查询优化)。
// ***************
// STREAMING QUERY
// ***************
val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for streaming queries
val sTableEnv = TableEnvironment.getTableEnvironment(sEnv)
// ***********
// BATCH QUERY
// ***********
val bEnv = ExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for batch queries
val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)
三,在Catalog中注册一张表
TableEnvironment具有一个表的内部Catalog,按表名组织。 表API或SQL查询可以通过名称引用来访问Catalog中注册的表。
TableEnvironment允许您从各种来源注册表:
A),一个现有的Table对象,通常是一个Table API或SQL查询的结果。
B),一个TableSource,用于访问外部数据,如文件,数据库或消息系统。
C),来自DataStream或DataSet程序的DataStream或DataSet。
1,注册一张表
表在TableEnvironment中注册如下:
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// Table is the result of a simple projection query
val projTable: Table = tableEnv.scan("X").project(...)
// register the Table projTable as table "projectedX"
tableEnv.registerTable("projectedTable", projTable)
注意:注册的表格与从关系数据库系统所知道的VIEW类似,即定义该表的查询未被优化,但是当另一个查询引用已注册的表时将被内联处理。如果多个查询引用相同的注册表,则每个引用查询将被内联并执行多次,即注册表的结果将不会被共享。
2,注册TableSource
TableSource提供对存储在诸如数据库(MySQL,HBase,...)的存储系统中的外部数据的访问,具有特定编码的文件(CSV,Apache [Parquet,Avro,ORC],...))或消息系统 (Apache Kafka,RabbitMQ,...)。
Flink旨在为通用数据格式和存储系统提供TableSources。后面还会出文章讲解TablesSources和Sinks。
TableSource在TableEnvironment中注册如下:
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// create a TableSource
val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)
// register the TableSource as table "CsvTable"
tableEnv.registerTableSource("CsvTable", csvSource)
四,注册一个外部Catalog
外部Catalog可以提供有关外部数据库和表的信息,例如其名称,模式,统计信息和有关如何访问存储在外部数据库,表或文件中的数据的信息。可以通过实现ExternalCatalog界面创建外部目录,并在TableEnvironment中注册如下:
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// create an external catalog
val catalog: ExternalCatalog = new InMemoryExternalCatalog
// register the ExternalCatalog catalog
tableEnv.registerExternalCatalog("InMemCatalog", catalog)
一旦注册到TableEnvironment中,可以通过指定其完整路径(如catalog.database.table)从Table API或SQL查询中访问ExternalCatalog中定义的所有表。目前,Flink为演示和测试提供了一个InMemoryExternalCatalog。但是,ExternalCatalog界面也可用于将目录(如HCatalog或Metastore)连接到Table API。
五,查询表
1,Table API
Table API是用于Scala和Java的语言集成查询API。与SQL相反,查询没有被指定为字符串,而是在主机语言中逐步构建。后面会出文章详细介绍这个特性。
该API基于Table类,代表一张表(Streaming或者batch),提供使用相关操作的方法。这些方法返回一个新的Table对象,它表示在输入表中应用关系操作的结果。一些关系操作由多个方法调用组成,如table.groupBy(...).select(),其中groupBy(...)指定分组表,select(...) 从分组表中选取结果。
以下示例显示了一个简单的Table API聚合查询:
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// register Orders table
// scan registered Orders table
Table orders = tableEnv.scan("Orders")
// compute revenue for all customers from France
Table revenue = orders
.filter('cCountry === "FRANCE")
.groupBy('cID, 'cName)
.select('cID, 'cName, 'revenue.sum AS 'revSum)
注意:Scala Table API使用Scala符号,它以单个tick(')开始引用表的属性。Table API使用Scala隐含。 确保导入org.apache.flink.api.scala._和org.apache.flink.table.api.scala._以便使用Scala隐式转换。
2,SQL
Flink的SQL集成基于实现SQL标准的Apache Calcite。 SQL查询被指定为常规字符串。后面会出文章详细介绍这个特性。
以下示例显示如何指定查询并将结果作为表返回。
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// register Orders table
// compute revenue for all customers from France
Table revenue = tableEnv.sql("""
|SELECT cID, cName, SUM(revenue) AS revSum
|FROM Orders
|WHERE cCountry = 'FRANCE'
|GROUP BY cID, cName
""".stripMargin)
// emit or convert Table
// execute query
3,混合Table API和SQL
Table API查询可以在SQL查询返回的Table对象上进行操作。
通过将Table API返回的对象注册成表也可以进行一个SQL查询请求,在SQL查询的FROM子句中引用它。
六,输出一张表
为了输出一个表,可以将它写入一个TableSink。TableSink是支持各种文件格式(例如CSV,Apache Parquet,Apache Avro),存储系统(例如JDBC,Apache HBase,Apache Cassandra,Elasticsearch)或消息传递系统(例如Apache Kafka,RabbitMQ的)。
批处理表只能写入BatchTableSink,而streaming table需要一个AppendStreamTableSink,一个RetractStreamTableSink或一个UpsertStreamTableSink。
有关Table source和sink的详细信息及如何自定义一个TableSink后面会给出详细的文章。使用例子:
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// compute a result Table using Table API operators and/or SQL queries
val result: Table = ...
// create a TableSink
val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim ="|")
// write the result Table to the TableSink
result.writeToSink(sink)
// execute the program
七,翻译并执行查询
Table API和SQL查询根据输入是流式还是批量输入而被转换为DataStream或DataSet程序。 查询内部表示为逻辑查询计划,并分为两个阶段:
A),优化逻辑计划
B),翻译成DataStream或DataSet程序。
Table API或者SQL查询在一下情况下被翻译:
A),表被输出到TableSink,即当调用Table.writeToSink()时。
B),该表被转化为DataStream或者DataSet。
转换结束之后,Table API或SQL查询就像常规DataStream或DataSet程序一样处理,并在调用StreamExecutionEnvironment.execute()或ExecutionEnvironment.execute()时执行。
八,与DataStream和DataSet API集成
Table API和SQL查询可以轻松地集成到DataStream和DataSet程序中并嵌入到其中。表API和SQL查询可以轻松地集成到DataStream和DataSet程序中并嵌入到其中。 例如,可以查询外部表(例如来自RDBMS),进行一些预处理,例如过滤,投影,聚合或与元数据连接,然后使用DataStream或 DataSet API(以及任何构建在这些API之上的库,如CEP或Gelly)。 相反,Table API或SQL查询也可以应用于DataStream或DataSet程序的结果。相反,Table API或SQL查询也可以应用于DataStream或DataSet程序的结果。
这种交互可以通过将DataStream或DataSet转换为Table来实现,反之亦然。在本节中,我们将介绍如何完成这些转换。
1,Scala的隐式转换
Scala Table API提供DataSet,DataStream和Table类的隐式转换。通过导入包org.apache.flink.table.api.scala._除了用于Scala DataStream API的org.apache.flink.api.scala._之外还可以启用这些转换。
2,将DataStream或DataSet注册为表
结果表的schema 取决于注册的DataStream或DataSet的数据类型。有关详细信息,请查看有关将数据类型映射到表模式的部分。
// get TableEnvironment
// registration of a DataSet is equivalent
val tableEnv = TableEnvironment.getTableEnvironment(env)
val stream: DataStream[(Long, String)] = ...
// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream)
// register the DataStream as table "myTable2" with fields "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, 'myLong, 'myString)
3,将DataStream或DataSet转换为表
不仅仅可以在TableEnvironment中注册DataStream或DataSet,也可以直接转换为Table。 如果要在Table API查询中使用Table,这很方便。
// get TableEnvironment
// registration of a DataSet is equivalent
val tableEnv = TableEnvironment.getTableEnvironment(env)
val stream: DataStream[(Long, String)] = ...
// convert the DataStream into a Table with default fields '_1, '_2
val table1: Table = tableEnv.fromDataStream(stream)
// convert the DataStream into a Table with fields 'myLong, 'myString
val table2: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)
4,将表转换为DataStream或DataSet
表可以转换为DataStream或DataSet。以这种方式,可以基于Table API或SQL查询的结果运行自定义DataStream或DataSet程序。
将表转换为DataStream或DataSet时,需要指定生成的DataStream或DataSet的数据类型,即要转换表的行的数据类型。通常最方便的转换类型是Row。以下列表概述了不同选项的功能:
Row:字段通过位置,任意数量的字段映射,支持空值,无类型安全访问。
POJO:按名称映射字段(POJO字段必须命名为表字段),任意字段数,支持空值,类型安全访问。
Case Class:字段按位置映射,不支持空值,类型安全访问。
Tuple:字段通过位置映射,限制为22(Scala)或25(Java)字段,不支持空值,类型安全访问。
Atomic Type:表必须有单个字段,不支持空值,类型安全访问。
4.1 将表转换为DataStream
作为流式查询的结果的表将被动态地更新,即当新记录到达查询的输入流时,它会改变。因此,转换此动态查询的DataStream需要对表的更新进行编码。
将Table转换为DataStream有两种模式:
Append Mode:仅当动态表仅由INSERT更改修改时,才能使用此模式,即只是附加的,并且以前发布的结果永远不会被更新。
Retract Mode:始终可以使用此模式。 它使用布尔标志来编码INSERT和DELETE更改。
// get TableEnvironment.
// registration of a DataSet is equivalent
val tableEnv = TableEnvironment.getTableEnvironment(env)
// Table with two fields (String name, Integer age)
val table: Table = ...
// convert the Table into an append DataStream of Row
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)
// convert the Table into an append DataStream of Tuple2[String, Int]
val dsTuple: DataStream[(String, Int)] dsTuple =
tableEnv.toAppendStream[(String, Int)](table)
// convert the Table into a retract DataStream of Row.
// A retract stream of type X is a DataStream[(Boolean, X)].
// The boolean field indicates the type of the change.
// True is INSERT, false is DELETE.
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)
注意:有关动态表及其配置属性的介绍后面会出文章。
4.2 将Table转化为DataSet
将表转换为DataSet,如下所示:
// get TableEnvironment
// registration of a DataSet is equivalent
val tableEnv = TableEnvironment.getTableEnvironment(env)
// Table with two fields (String name, Integer age)
val table: Table = ...
// convert the Table into a DataSet of Row
val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)
// convert the Table into a DataSet of Tuple2[String, Int]
val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)
九,数据类型和表schema映射
Flink的DataStream和DataSet API支持非常多样化的类型,例如Tuples(内置Scala和Flink Java元组),POJO,Case Class和原子类型。下面我们将介绍Table API如何将这些类型转换为内部行表示,并显示将DataStream转换为Table的示例。
1,原子类型
Flink将原始(Integer,Double,String)或通用类型(无法分析和分解的类型)视为原子类型。属性的类型是从原子类型推断的,必须指定属性的名称。
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
val stream: DataStream[Long] = ...
// convert DataStream into Table with field 'myLong
val table: Table = tableEnv.fromDataStream(stream, 'myLong)
2,元组(Scala和Java)和Case Class(仅限Scala)
Flink支持Scala的内置元组,并为Java提供自己的元组类。两种元组的DataStreams和DataSet可以转换成表。可以通过为所有字段提供名称(基于位置的映射)来重命名字段。如果未指定字段名称,则使用默认字段名称。
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
val stream: DataStream[(Long, String)] = ...
// convert DataStream into Table with field names 'myLong, 'myString
val table1: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)
// convert DataStream into Table with default field names '_1, '_2
val table2: Table = tableEnv.fromDataStream(stream)
// define case class
case class Person(name: String, age: Int)
val streamCC: DataStream[Person] = ...
// convert DataStream into Table with default field names 'name, 'age
val tableCC1 = tableEnv.fromDataStream(streamCC)
// convert DataStream into Table with field names 'myName, 'myAge
val tableCC1 = tableEnv.fromDataStream(streamCC, 'myName, 'myAge)
3,POJO (Java and Scala)
Flink支持POJO作为复合类型。在这里记录了确定POJO的规则。将POJO DataStream或DataSet转换为Table而不指定字段名称时,将使用原始POJO字段的名称。重命名原始POJO字段需要关键字AS,因为POJO字段没有固有的顺序。名称映射需要原始名称,不能通过位置来完成。
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// Person is a POJO with field names "name" and "age"
val stream: DataStream[Person] = ...
// convert DataStream into Table with field names 'name, 'age
val table1: Table = tableEnv.fromDataStream(stream)
// convert DataStream into Table with field names 'myName, 'myAge
val table2: Table = tableEnv.fromDataStream(stream, 'name as 'myName,'age as 'myAge)
4,Row
Row数据类型支持任意数量的具有空值的字段和字段。字段名称可以通过RowTypeInfo指定,也可以将Row DataStream或DataSet转换为Table(基于位置)。
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
val stream: DataStream[Row] = ...
// convert DataStream into Table with field names 'name, 'age
val table1: Table = tableEnv.fromDataStream(stream)
// convert DataStream into Table with field names 'myName, 'myAge
val table2: Table = tableEnv.fromDataStream(stream, 'myName, 'myAge)
十,查询优化
Apache Flink利用Apache Calcite来优化和翻译查询。目前执行的优化包括投影和过滤器下推,子查询去相关等各种查询重写。Flink还没有优化连接的顺序,而是按照查询中定义的顺序执行它们(FROM子句中的表的顺序和/或WHERE子句中的连接谓词的顺序)。
可以通过提供一个CalciteConfig对象来调整在不同阶段应用的优化规则集。这可以通过调用CalciteConfig.createBuilder())通过构建器创建,并通过调用tableEnv.getConfig.setCalciteConfig(calciteConfig)提供给TableEnvironment。
Table API提供了一种解释计算表的逻辑和优化查询计划的机制。这通过TableEnvironment.explain(table)方法完成。它返回一个描述三个计划的字符串:
1,关系查询的抽象语法树,即未优化的逻辑查询计划,
2,优化的逻辑查询计划
3,物理执行计划
以下代码显示了一个示例和相应的输出:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count,'word)
val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count,'word)
val table = table1
.where('word.like("F%"))
.unionAll(table2)
val explanation: String = tEnv.explain(table)
println(explanation)
执行计划如下: