目录
1. pom.xml依赖
<!-- Flink Table API dependencies (Flink 1.14.0, Scala 2.11 build) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.14.0</version>
<scope>provided</scope>
</dependency>
<!-- Needed when running from the IDE (IntelliJ IDEA) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.14.0</version>
<scope>provided</scope>
</dependency>
<!-- Needed when implementing a custom format or connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.14.0</version>
<scope>provided</scope>
</dependency>
2. 使用Table处理有界和无界数据流例子
- 处理有界数据流
apiTest\TableBatchDemo.scala
package apiTest
import org.apache.flink.table.api.DataTypes.{ROW, FIELD, BIGINT, STRING, INT}
import org.apache.flink.table.api.{$, EnvironmentSettings, TableEnvironment, row}
import org.apache.flink.table.api.{long2Literal, string2Literal, int2Literal, AnyWithOperations}
object TableBatchDemo {

  /** Batch-mode Table API demo: builds an in-memory orders table and
   * prints the rows whose amount is at least 8. */
  def main(args: Array[String]): Unit = {
    // Batch execution on the Blink planner.
    val envSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inBatchMode()
      .build()
    val tableEnv = TableEnvironment.create(envSettings)

    // Row type describing a single order record.
    val orderType = ROW(
      FIELD("id", BIGINT()),
      FIELD("product", STRING()),
      FIELD("amount", INT()))

    // Inline test data.
    val orders = tableEnv.fromValues(orderType,
      row(1L, "BMW", 1),
      row(2L, "Tesla", 8),
      row(2L, "Tesla", 8),
      row(3L, "BYD", 20))

    val bigOrders = orders.where($("amount").isGreaterOrEqual(8))

    // execute() collects the result to the Job Manager; print it there.
    bigOrders.execute().print()
  }
}
结果如下:
+----------------------+--------------------------------+-------------+
| id | product | amount |
+----------------------+--------------------------------+-------------+
| 2 | Tesla | 8 |
| 2 | Tesla | 8 |
| 3 | BYD | 20 |
+----------------------+--------------------------------+-------------+
3 rows in set
- 处理无界数据流
apiTest\TableStreamDemo.scala
package apiTest
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.table.api.{$, AnyWithOperations, EnvironmentSettings, ExplainDetail, TableEnvironment, string2Literal}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.api.common.typeinfo.Types.{ROW, STRING}
object TableStreamDemo {

  /** Streaming Table API demo: wraps a custom word source in a Table,
   * keeps words containing "t", prints the query plan and the result stream. */
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner().inStreamingMode().build()
    // A StreamTableEnvironment (not a plain TableEnvironment) is required,
    // because fromDataStream is only available on the bridge environment.
    val tEnv = StreamTableEnvironment.create(senv, bsSettings)
    // Fix: was `var` — the stream reference is never reassigned, so `val` is correct.
    val dataStream: DataStream[String] = senv.addSource(new WordSourceFunction())
    val table = tEnv.fromDataStream(dataStream, $("word"))
    val filtered = table.where($("word").like("%t%"))
    // Print the AST, optimized plans, and the JSON physical execution plan.
    val explain = filtered.explain(ExplainDetail.JSON_EXECUTION_PLAN)
    println(explain)
    // Implicit TypeInformation required by toAppendStream.
    implicit val row_string_type = ROW(STRING)
    tEnv.toAppendStream(filtered)
      .print("table")
    senv.execute()
  }
}
结果如下:
== Abstract Syntax Tree ==
LogicalFilter(condition=[LIKE($0, _UTF-16LE'%t%')])
+- LogicalTableScan(table=[[Unregistered_DataStream_1]])
== Optimized Physical Plan ==
Calc(select=[word], where=[LIKE(word, _UTF-16LE'%t%')])
+- DataStreamScan(table=[[Unregistered_DataStream_1]], fields=[word])
== Optimized Execution Plan ==
Calc(select=[word], where=[LIKE(word, _UTF-16LE'%t%')])
+- DataStreamScan(table=[[Unregistered_DataStream_1]], fields=[word])
== Physical Execution Plan ==
{
"nodes" : [ {
"id" : 1,
"type" : "Source: Custom Source",
"pact" : "Data Source",
"contents" : "Source: Custom Source",
"parallelism" : 1
}, {
"id" : 2,
"type" : "SourceConversion(table=[Unregistered_DataStream_1], fields=[word])",
"pact" : "Operator",
"contents" : "SourceConversion(table=[Unregistered_DataStream_1], fields=[word])",
"parallelism" : 1,
"predecessors" : [ {
"id" : 1,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : 3,
"type" : "Calc(select=[word], where=[LIKE(word, _UTF-16LE'%t%')])",
"pact" : "Operator",
"contents" : "Calc(select=[word], where=[LIKE(word, _UTF-16LE'%t%')])",
"parallelism" : 1,
"predecessors" : [ {
"id" : 2,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
} ]
}
table:6> +I[stream]
table:7> +I[table]
table:8> +I[batch]
table:1> +I[batch]
......省略部分......
3. 使用SQL处理有界和无界数据流例子
- 处理有界数据流
apiTest\SqlBatchDemo.scala
package apiTest
import org.apache.flink.table.api.DataTypes.{BIGINT, FIELD, INT, ROW, STRING}
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment, row}
import org.apache.flink.table.api.{long2Literal, string2Literal, int2Literal}
object SqlBatchDemo {

  /** Batch-mode SQL demo: sums the order amount per product and prints the result. */
  def main(args: Array[String]): Unit = {
    val envSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inBatchMode()
      .build()
    val tableEnv = TableEnvironment.create(envSettings)

    // Row type describing a single order record.
    val orderType = ROW(
      FIELD("id", BIGINT()),
      FIELD("product", STRING()),
      FIELD("amount", INT()))

    // Inline test data, registered as a temporary view so SQL can reference it.
    val orders = tableEnv.fromValues(orderType,
      row(1L, "BMW", 1),
      row(2L, "Tesla", 8),
      row(2L, "Tesla", 8),
      row(3L, "BYD", 20))
    tableEnv.createTemporaryView("myOrder", orders)

    val summed = tableEnv.sqlQuery("select product, sum(amount) as amount from myOrder group by product")
    summed.execute().print()
  }
}
结果如下:
+--------------------------------+-------------+
| product | amount |
+--------------------------------+-------------+
| Tesla | 16 |
| BYD | 20 |
| BMW | 1 |
+--------------------------------+-------------+
3 rows in set
- 处理无界数据流
apiTest\SqlStreamDemo.scala
package apiTest
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.table.api.{$, EnvironmentSettings}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.api.common.typeinfo.Types.{ROW, STRING}
object SqlStreamDemo {

  /** Streaming SQL demo: filters words containing "t" from a custom source and prints them. */
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val tEnv = StreamTableEnvironment.create(senv, bsSettings)

    val stream: DataStream[String] = senv.addSource(new WordSourceFunction())
    val table = tEnv.fromDataStream(stream, $("word"))

    // Interpolating the Table into the query string references it by its generated name.
    val result = tEnv.sqlQuery(s"select * from $table where word like '%t%'")

    // Implicit TypeInformation required by toAppendStream.
    implicit val row_string_type = ROW(STRING)
    tEnv.toAppendStream(result).print()

    println(senv.getExecutionPlan)
    senv.execute()
  }
}
结果如下:
{
"nodes" : [ {
"id" : 1,
"type" : "Source: Custom Source",
"pact" : "Data Source",
"contents" : "Source: Custom Source",
"parallelism" : 1
}, {
"id" : 2,
"type" : "SourceConversion(table=[default_catalog.default_database.UnnamedTable$0], fields=[word])",
"pact" : "Operator",
"contents" : "SourceConversion(table=[default_catalog.default_database.UnnamedTable$0], fields=[word])",
"parallelism" : 1,
"predecessors" : [ {
"id" : 1,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : 3,
"type" : "Calc(select=[word], where=[LIKE(word, _UTF-16LE'%t%')])",
"pact" : "Operator",
"contents" : "Calc(select=[word], where=[LIKE(word, _UTF-16LE'%t%')])",
"parallelism" : 1,
"predecessors" : [ {
"id" : 2,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : 4,
"type" : "SinkConversionToRow",
"pact" : "Operator",
"contents" : "SinkConversionToRow",
"parallelism" : 1,
"predecessors" : [ {
"id" : 3,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : 5,
"type" : "Sink: Print to Std. Out",
"pact" : "Data Sink",
"contents" : "Sink: Print to Std. Out",
"parallelism" : 8,
"predecessors" : [ {
"id" : 4,
"ship_strategy" : "REBALANCE",
"side" : "second"
} ]
} ]
}
7> +I[batch]
8> +I[table]
1> +I[stream]
2> +I[stream]
3> +I[table]
......省略部分......
3. Table API和SQL的概念
3.1 TableEnvironment的创建
- Table API和SQL都是基于Table接口,catalog相同
- Table API和SQL都先用Apache Calcite来解析优化等,再用Blink planner进行解析优化等,Stream模式和Batch模式最后都转化成DataStream API的Transformation
- EnvironmentSettings方式
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
// Unified TableEnvironment: SQL / Table API only, no DataStream interop.
val settings = EnvironmentSettings
.newInstance()
.inStreamingMode() // streaming mode
//.inBatchMode() // batch mode
.build()
val tEnv = TableEnvironment.create(settings)
- 不能和DataStream进行交互
- 不支持用户自定义聚合函数(UDAF)、用户自定义表值函数(UDTF)
- StreamTableEnvironment方式
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
val senv = StreamExecutionEnvironment.getExecutionEnvironment
// Bridge environment: supports Table <-> DataStream conversion and UDAF/UDTF.
val tEnv = StreamTableEnvironment.create(senv)
- 可以和DataStream进行交互
- 支持用户自定义聚合函数(UDAF)、用户自定义表值函数(UDTF)
3.2 Table API和SQL的程序结构
mysql中表user1的结构和数据,和表user2的结构如下:
mysql>
mysql> show create table flink_test.user1;
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| user1 | CREATE TABLE `user1` (
`id` bigint NOT NULL,
`name` varchar(128) DEFAULT NULL,
`age` int DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)
mysql>
mysql> select * from flink_test.user1;
+----+------+------+
| id | name | age |
+----+------+------+
| 1 | yi | 1 |
| 2 | er | 2 |
| 3 | san | 3 |
+----+------+------+
3 rows in set (0.00 sec)
mysql>
mysql> show create table flink_test.user2;
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| user2 | CREATE TABLE `user2` (
`id` bigint NOT NULL,
`name` varchar(128) DEFAULT NULL,
`age` int DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.01 sec)
mysql>
TableSqlTest.scala
package TableApiTest
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
object TableSqlTest {
// Copies two rows from MySQL table user1 into user2 through Flink SQL:
// registers both tables via JDBC-connector DDL, then runs an INSERT.
def main(args: Array[String]): Unit = {
// Create the Table environment (streaming mode).
val settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build()
val tEnv = TableEnvironment.create(settings)
// JDBC DDL template; the two %s placeholders are the registered table name
// and the underlying MySQL table name.
// NOTE(review): credentials are hardcoded for the demo — externalize for real use.
val table_str =
"""
|create temporary table %s(
| id bigint,
| name string,
| age int,
| primary key (id) not enforced
|) with (
| 'connector' = 'jdbc',
| 'url' = 'jdbc:mysql://192.168.23.33:3306/flink_test',
| 'driver' = 'com.mysql.cj.jdbc.Driver',
| 'table-name' = '%s',
| 'username' = 'root',
| 'password' = 'Root_123'
|)
|""".stripMargin
// Register both tables in the catalog.
tEnv.executeSql(table_str.format("user1", "user1"))
tEnv.executeSql(table_str.format("user2", "user2"))
// ===================== read from the source table =====================
// val user1 = tEnv.from("user1") // option 1
// val user1 = tEnv.sqlQuery("select * from user1 limit 2") // option 2
// ===================== insert into the target table =====================
// user1.executeInsert("user2") // option 1
val stmtSet = tEnv.createStatementSet()
// stmtSet.addInsert("user2", user1) // option 2
stmtSet.addInsertSql("insert into user2 select * from user1 limit 2") // option 3
stmtSet.execute()
}
}
- Batch模式只能插入BatchTableSink
- Streaming模式可以插入AppendStreamTableSink、RetractStreamTableSink、UpsertStreamTableSink
执行TableSqlTest.scala,查询mysql中的表user2数据
mysql>
mysql> select * from user2;
+----+------+------+
| id | name | age |
+----+------+------+
| 1 | yi | 1 |
| 2 | er | 2 |
+----+------+------+
2 rows in set (0.00 sec)
mysql>
3.3 Table API和SQL的简单使用
3.3.1 聚合查询
代码示例:
package TableApiTest
import org.apache.flink.table.api.Expressions.{$, lit, row}
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
object TableSqlTest {

  /** Aggregation demo: sums amounts per name twice — once with the Table API,
   * once with equivalent SQL over a registered view — and prints both results. */
  def main(args: Array[String]): Unit = {
    // Streaming-mode table environment.
    val settings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    // Inline rows; rename the generated f0/f1 columns to meaningful names.
    val source = tEnv
      .fromValues(row(10, "A"), row(20, "A"), row(100, "B"), row(200, "B"))
      .renameColumns($("f0").as("amount"), $("f1").as("name"))
    tEnv.createTemporaryView("tmp_table", source)

    // Table API version of the aggregation.
    val apiResult = source
      .filter($("amount").isGreater(lit(0)))
      .groupBy($("name"))
      .select($("name"), $("amount").sum().as("amount"))
    apiResult.execute().print()

    // SQL version over the registered view.
    val sqlResult = tEnv.sqlQuery("select name, sum(amount) as amount from tmp_table where amount > 0 group by name")
    sqlResult.execute().print()
  }
}
执行结果:
+----+--------------------------------+-------------+
| op | name | amount |
+----+--------------------------------+-------------+
| +I | A | 10 |
| -U | A | 10 |
| +U | A | 30 |
| +I | B | 100 |
| -U | B | 100 |
| +U | B | 300 |
+----+--------------------------------+-------------+
6 rows in set
+----+--------------------------------+-------------+
| op | name | amount |
+----+--------------------------------+-------------+
| +I | A | 10 |
| -U | A | 10 |
| +U | A | 30 |
| +I | B | 100 |
| -U | B | 100 |
| +U | B | 300 |
+----+--------------------------------+-------------+
6 rows in set
- 聚合方式为迭代聚合,其中-U为更新前的值,+U为更新后的值