Flink Table和SQL的TableEnvironment的创建、程序结构、简单使用示例

目录

1. pom.xml依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
    <version>1.14.0</version>
    <scope>provided</scope>
</dependency>

<!-- IDEA运行需要 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>1.14.0</version>
    <scope>provided</scope>
</dependency>

<!-- 实现custom format or connector需要 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.14.0</version>
    <scope>provided</scope>
</dependency>

2. 使用Table处理有界和无界数据流例子

  1. 处理有界数据流

apiTest\TableBatchDemo.scala

package apiTest

import org.apache.flink.table.api.DataTypes.{ROW, FIELD, BIGINT, STRING, INT}
import org.apache.flink.table.api.{$, EnvironmentSettings, TableEnvironment, row}
import org.apache.flink.table.api.{long2Literal, string2Literal, int2Literal, AnyWithOperations}



/**
 * Bounded (batch-mode) Table API demo.
 *
 * Builds a small in-memory table of orders with `fromValues`, keeps the rows
 * whose `amount` is greater than or equal to 8, and prints the result.
 */
object TableBatchDemo {

  /**
   * Program entry point.
   *
   * @param args command-line arguments; not used
   */
  def main(args: Array[String]): Unit = {

    // useBlinkPlanner() is intentionally omitted: since Flink 1.14 the Blink
    // planner is the only planner and that builder method is a deprecated no-op.
    val settings = EnvironmentSettings.newInstance()
      .inBatchMode()
      .build()

    val tEnv = TableEnvironment.create(settings)

    // Row type of the in-memory table: (id BIGINT, product STRING, amount INT)
    val orderType = ROW(
      FIELD("id", BIGINT()),
      FIELD("product", STRING()),
      FIELD("amount", INT()))

    val table = tEnv.fromValues(
      orderType,
      row(1L, "BMW", 1),
      row(2L, "Tesla", 8),
      row(2L, "Tesla", 8),
      row(3L, "BYD", 20))

    val filtered = table.where($("amount").isGreaterOrEqual(8))

    // execute() runs the query; print() writes the collected rows to stdout
    filtered.execute().print()

  }

}

结果如下:

+----------------------+--------------------------------+-------------+
|                   id |                        product |      amount |
+----------------------+--------------------------------+-------------+
|                    2 |                          Tesla |           8 |
|                    2 |                          Tesla |           8 |
|                    3 |                            BYD |          20 |
+----------------------+--------------------------------+-------------+
3 rows in set
  2. 处理无界数据流

apiTest\TableStreamDemo.scala

package apiTest

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.table.api.{$, AnyWithOperations, EnvironmentSettings, ExplainDetail, TableEnvironment, string2Literal}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.api.common.typeinfo.Types.{ROW, STRING}



/**
 * Unbounded (streaming-mode) Table API demo.
 *
 * Converts a `DataStream[String]` of words into a table, keeps the words that
 * match the pattern `%t%`, prints the query's explain output, and then prints
 * the filtered rows as an append stream.
 */
object TableStreamDemo {

  /**
   * Program entry point.
   *
   * @param args command-line arguments; not used
   */
  def main(args: Array[String]): Unit = {

    val senv = StreamExecutionEnvironment.getExecutionEnvironment

    // useBlinkPlanner() is intentionally omitted: since Flink 1.14 the Blink
    // planner is the only planner and that builder method is a deprecated no-op.
    val bsSettings = EnvironmentSettings.newInstance()
      .inStreamingMode()
      .build()

    val tEnv = StreamTableEnvironment.create(senv, bsSettings)
    // A tEnv created the following way cannot use fromDataStream:
    // val tEnv = TableEnvironment.create(bsSettings)

    // The stream reference is never reassigned, so it is a val rather than a var
    val wordStream: DataStream[String] = senv.addSource(new WordSourceFunction())
    val table = tEnv.fromDataStream(wordStream, $("word"))

    val filtered = table.where($("word").like("%t%"))
    val explain = filtered.explain(ExplainDetail.JSON_EXECUTION_PLAN)
    println(explain)

    // Implicit type information picked up by toAppendStream below
    implicit val rowStringType = ROW(STRING)
    tEnv.toAppendStream(filtered)
      .print("table")

    senv.execute()

  }

}

结果如下:

== Abstract Syntax Tree ==
LogicalFilter(condition=[LIKE($0, _UTF-16LE'%t%')])
+- LogicalTableScan(table=[[Unregistered_DataStream_1]])

== Optimized Physical Plan ==
Calc(select=[word], where=[LIKE(word, _UTF-16LE'%t%')])
+- DataStreamScan(table=[[Unregistered_DataStream_1]], fields=[word])

== Optimized Execution Plan ==
Calc(select=[word], where=[LIKE(word, _UTF-16LE'%t%')])
+- DataStreamScan(table=[[Unregistered_DataStream_1]], fields=[word])

== Physical Execution Plan ==
{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: Custom Source",
    "pact" : "Data Source",
    "contents" : "Source: Custom Source",
    "parallelism" : 1
  }, {
    "id" : 2,
    "type" : "SourceConversion(table=[Unregistered_DataStream_1], fields=[word])",
    "pact" : "Operator",
    "contents" : "SourceConversion(table=[Unregistered_DataStream_1], fields=[word])",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 3,
    "type" : "Calc(select=[word], where=[LIKE(word, _UTF-16LE'%t%')])",
    "pact" : "Operator",
    "contents" : "Calc(select=[word], where=[LIKE(word, _UTF-16LE'%t%')])",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 2,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
}
table:6> +I[stream]
table:7> +I[table]
table:8> +I[batch]
table:1> +I[batch]
......省略部分......

3. 使用SQL处理有界和无界数据流例子

  1. 处理有界数据流

apiTest\SqlBatchDemo.scala

package apiTest

import org.apache.flink.table.api.DataTypes.{BIGINT, FIELD, INT, ROW, STRING}
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment, row}
import org.apache.flink.table.api.{long2Literal, string2Literal, int2Literal}


/**
 * Bounded (batch-mode) SQL demo.
 *
 * Registers a small in-memory table of orders as the temporary view `myOrder`
 * and prints the summed `amount` per `product` computed by a SQL query.
 */
object SqlBatchDemo {

  /**
   * Program entry point.
   *
   * @param args command-line arguments; not used
   */
  def main(args: Array[String]): Unit = {

    // useBlinkPlanner() is intentionally omitted: since Flink 1.14 the Blink
    // planner is the only planner and that builder method is a deprecated no-op.
    val settings = EnvironmentSettings.newInstance()
      .inBatchMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    // Row type of the in-memory table: (id BIGINT, product STRING, amount INT)
    val orderType = ROW(
      FIELD("id", BIGINT()),
      FIELD("product", STRING()),
      FIELD("amount", INT()))

    val input = tEnv.fromValues(
      orderType,
      row(1L, "BMW", 1),
      row(2L, "Tesla", 8),
      row(2L, "Tesla", 8),
      row(3L, "BYD", 20))

    // Make the Table object addressable by name from SQL
    tEnv.createTemporaryView("myOrder", input)
    val table = tEnv.sqlQuery("select product, sum(amount) as amount from myOrder group by product")
    table.execute().print()

  }

}

结果如下:

+--------------------------------+-------------+
|                        product |      amount |
+--------------------------------+-------------+
|                          Tesla |          16 |
|                            BYD |          20 |
|                            BMW |           1 |
+--------------------------------+-------------+
3 rows in set
  2. 处理无界数据流

apiTest\SqlStreamDemo.scala

package apiTest

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.table.api.{$, EnvironmentSettings}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.api.common.typeinfo.Types.{ROW, STRING}


/**
 * Unbounded (streaming-mode) SQL demo.
 *
 * Converts a `DataStream[String]` of words into a table, filters it with a
 * SQL `like '%t%'` predicate, prints the matching rows as an append stream,
 * and prints the execution plan of the streaming job.
 */
object SqlStreamDemo {

  /**
   * Program entry point.
   *
   * @param args command-line arguments; not used
   */
  def main(args: Array[String]): Unit = {

    val senv = StreamExecutionEnvironment.getExecutionEnvironment

    // useBlinkPlanner() is intentionally omitted: since Flink 1.14 the Blink
    // planner is the only planner and that builder method is a deprecated no-op.
    val bsSettings = EnvironmentSettings.newInstance()
      .inStreamingMode()
      .build()
    val tEnv = StreamTableEnvironment.create(senv, bsSettings)

    val stream: DataStream[String] = senv.addSource(new WordSourceFunction())
    val table = tEnv.fromDataStream(stream, $("word"))

    // Interpolating the Table inlines its string form into the SQL text,
    // exactly as string concatenation with the Table would.
    val result = tEnv.sqlQuery(s"select * from $table where word like '%t%'")

    // Implicit type information picked up by toAppendStream below
    implicit val rowStringType = ROW(STRING)
    tEnv.toAppendStream(result).print()

    println(senv.getExecutionPlan)
    senv.execute()

  }

}

结果如下:

{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: Custom Source",
    "pact" : "Data Source",
    "contents" : "Source: Custom Source",
    "parallelism" : 1
  }, {
    "id" : 2,
    "type" : "SourceConversion(table=[default_catalog.default_database.UnnamedTable$0], fields=[word])",
    "pact" : "Operator",
    "contents" : "SourceConversion(table=[default_catalog.default_database.UnnamedTable$0], fields=[word])",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 3,
    "type" : "Calc(select=[word], where=[LIKE(word, _UTF-16LE'%t%')])",
    "pact" : "Operator",
    "contents" : "Calc(select=[word], where=[LIKE(word, _UTF-16LE'%t%')])",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 2,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 4,
    "type" : "SinkConversionToRow",
    "pact" : "Operator",
    "contents" : "SinkConversionToRow",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 3,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 5,
    "type" : "Sink: Print to Std. Out",
    "pact" : "Data Sink",
    "contents" : "Sink: Print to Std. Out",
    "parallelism" : 8,
    "predecessors" : [ {
      "id" : 4,
      "ship_strategy" : "REBALANCE",
      "side" : "second"
    } ]
  } ]
}
7> +I[batch]
8> +I[table]
1> +I[stream]
2> +I[stream]
3> +I[table]
......省略部分......

3. Table API和SQL的概念

3.1 TableEnvironment的创建

  • Table API和SQL都是基于Table接口,catalog相同
  • Table API和SQL都先用Apache Calcite来解析优化等,再用Blink planner进行解析优化等,Stream模式和Batch模式最后都转化成DataStream API的Transformation
  1. EnvironmentSettings方式
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

// Choose exactly one execution mode on the builder before calling build().
val settings: EnvironmentSettings = EnvironmentSettings.newInstance()
  .inStreamingMode()    // for stream mode
  //.inBatchMode()      // for batch mode
  .build()

// Table environment created directly from the settings
val tEnv: TableEnvironment = TableEnvironment.create(settings)
  • 不能和DataStream进行交互
  • 不支持用户自定义聚合函数(UDAF)、用户自定义表值函数(UDTF)
  2. StreamTableEnvironment方式
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

// Stream execution environment that the table environment is created on top of
val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(senv)
  • 可以和DataStream进行交互
  • 支持用户自定义聚合函数(UDAF)、用户自定义表值函数(UDTF)

3.2 Table API和SQL的程序结构

mysql中表user1的结构和数据,和表user2的结构如下:

mysql> 
mysql> show create table flink_test.user1;
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table                                                                                                                                                                                         |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| user1 | CREATE TABLE `user1` (
  `id` bigint NOT NULL,
  `name` varchar(128) DEFAULT NULL,
  `age` int DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)

mysql> 
mysql> select * from flink_test.user1;
+----+------+------+
| id | name | age  |
+----+------+------+
|  1 | yi   |    1 |
|  2 | er   |    2 |
|  3 | san  |    3 |
+----+------+------+
3 rows in set (0.00 sec)

mysql> 
mysql> show create table flink_test.user2;
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table                                                                                                                                                                                         |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| user2 | CREATE TABLE `user2` (
  `id` bigint NOT NULL,
  `name` varchar(128) DEFAULT NULL,
  `age` int DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.01 sec)

mysql>

TableSqlTest.scala

package TableApiTest

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

/**
 * Demonstrates the structure of a Table API / SQL program.
 *
 * Registers two JDBC-backed temporary tables (`user1`, `user2`) and copies two
 * rows from `user1` into `user2` through a statement set.
 */
object TableSqlTest {

  /**
   * Program entry point.
   *
   * @param args command-line arguments; not used
   */
  def main(args: Array[String]): Unit = {

    // Create the table environment
    val settings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    // DDL template; both %s placeholders are filled in with the table name.
    // NOTE(review): the JDBC URL and credentials are hard-coded for the demo;
    // load them from configuration in real code.
    val createTableDdl =
      """
        |create temporary table %s(
        |  id bigint,
        |  name string,
        |  age int,
        |  primary key (id) not enforced
        |) with (
        |   'connector' = 'jdbc',
        |   'url' = 'jdbc:mysql://192.168.23.33:3306/flink_test',
        |   'driver' = 'com.mysql.cj.jdbc.Driver',
        |   'table-name' = '%s',
        |   'username' = 'root',
        |   'password' = 'Root_123'
        |)
        |""".stripMargin
    // Register the tables in the catalog
    tEnv.executeSql(createTableDdl.format("user1", "user1"))
    tEnv.executeSql(createTableDdl.format("user2", "user2"))

    // ===================== Read the source table =====================
    // val user1 = tEnv.from("user1")   // option 1
    // val user1 = tEnv.sqlQuery("select * from user1 limit 2")   // option 2


    // ===================== Insert into the target table =====================
    // user1.executeInsert("user2")     // option 1

    val stmtSet = tEnv.createStatementSet()
    // stmtSet.addInsert("user2", user1)    // option 2
    stmtSet.addInsertSql("insert into user2 select * from user1 limit 2")   // option 3

    // execute() only submits the insert job; await() blocks until the job has
    // finished so that main does not return while the copy is still running.
    stmtSet.execute().await()

  }

}
  • Batch模式只能插入BatchTableSink
  • Streaming模式可以插入AppendStreamTableSink、RetractStreamTableSink、UpsertStreamTableSink

执行TableSqlTest.scala,查询mysql中的表user2数据

mysql> 
mysql> select * from user2;
+----+------+------+
| id | name | age  |
+----+------+------+
|  1 | yi   |    1 |
|  2 | er   |    2 |
+----+------+------+
2 rows in set (0.00 sec)

mysql> 

3.3 Table API和SQL的简单使用

3.3.1 聚合查询

代码示例:

package TableApiTest

import org.apache.flink.table.api.Expressions.{$, lit, row}
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

/**
 * Aggregation demo: computes the summed `amount` per `name` twice, once with
 * the Table API and once with an equivalent SQL query, and prints both results.
 */
object TableSqlTest {

  /**
   * Program entry point.
   *
   * @param args command-line arguments; not used
   */
  def main(args: Array[String]): Unit = {

    // Streaming-mode table environment
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tEnv = TableEnvironment.create(settings)

    // In-memory input; the generated columns f0/f1 are renamed to amount/name
    val rawValues = tEnv.fromValues(
      row(10, "A"),
      row(20, "A"),
      row(100, "B"),
      row(200, "B")
    )
    val orders = rawValues.renameColumns($("f0").as("amount"), $("f1").as("name"))
    tEnv.createTemporaryView("tmp_table", orders)

    // Table API variant: filter, group by name, sum the amounts
    val positiveOrders = orders.filter($("amount").isGreater(lit(0)))
    val apiResult = positiveOrders
      .groupBy($("name"))
      .select($("name"), $("amount").sum().as("amount"))
    apiResult.execute().print()

    // SQL variant of the same aggregation over the registered view
    val aggregateSql = "select name, sum(amount) as amount from tmp_table where amount > 0 group by name"
    val sqlResult = tEnv.sqlQuery(aggregateSql)
    sqlResult.execute().print()

  }

}

执行结果:

+----+--------------------------------+-------------+
| op |                           name |      amount |
+----+--------------------------------+-------------+
| +I |                              A |          10 |
| -U |                              A |          10 |
| +U |                              A |          30 |
| +I |                              B |         100 |
| -U |                              B |         100 |
| +U |                              B |         300 |
+----+--------------------------------+-------------+
6 rows in set
+----+--------------------------------+-------------+
| op |                           name |      amount |
+----+--------------------------------+-------------+
| +I |                              A |          10 |
| -U |                              A |          10 |
| +U |                              A |          30 |
| +I |                              B |         100 |
| -U |                              B |         100 |
| +U |                              B |         300 |
+----+--------------------------------+-------------+
6 rows in set
  • 聚合方式为迭代聚合,其中-U为更新前的值,+U为更新后的值
上一篇:【flink】flink作业超额启动多个taskManager k8s


下一篇:SQL 关于apply的两种形式cross apply 和 outer apply, with cube 、with rollup 和 grouping