Flink Table和SQL的TableEnvironment的创建、程序结构、简单使用示例


1. pom.xml依赖


<!-- IDEA运行需要 -->

<!-- 实现custom format or connector需要 -->

2. 使用Table处理有界和无界数据流例子

  1. 处理有界数据流


package apiTest

import org.apache.flink.table.api.DataTypes.{ROW, FIELD, BIGINT, STRING, INT}
import org.apache.flink.table.api.{$, EnvironmentSettings, TableEnvironment, row}
import org.apache.flink.table.api.{long2Literal, string2Literal, int2Literal, AnyWithOperations}

/** Table API example on a bounded (batch) input.
  *
  * Builds an in-memory table of orders with `fromValues`, keeps the rows whose
  * `amount` is at least 8, and prints the result.
  */
object TableBatchDemo {

  def main(args: Array[String]): Unit = {

    // newInstance() only returns a builder; build() yields the EnvironmentSettings
    // that TableEnvironment.create expects. Batch mode is used because the input is bounded.
    val settings = EnvironmentSettings.newInstance()
      .inBatchMode()
      .build()

    val tEnv = TableEnvironment.create(settings)

    // Row type of the input table: (id BIGINT, product STRING, amount INT)
    val MyOrder = ROW(FIELD("id", BIGINT()),
      FIELD("product", STRING()),
      FIELD("amount", INT()))

    val table = tEnv.fromValues(MyOrder, row(1L, "BMW", 1),
      row(2L, "Tesla", 8),
      row(2L, "Tesla", 8),
      row(3L, "BYD", 20))

    val filtered = table.where($("amount").isGreaterOrEqual(8))

    // execute() submits the job; print() collects the result rows and prints
    // them as a formatted table.
    filtered.execute().print()
  }
}




|                   id |                        product |      amount |
|                    2 |                          Tesla |           8 |
|                    2 |                          Tesla |           8 |
|                    3 |                            BYD |          20 |
3 rows in set
  2. 处理无界数据流


package apiTest

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.table.api.{$, AnyWithOperations, EnvironmentSettings, ExplainDetail, TableEnvironment, string2Literal}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.api.common.typeinfo.Types.{ROW, STRING}

/** Table API example on an unbounded (streaming) input.
  *
  * Converts a `DataStream[String]` into a table with a single `word` column,
  * keeps the words containing the letter "t", prints the query plan, and then
  * prints the matching rows as an append stream.
  */
object TableStreamDemo {

  def main(args: Array[String]): Unit = {

    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    // newInstance() only returns a builder; build() yields the EnvironmentSettings
    // that StreamTableEnvironment.create expects.
    val bsSettings = EnvironmentSettings.newInstance()
      .inStreamingMode()
      .build()

    val tEnv = StreamTableEnvironment.create(senv, bsSettings)
    // A tEnv created this way cannot use fromDataStream:
    // val tEnv = TableEnvironment.create(bsSettings)

    // WordSourceFunction is defined elsewhere in the project.
    val dataStream: DataStream[String] = senv.addSource(new WordSourceFunction())
    val table = tEnv.fromDataStream(dataStream, $("word"))

    val filtered = table.where($("word").like("%t%"))
    val explain = filtered.explain(ExplainDetail.JSON_EXECUTION_PLAN)
    println(explain)

    // Type information for the single-STRING-column rows produced by toAppendStream.
    implicit val row_string_type = ROW(STRING)
    // Passed explicitly so the element type is fixed without relying on implicit search.
    tEnv.toAppendStream(filtered)(row_string_type).print("table")

    // DataStream sinks only run once the stream environment is executed.
    senv.execute()
  }
}





== Abstract Syntax Tree ==
LogicalFilter(condition=[LIKE($0, _UTF-16LE'%t%')])
+- LogicalTableScan(table=[[Unregistered_DataStream_1]])

== Optimized Physical Plan ==
Calc(select=[word], where=[LIKE(word, _UTF-16LE'%t%')])
+- DataStreamScan(table=[[Unregistered_DataStream_1]], fields=[word])

== Optimized Execution Plan ==
Calc(select=[word], where=[LIKE(word, _UTF-16LE'%t%')])
+- DataStreamScan(table=[[Unregistered_DataStream_1]], fields=[word])

== Physical Execution Plan ==
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: Custom Source",
    "pact" : "Data Source",
    "contents" : "Source: Custom Source",
    "parallelism" : 1
  }, {
    "id" : 2,
    "type" : "SourceConversion(table=[Unregistered_DataStream_1], fields=[word])",
    "pact" : "Operator",
    "contents" : "SourceConversion(table=[Unregistered_DataStream_1], fields=[word])",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 3,
    "type" : "Calc(select=[word], where=[LIKE(word, _UTF-16LE'%t%')])",
    "pact" : "Operator",
    "contents" : "Calc(select=[word], where=[LIKE(word, _UTF-16LE'%t%')])",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 2,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
table:6> +I[stream]
table:7> +I[table]
table:8> +I[batch]
table:1> +I[batch]

3. 使用SQL处理有界和无界数据流例子

  1. 处理有界数据流


package apiTest

import org.apache.flink.table.api.DataTypes.{BIGINT, FIELD, INT, ROW, STRING}
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment, row}
import org.apache.flink.table.api.{long2Literal, string2Literal, int2Literal}

/** SQL example on a bounded (batch) input.
  *
  * Registers an in-memory table of orders as the view `myOrder`, sums `amount`
  * per `product` with a SQL query, and prints the result.
  */
object SqlBatchDemo {

  def main(args: Array[String]): Unit = {

    // newInstance() only returns a builder; build() yields the EnvironmentSettings
    // that TableEnvironment.create expects.
    val settings = EnvironmentSettings.newInstance()
      .inBatchMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    // Row type of the input table: (id BIGINT, product STRING, amount INT)
    val MyOrder = ROW(FIELD("id", BIGINT()),
      FIELD("product", STRING()),
      FIELD("amount", INT()))
    val input = tEnv.fromValues(MyOrder, row(1L, "BMW", 1),
      row(2L, "Tesla", 8),
      row(2L, "Tesla", 8),
      row(3L, "BYD", 20))

    // The SQL below refers to the table by name, so it must be registered first.
    tEnv.createTemporaryView("myOrder", input)

    val table = tEnv.sqlQuery("select product, sum(amount) as amount from myOrder group by product")

    // execute() submits the job; print() collects the result rows and prints them.
    table.execute().print()
  }
}




|                        product |      amount |
|                          Tesla |          16 |
|                            BYD |          20 |
|                            BMW |           1 |
3 rows in set
  2. 处理无界数据流


package apiTest

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.table.api.{$, EnvironmentSettings}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.api.common.typeinfo.Types.{ROW, STRING}

/** SQL example on an unbounded (streaming) input.
  *
  * Converts a `DataStream[String]` into a table with a single `word` column,
  * selects the words containing the letter "t" with a SQL query, prints the
  * rows as an append stream, and prints the execution plan of the whole job.
  */
object SqlStreamDemo {

  def main(args: Array[String]): Unit = {

    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    // newInstance() only returns a builder; build() yields the EnvironmentSettings
    // that StreamTableEnvironment.create expects.
    val bsSettings = EnvironmentSettings.newInstance()
      .inStreamingMode()
      .build()
    val tEnv = StreamTableEnvironment.create(senv, bsSettings)

    // WordSourceFunction is defined elsewhere in the project.
    val stream: DataStream[String] = senv.addSource(new WordSourceFunction())
    val table = tEnv.fromDataStream(stream, $("word"))

    // Concatenating the Table into the SQL string refers to it by its string form,
    // so no explicit view registration is needed here.
    val result = tEnv.sqlQuery("select * from " + table + " where word like '%t%'")

    // Type information for the single-STRING-column rows produced by toAppendStream.
    implicit val row_string_type = ROW(STRING)
    // Passed explicitly so the element type is fixed without relying on implicit search.
    tEnv.toAppendStream(result)(row_string_type).print()

    // Print the JSON execution plan of the stream job, including the print sink.
    println(senv.getExecutionPlan)

    // DataStream sinks only run once the stream environment is executed.
    senv.execute()
  }
}





  "nodes" : [ {
    "id" : 1,
    "type" : "Source: Custom Source",
    "pact" : "Data Source",
    "contents" : "Source: Custom Source",
    "parallelism" : 1
  }, {
    "id" : 2,
    "type" : "SourceConversion(table=[default_catalog.default_database.UnnamedTable$0], fields=[word])",
    "pact" : "Operator",
    "contents" : "SourceConversion(table=[default_catalog.default_database.UnnamedTable$0], fields=[word])",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 3,
    "type" : "Calc(select=[word], where=[LIKE(word, _UTF-16LE'%t%')])",
    "pact" : "Operator",
    "contents" : "Calc(select=[word], where=[LIKE(word, _UTF-16LE'%t%')])",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 2,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 4,
    "type" : "SinkConversionToRow",
    "pact" : "Operator",
    "contents" : "SinkConversionToRow",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 3,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 5,
    "type" : "Sink: Print to Std. Out",
    "pact" : "Data Sink",
    "contents" : "Sink: Print to Std. Out",
    "parallelism" : 8,
    "predecessors" : [ {
      "id" : 4,
      "ship_strategy" : "REBALANCE",
      "side" : "second"
    } ]
  } ]
7> +I[batch]
8> +I[table]
1> +I[stream]
2> +I[stream]
3> +I[table]

3. Table API和SQL的概念

3.1 TableEnvironment的创建

  • Table API和SQL都是基于Table接口,catalog相同
  • Table API和SQL的查询都先经过Apache Calcite进行解析、校验和逻辑优化,再由Blink planner优化并生成物理执行计划;Stream模式和Batch模式最终都会被翻译成DataStream API的Transformation
  1. EnvironmentSettings方式
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

// EnvironmentSettings is configured through a builder: newInstance() starts it
// and build() produces the settings object passed to TableEnvironment.create.
val settings = EnvironmentSettings
  .newInstance()
  .inStreamingMode()    // for stream mode
  //.inBatchMode()      // for batch mode
  .build()

val tEnv = TableEnvironment.create(settings)
  • 不能和DataStream进行交互
  • 不支持用户自定义聚合函数(UDAF)、用户自定义表值函数(UDTF)
  2. StreamTableEnvironment方式
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

// Create the table environment on top of a DataStream execution environment.
val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(senv)
  • 可以和DataStream进行交互
  • 支持用户自定义聚合函数(UDAF)、用户自定义表值函数(UDTF)

3.2 Table API和SQL的程序结构


mysql> show create table flink_test.user1;
| Table | Create Table                                                                                                                                                                                         |
| user1 | CREATE TABLE `user1` (
  `id` bigint NOT NULL,
  `name` varchar(128) DEFAULT NULL,
  `age` int DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci |
1 row in set (0.00 sec)

mysql> select * from flink_test.user1;
| id | name | age  |
|  1 | yi   |    1 |
|  2 | er   |    2 |
|  3 | san  |    3 |
3 rows in set (0.00 sec)

mysql> show create table flink_test.user2;
| Table | Create Table                                                                                                                                                                                         |
| user2 | CREATE TABLE `user2` (
  `id` bigint NOT NULL,
  `name` varchar(128) DEFAULT NULL,
  `age` int DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci |
1 row in set (0.01 sec)



package TableApiTest

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

/** Skeleton of a Table API / SQL program: register tables, read a source table,
  * and write into a sink table.
  *
  * Registers the MySQL tables `user1` and `user2` through the JDBC connector
  * and copies two rows from `user1` into `user2`.
  */
object TableSqlTest {

  def main(args: Array[String]): Unit = {

    // Define the Table environment.
    // NOTE(review): the mode used by the original program is not visible;
    // streaming mode is assumed here, swap in inBatchMode() for batch — confirm.
    val settings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    // DDL template; the two %s placeholders are the Flink table name and the
    // MySQL table name.
    // NOTE(review): the host/port/database part of the JDBC URL was missing;
    // localhost:3306/flink_test is a placeholder — replace with the real address.
    val table_str =
      """
        |create temporary table %s(
        |  id bigint,
        |  name string,
        |  age int,
        |  primary key (id) not enforced
        |) with (
        |   'connector' = 'jdbc',
        |   'url' = 'jdbc:mysql://localhost:3306/flink_test',
        |   'driver' = 'com.mysql.cj.jdbc.Driver',
        |   'table-name' = '%s',
        |   'username' = 'root',
        |   'password' = 'Root_123'
        |)
        |""".stripMargin

    // Register the tables in the catalog.
    tEnv.executeSql(table_str.format("user1", "user1"))
    tEnv.executeSql(table_str.format("user2", "user2"))

    // ===================== Read data from the source table =====================
    // val user1 = tEnv.from("user1")   // option 1
    // val user1 = tEnv.sqlQuery("select * from user1 limit 2")   // option 2

    // ===================== Insert data into the target table =====================
    // user1.executeInsert("user2")     // option 1

    val stmtSet = tEnv.createStatementSet()
    // stmtSet.addInsert("user2", user1)    // option 2
    stmtSet.addInsertSql("insert into user2 select * from user1 limit 2")   // option 3

    // Statements added to a StatementSet are only submitted when execute() is called.
    stmtSet.execute()
  }
}

  • Batch模式只能插入BatchTableSink
  • Streaming模式可以插入AppendStreamTableSink、RetractStreamTableSink、UpsertStreamTableSink


mysql> select * from user2;
| id | name | age  |
|  1 | yi   |    1 |
|  2 | er   |    2 |
2 rows in set (0.00 sec)


3.3 Table API和SQL的简单使用

3.3.1 聚合查询


package TableApiTest

import org.apache.flink.table.api.Expressions.{$, lit, row}
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

/** Aggregation example: the same grouped sum written once with the Table API
  * and once with SQL.
  *
  * Both queries keep the rows with `amount > 0`, group by `name`, sum `amount`,
  * and print their results.
  */
object TableSqlTest {

  def main(args: Array[String]): Unit = {
    // Define the Table environment. Streaming mode is used so the printed
    // result carries changelog flags in an `op` column.
    val settings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    // fromValues names the columns f0, f1; rename them to meaningful names.
    val table = tEnv.fromValues(
      row(10, "A"),
      row(20, "A"),
      row(100, "B"),
      row(200, "B")
    ).renameColumns($("f0").as("amount"), $("f1").as("name"))
    tEnv.createTemporaryView("tmp_table", table)

    // Table API version; the where/groupBy calls mirror the SQL query below.
    val table_result = table
      .where($("amount").isGreater(lit(0)))
      .groupBy($("name"))
      .select($("name"), $("amount").sum().as("amount"))

    val sql_result = tEnv.sqlQuery("select name, sum(amount) as amount from tmp_table where amount > 0 group by name")

    // Run both queries and print their results.
    table_result.execute().print()
    sql_result.execute().print()
  }
}



| op |                           name |      amount |
| +I |                              A |          10 |
| -U |                              A |          10 |
| +U |                              A |          30 |
| +I |                              B |         100 |
| -U |                              B |         100 |
| +U |                              B |         300 |
6 rows in set
| op |                           name |      amount |
| +I |                              A |          10 |
| -U |                              A |          10 |
| +U |                              A |          30 |
| +I |                              B |         100 |
| -U |                              B |         100 |
| +U |                              B |         300 |
6 rows in set
  • 流模式下聚合是增量(迭代)进行的,每到一条数据就更新一次结果:+I表示插入一条新结果,-U表示撤回更新前的旧值,+U表示更新后的新值
