Table API&SQL的基本概念及使用介绍

Table API&SQL的基本概念及使用介绍

 浪尖 浪尖聊大数据

Table API和SQL集成在共同API中。这个API的中心概念是一个用作查询的输入和输出的表。本文档显示了具有表API和SQL查询的程序的常见结构,如何注册表,如何查询表以及如何发出表。

Table API和SQL捆绑在flink-table Maven工程中。 为了使用Table API和SQL,必须将以下依赖项添加到您的项目中:

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-table_2.10</artifactId>
   <version>1.3.2</version>
</dependency>

此外,您需要为Flink的Scala批处理或流式API添加依赖关系。 对于批量查询,您需要添加:

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-scala_2.10</artifactId>
   <version>1.3.2</version>
</dependency>

对于流式查询,您需要添加:

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-streaming-scala_2.10</artifactId>
   <version>1.3.2</version>
</dependency>

注意:由于Apache Calcite中的一个问题,阻止用户类加载器被垃圾回收,我们不建议构建一个包含flink-table依赖项的fat-jar。相反,我们建议将Flink配置为在系统类加载器中包含flink-table依赖关系。这可以通过将./opt文件夹中的flink-table.jar文件复制到./lib文件夹来完成。

 

一,Table API&Sql项目的结构

用于批处理和流式处理的所有Table  API和SQL程序都遵循相同的模式。以下代码示例显示了Table API和SQL程序的通用结构。

// for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment

// create a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// register a Table
tableEnv.registerTable("table1", ...)           // or
tableEnv.registerTableSource("table2", ...)     // or
tableEnv.registerExternalCatalog("extCat", ...)

// create a Table from a Table API query
val tapiResult tableEnv.scan("table1").select(...)
// Create a Table from a SQL query
val sqlResult  tableEnv.sql("SELECT ... FROM table2 ...")

// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.writeToSink(...)

// execute
env.execute()

注意:Table API和SQL查询可以轻松集成到DataStream或DataSet程序中。

 

二,创建一个TableEnvironment

TableEnvironment是Table API和SQL集成的核心概念。 它负责:

A),在内部catalog中注册表

B),注册外部catalog

C),执行SQL查询

D),注册用户定义(标量,表或聚合)函数

E),将DataStream或DataSet转换为表

F),持有对ExecutionEnvironment或StreamExecutionEnvironment的引用

表总是绑定到一个特定的TableEnvironment。不可能在同一个查询中组合不同TableEnvironments的表,例如,join或union它们。通过使用StreamExecutionEnvironment或ExecutionEnvironment和可选的TableConfig调用静态TableEnvironment.getTableEnvironment()方法创建TableEnvironment。TableConfig可用于配置TableEnvironment或自定义查询优化和翻译过程(请参阅查询优化)。

// ***************
// STREAMING QUERY
// ***************
val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for streaming queries
val sTableEnv = TableEnvironment.getTableEnvironment(sEnv)

// ***********
// BATCH QUERY
// ***********
val bEnv = ExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for batch queries
val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)

 

三,在Catalog中注册一张表

TableEnvironment具有一个表的内部Catalog,按表名组织。 表API或SQL查询可以通过名称引用来访问Catalog中注册的表。

TableEnvironment允许您从各种来源注册表:

A),一个现有的Table对象,通常是一个Table API或SQL查询的结果。

B),一个TableSource,用于访问外部数据,如文件,数据库或消息系统。

C),来自DataStream或DataSet程序的DataStream或DataSet。

 

1,注册一张表

表在TableEnvironment中注册如下:

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// Table is the result of a simple projection query 
val projTable: Table = tableEnv.scan("X").project(...)

// register the Table projTable as table "projectedX"
tableEnv.registerTable("projectedTable", projTable)

注意:注册的表格与从关系数据库系统所知道的VIEW类似,即定义该表的查询未被优化,但是当另一个查询引用已注册的表时将被内联处理。如果多个查询引用相同的注册表,则每个引用查询将被内联并执行多次,即注册表的结果将不会被共享。

 

2,注册TableSource

TableSource提供对存储在诸如数据库(MySQL,HBase,...)的存储系统中的外部数据的访问,具有特定编码的文件(CSV,Apache [Parquet,Avro,ORC],...))或消息系统 (Apache Kafka,RabbitMQ,...)。

Flink旨在为通用数据格式和存储系统提供TableSources。后面还会出文章讲解TablesSources和Sinks。

TableSource在TableEnvironment中注册如下:

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// create a TableSource
val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)

// register the TableSource as table "CsvTable"
tableEnv.registerTableSource("CsvTable", csvSource)

 

四,注册一个外部Catalog

外部Catalog可以提供有关外部数据库和表的信息,例如其名称,模式,统计信息和有关如何访问存储在外部数据库,表或文件中的数据的信息。可以通过实现ExternalCatalog界面创建外部目录,并在TableEnvironment中注册如下:

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// create an external catalog
val catalog: ExternalCatalog = new InMemoryExternalCatalog

// register the ExternalCatalog catalog
tableEnv.registerExternalCatalog("InMemCatalog", catalog)

一旦注册到TableEnvironment中,可以通过指定其完整路径(如catalog.database.table)从Table API或SQL查询中访问ExternalCatalog中定义的所有表。目前,Flink为演示和测试提供了一个InMemoryExternalCatalog。但是,ExternalCatalog界面也可用于将目录(如HCatalog或Metastore)连接到Table API。

 

五,查询表

 

1,Table API

Table API是用于Scala和Java的语言集成查询API。与SQL相反,查询没有被指定为字符串,而是在主机语言中逐步构建。后面会出文章详细介绍这个特性。

该API基于Table类,代表一张表(Streaming或者batch),提供使用相关操作的方法。这些方法返回一个新的Table对象,它表示在输入表中应用关系操作的结果。一些关系操作由多个方法调用组成,如table.groupBy(...).select(),其中groupBy(...)指定分组表,select(...) 从分组表中选取结果。

以下示例显示了一个简单的Table API聚合查询:

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// register Orders table

// scan registered Orders table
Table orders = tableEnv.scan("Orders")
// compute revenue for all customers from France
Table revenue = orders
  .filter('cCountry === "FRANCE")
  .groupBy('cID, 'cName)
  .select('cID, 'cName, 'revenue.sum AS 'revSum)

注意:Scala Table API使用Scala符号,它以单个tick(')开始引用表的属性。Table API使用Scala隐含。 确保导入org.apache.flink.api.scala._和org.apache.flink.table.api.scala._以便使用Scala隐式转换。

 

2,SQL

Flink的SQL集成基于实现SQL标准的Apache Calcite。 SQL查询被指定为常规字符串。后面会出文章详细介绍这个特性。

 

以下示例显示如何指定查询并将结果作为表返回。

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// register Orders table

// compute revenue for all customers from France
Table revenue = tableEnv.sql(""" 
                               |SELECT cID, cName, SUM(revenue) AS revSum
                               |FROM Orders
                               |WHERE cCountry = 'FRANCE'
                               |GROUP BY cID, cName
                             """.stripMargin)

// emit or convert Table
// execute query

 

3,混合Table API和SQL

Table API查询可以在SQL查询返回的Table对象上进行操作。

通过将Table API返回的对象注册成表也可以进行一个SQL查询请求,在SQL查询的FROM子句中引用它。

 

六,输出一张表

为了输出一个表,可以将它写入一个TableSink。TableSink是支持各种文件格式(例如CSV,Apache Parquet,Apache Avro),存储系统(例如JDBC,Apache HBase,Apache Cassandra,Elasticsearch)或消息传递系统(例如Apache Kafka,RabbitMQ的)。

批处理表只能写入BatchTableSink,而streaming table需要一个AppendStreamTableSink,一个RetractStreamTableSink或一个UpsertStreamTableSink。

有关Table source和sink的详细信息及如何自定义一个TableSink后面会给出详细的文章。使用例子:

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// compute a result Table using Table API operators and/or SQL queries
val result: Table = ...

// create a TableSink
val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim ="|")

// write the result Table to the TableSink
result.writeToSink(sink)

// execute the program

 

七,翻译并执行查询

Table API和SQL查询根据输入是流式还是批量输入而被转换为DataStream或DataSet程序。 查询内部表示为逻辑查询计划,并分为两个阶段:

A),优化逻辑计划

B),翻译成DataStream或DataSet程序。

Table API或者SQL查询在一下情况下被翻译:

A),表被输出到TableSink,即当调用Table.writeToSink()时。

B),该表被转化为DataStream或者DataSet。

转换结束之后,Table API或SQL查询就像常规DataStream或DataSet程序一样处理,并在调用StreamExecutionEnvironment.execute()或ExecutionEnvironment.execute()时执行。

 

八,与DataStream和DataSet API集成

Table API和SQL查询可以轻松地集成到DataStream和DataSet程序中并嵌入到其中。表API和SQL查询可以轻松地集成到DataStream和DataSet程序中并嵌入到其中。 例如,可以查询外部表(例如来自RDBMS),进行一些预处理,例如过滤,投影,聚合或与元数据连接,然后使用DataStream或 DataSet API(以及任何构建在这些API之上的库,如CEP或Gelly)。 相反,Table API或SQL查询也可以应用于DataStream或DataSet程序的结果。相反,Table API或SQL查询也可以应用于DataStream或DataSet程序的结果。

这种交互可以通过将DataStream或DataSet转换为Table来实现,反之亦然。在本节中,我们将介绍如何完成这些转换。

 

1,Scala的隐式转换

Scala Table API提供DataSet,DataStream和Table类的隐式转换。通过导入包org.apache.flink.table.api.scala._除了用于Scala DataStream API的org.apache.flink.api.scala._之外还可以启用这些转换。

 

2,将DataStream或DataSet注册为表

结果表的schema 取决于注册的DataStream或DataSet的数据类型。有关详细信息,请查看有关将数据类型映射到表模式的部分。

// get TableEnvironment 
// registration of a DataSet is equivalent
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[(Long, String)] = ...

// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream)

// register the DataStream as table "myTable2" with fields "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, 'myLong, 'myString)

 

3,将DataStream或DataSet转换为表

不仅仅可以在TableEnvironment中注册DataStream或DataSet,也可以直接转换为Table。 如果要在Table API查询中使用Table,这很方便。

// get TableEnvironment
// registration of a DataSet is equivalent
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[(Long, String)] = ...

// convert the DataStream into a Table with default fields '_1, '_2
val table1: Table = tableEnv.fromDataStream(stream)

// convert the DataStream into a Table with fields 'myLong, 'myString
val table2: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)

 

4,将表转换为DataStream或DataSet

表可以转换为DataStream或DataSet。以这种方式,可以基于Table API或SQL查询的结果运行自定义DataStream或DataSet程序。

将表转换为DataStream或DataSet时,需要指定生成的DataStream或DataSet的数据类型,即要转换表的行的数据类型。通常最方便的转换类型是Row。以下列表概述了不同选项的功能:

Row:字段通过位置,任意数量的字段映射,支持空值,无类型安全访问。

POJO:按名称映射字段(POJO字段必须命名为表字段),任意字段数,支持空值,类型安全访问。

Case Class:字段按位置映射,不支持空值,类型安全访问。

Tuple:字段通过位置映射,限制为22(Scala)或25(Java)字段,不支持空值,类型安全访问。

Atomic Type:表必须有单个字段,不支持空值,类型安全访问。

 

4.1 将表转换为DataStream

作为流式查询的结果的表将被动态地更新,即当新记录到达查询的输入流时,它会改变。因此,转换此动态查询的DataStream需要对表的更新进行编码。

将Table转换为DataStream有两种模式:

Append Mode:仅当动态表仅由INSERT更改修改时,才能使用此模式,即只是附加的,并且以前发布的结果永远不会被更新。

Retract Mode:始终可以使用此模式。 它使用布尔标志来编码INSERT和DELETE更改。

// get TableEnvironment. 
// registration of a DataSet is equivalent
val tableEnv = TableEnvironment.getTableEnvironment(env)

// Table with two fields (String name, Integer age)
val table: Table = ...

// convert the Table into an append DataStream of Row
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)

// convert the Table into an append DataStream of Tuple2[String, Int]
val dsTuple: DataStream[(String, Int)] dsTuple =
  tableEnv.toAppendStream[(String, Int)](table)

// convert the Table into a retract DataStream of Row.
//   A retract stream of type X is a DataStream[(Boolean, X)]. 
//   The boolean field indicates the type of the change. 
//   True is INSERT, false is DELETE.
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)

注意:有关动态表及其配置属性的介绍后面会出文章。

 

4.2 将Table转化为DataSet

将表转换为DataSet,如下所示:

// get TableEnvironment 
// registration of a DataSet is equivalent
val tableEnv = TableEnvironment.getTableEnvironment(env)

// Table with two fields (String name, Integer age)
val table: Table = ...

// convert the Table into a DataSet of Row
val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)

// convert the Table into a DataSet of Tuple2[String, Int]
val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)

 

九,数据类型和表schema映射

Flink的DataStream和DataSet API支持非常多样化的类型,例如Tuples(内置Scala和Flink Java元组),POJO,Case Class和原子类型。下面我们将介绍Table API如何将这些类型转换为内部行表示,并显示将DataStream转换为Table的示例。

 

1,原子类型

Flink将原始(Integer,Double,String)或通用类型(无法分析和分解的类型)视为原子类型。属性的类型是从原子类型推断的,必须指定属性的名称。

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[Long] = ...
// convert DataStream into Table with field 'myLong
val table: Table = tableEnv.fromDataStream(stream, 'myLong)

 

2,元组(Scala和Java)和Case Class(仅限Scala)

Flink支持Scala的内置元组,并为Java提供自己的元组类。两种元组的DataStreams和DataSet可以转换成表。可以通过为所有字段提供名称(基于位置的映射)来重命名字段。如果未指定字段名称,则使用默认字段名称。

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[(Long, String)] = ...

// convert DataStream into Table with field names 'myLong, 'myString
val table1: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)

// convert DataStream into Table with default field names '_1, '_2
val table2: Table = tableEnv.fromDataStream(stream)

// define case class
case class Person(name: String, age: Int)
val streamCC: DataStream[Person] = ...

// convert DataStream into Table with default field names 'name, 'age
val tableCC1 = tableEnv.fromDataStream(streamCC)

// convert DataStream into Table with field names 'myName, 'myAge
val tableCC1 = tableEnv.fromDataStream(streamCC, 'myName, 'myAge)

 

3,POJO (Java and Scala)

Flink支持POJO作为复合类型。在这里记录了确定POJO的规则。将POJO DataStream或DataSet转换为Table而不指定字段名称时,将使用原始POJO字段的名称。重命名原始POJO字段需要关键字AS,因为POJO字段没有固有的顺序。名称映射需要原始名称,不能通过位置来完成。

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// Person is a POJO with field names "name" and "age"
val stream: DataStream[Person] = ...

// convert DataStream into Table with field names 'name, 'age
val table1: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with field names 'myName, 'myAge
val table2: Table = tableEnv.fromDataStream(stream, 'name as 'myName,'age as 'myAge)

 

4,Row

Row数据类型支持任意数量的具有空值的字段和字段。字段名称可以通过RowTypeInfo指定,也可以将Row DataStream或DataSet转换为Table(基于位置)。

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
val stream: DataStream[Row] = ...

// convert DataStream into Table with field names 'name, 'age
val table1: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with field names 'myName, 'myAge
val table2: Table = tableEnv.fromDataStream(stream, 'myName, 'myAge)

 

十,查询优化

Apache Flink利用Apache Calcite来优化和翻译查询。目前执行的优化包括投影和过滤器下推,子查询去相关等各种查询重写。Flink还没有优化连接的顺序,而是按照查询中定义的顺序执行它们(FROM子句中的表的顺序和/或WHERE子句中的连接谓词的顺序)。

可以通过提供一个CalciteConfig对象来调整在不同阶段应用的优化规则集。这可以通过调用CalciteConfig.createBuilder())通过构建器创建,并通过调用tableEnv.getConfig.setCalciteConfig(calciteConfig)提供给TableEnvironment。

Table API提供了一种解释计算表的逻辑和优化查询计划的机制。这通过TableEnvironment.explain(table)方法完成。它返回一个描述三个计划的字符串:

1,关系查询的抽象语法树,即未优化的逻辑查询计划,

2,优化的逻辑查询计划

3,物理执行计划

以下代码显示了一个示例和相应的输出:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count,'word)
val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count,'word)
val table = table1
  .where('word.like("F%"))
  .unionAll(table2)

val explanation: String = tEnv.explain(table)
println(explanation)

执行计划如下:

Table API&SQL的基本概念及使用介绍

上一篇:Flink Application Development DataStream API Event Time--Flink应用开发DataStream API事件时间


下一篇:大数据(9c)Flink运行架构