文章目录
- 一. create table hints
- 1. 语法
- 2. 示例
- 3. 注意
- 二. 实战:简化hive连接器参数设置
- 三. select hints(ing)
SQL 提示(SQL Hints)是和 SQL 语句一起使用来改变执行计划的。本章介绍如何使用 SQL 提示来实现各种干预。
SQL 提示一般可以用于以下:
- 增强 planner:没有完美的 planner, SQL 提示让用户更好地控制执行;
- 增加元数据(或者统计信息):如"已扫描的表索引"和"一些混洗键(shuffle keys)的倾斜信息"的一些统计数据对于查询来说是动态的,用提示来配置它们会非常方便,因为我们从 planner
获得的计划元数据通常不那么准确;- 算子(Operator)资源约束:在许多情况下,我们会为执行算子提供默认的资源配置,即最小并行度或托管内存(UDF 资源消耗)或特殊资源需求(GPU 或 SSD 磁盘)等,可以使用 SQL 提示非常灵活地
为每个查询(非作业)配置资源
。
一. create table hints
动态表选项允许动态地指定或覆盖表选项
,不同于用 SQL DDL 或 连接 API 定义的静态表选项,这些选项可以在每个查询的每个表范围内灵活地指定。
因此,它非常适合用于交互式终端中的特定查询,例如,在 SQL-CLI 中,你可以通过添加动态选项/*+ OPTIONS('csv.ignore-parse-errors'='true') */
来指定忽略 CSV 源的解析错误。
1. 语法
为了不破坏 SQL 兼容性,我们使用 Oracle 风格的 SQL hints 语法:
table_path /*+ OPTIONS(key=val [, key=val]*) */
key: string字符
val: string字符
2. 示例
CREATE TABLE kafka_table1 (id BIGINT, name STRING, age INT) WITH (...);
CREATE TABLE kafka_table2 (id BIGINT, name STRING, age INT) WITH (...);
-- `覆盖`查询语句中源表的选项
select id, name from kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;
-- 覆盖 join 中源表的选项
select * from
kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t1
join
kafka_table2 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t2
on t1.id = t2.id;
-- 覆盖插入语句中结果表的选项
insert into kafka_table1 /*+ OPTIONS('sink.partitioner'='round-robin') */ select * from kafka_table2;
3. 注意
create table
hints 传递的连接器中catalog
的相关参数,即create table with
下参数,具体到源代码是:context.getCatalogTable().getOptions()
。
如果传参无效且在日志中看到参数已经设置成功,那
可能将context.getConfiguration()中的参数传递到with参数下,比如:
hive连接器下:table.exec.hive.sink.statistic-auto-gather.enable
参数由DefaultDynamicTableContext的configuration
来接收。此参数为flink sql的全局参数,此时可以通过set table.exec.hive.sink.statistic-auto-gather.enable=false
语法来设定参数。
二. 实战:简化hive连接器参数设置
对于hive连接器,Flink实现了通过catalog的方式来管理hive表,在使用hive表时需要使用hive相关语法,此时需要声明,hive dialect,如下:
CREATE CATALOG myhive WITH (
'type' = 'hive',
'default-database' = 'aaa',
'hive-conf-dir' = '/usr/bin/hadoop/software/hive/conf'
);
SET table.sql-dialect=hive;
-- 因为需要使用hive连接器中的写特性,所以需要create table ,此时sql语法为hive语法
CREATE TABLE hive_table (
user_id STRING,
order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.delay'='1 h',
'sink.partition-commit.policy.kind'='metastore,success-file'
);
-- 对于某些框架例如chunjun,此处不能很好的适配:
--
SET table.sql-dialect=default;
CREATE TABLE kafka_table (
user_id STRING,
order_amount DOUBLE,
log_ts TIMESTAMP(3),
WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- 在 TIMESTAMP 列声明 watermark。
) WITH (...);
-- streaming sql, insert into hive table
INSERT INTO TABLE myhive.aaa.hive_table
SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH')
FROM kafka_table;
如下可以把写hive的一些行为通过sql hint方式,放到Flink sql语句中,如下整个Flink sql 会清爽很多。
CREATE CATALOG myhive WITH (
'type' = 'hive',
'default-database' = 'database_name',
'hive-conf-dir' = '/usr/bin/hadoop/software/hive/conf'
);
CREATE TABLE source_kafka (
`pv` string,
`uv` string,
`p_day_id` string
) WITH (
'connector' = 'kafka-x'
,'topic' = 'hive_kafka'
,'properties.bootstrap.servers' = 'xxx:9092'
,'properties.group.id' = 'luna_g'
,'scan.startup.mode' = 'earliest-offset'
,'json.timestamp-format.standard' = 'SQL'
,'json.ignore-parse-errors' = 'true'
,'format' = 'json'
,'scan.parallelism' = '1'
);
insert into
myhive.database_name.table_name /*+ OPTIONS('partition.time-extractor.timestamp-pattern'='$p_day_id:00:00','sink.partition-commit.policy.kind'='metastore,success-file','sink.partition-commit.success-file.name'='_SUCCESS_gao111') */
select * from source_kafka;
三. select hints(ing)
查询提示(Query Hints
)用于为优化器修改执行计划提供建议,该修改只能在当前查询提示所在的查询块中生效(Query block
, 什么是查询块)。 目前,Flink 查询提示只支持联接提示(Join Hints
)。
具体见:官网
https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/table/sql/queries/hints/#%E6%9F%A5%E8%AF%A2%E6%8F%90%E7%A4%BA