FlinkSQL写入hive

配置1：vim flink-conf.yaml
流式写入 Hive 必须开启检查点：流式 sink 只有在检查点成功完成时才会把正在写的文件转为正式文件，分区提交也依赖于此，不开检查点数据永远不会被提交。
# ---- Checkpointing: required for streaming writes to Hive ----
# State backend that writes its checkpoint snapshots to a file system.
state.backend: filesystem
# Trigger a checkpoint every 60 seconds.
execution.checkpointing.interval: 60s
# Checkpointing semantics.
execution.checkpointing.mode: EXACTLY_ONCE
# Keep the externalized checkpoint when the job is cancelled.
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION

# Directory for checkpoint data when using any of the default bundled state
# backends. On a multi-node cluster this must be reachable from every
# JobManager/TaskManager (e.g. HDFS), for example:
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
state.checkpoints.dir: file:///tmp/flink12-checkpoints

# Default target directory for savepoints (optional), for example:
# state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints
state.savepoints.dir: file:///tmp/flink12-savepoints

配置2：使用 Flink SQL Client 时，需要在 sql-client-defaults.yaml 中注册 HiveCatalog：
vim sql-client-defaults.yaml

catalogs:  #[] # empty list
# A typical catalog definition looks like:
  - name: uathive
    type: hive
    hive-conf-dir: /etc/hive/conf
    default-database: temp

 

写sql作业

-- Run the job in streaming mode (Flink 1.12 SQL Client syntax).
SET execution.type=streaming;

-- The Hive dialect requires a HiveCatalog as the current catalog; uathive is the
-- catalog registered in sql-client-defaults.yaml (default database: temp).
USE CATALOG uathive;

-- Hive dialect for the Hive table DDL.
SET table.sql-dialect=hive;

-- Hive sink table, partitioned by day (dt), hour (hr) and minute (mi).
DROP TABLE IF EXISTS ods_hive_t_area;
CREATE TABLE ods_hive_t_area (
  `id`      INT    COMMENT '代号',
  `name`    STRING COMMENT '姓名',
  `area_id` INT    COMMENT '区域',
  `money`   INT    COMMENT '销售额'
) PARTITIONED BY (dt STRING, hr STRING, mi STRING)
STORED AS parquet
TBLPROPERTIES (
  -- How a partition's timestamp is derived from its values; the INSERT below
  -- writes zero-padded 'yyyy-MM-dd' / 'HH' / 'mm' values that fit this pattern.
  'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00',
  -- Commit a partition once processing time passes its creation time + delay.
  'sink.partition-commit.trigger'='process-time',
  'sink.partition-commit.delay'='1 min',
  -- 'metastore' adds each committed partition to the Hive metastore so Hive
  -- queries can see it; 'success-file' alone only writes _SUCCESS and leaves
  -- the data invisible until MSCK REPAIR TABLE is run.
  'sink.partition-commit.policy.kind'='metastore,success-file'
);

-- The Kafka source is a Flink connector table, not a Hive table, so it must be
-- declared with the default dialect.
SET table.sql-dialect=default;

-- Kafka source of change records: before/after row images plus an op code.
DROP TABLE IF EXISTS ods_source_2hive_kafka_t_area;
CREATE TABLE ods_source_2hive_kafka_t_area (
  `before` ROW(id INT, name STRING, area_id INT, money INT),
  `after`  ROW(id INT, name STRING, area_id INT, money INT),
  op STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'ods_t_area1',
  'properties.bootstrap.servers' = '10.16.74.34:9092',
  'properties.group.id' = 'ods_t_area1_group2hive',
  -- Start position, e.g. latest-offset | earliest-offset.
  'scan.startup.mode' = 'earliest-offset',
  -- 'format' defines how the Kafka record value is deserialized
  -- (equivalent to 'value.format').
  'format' = 'json',
  -- true: fields/rows that fail to parse become NULL / are skipped;
  -- false: a parse error fails the read. Applies to the json format only.
  'json.ignore-parse-errors' = 'true'
);

-- Flatten each change record and route it into the dt / hr / mi partition of the
-- processing time at which it is written. When a delete (op = 'd') arrives
-- without an after image, the id comes from the before image and the rest is NULL.
INSERT INTO ods_hive_t_area
SELECT
  CASE WHEN op = 'd' AND `after` IS NULL THEN `before`.id ELSE `after`.id END AS id,
  CASE WHEN op = 'd' AND `after` IS NULL THEN NULL ELSE `after`.name END AS name,
  CASE WHEN op = 'd' AND `after` IS NULL THEN NULL ELSE `after`.area_id END AS area_id,
  CASE WHEN op = 'd' AND `after` IS NULL THEN NULL ELSE `after`.money END AS money,
  -- One value per partition column, zero-padded, derived from one timestamp.
  DATE_FORMAT(ts, 'yyyy-MM-dd') AS dt,
  DATE_FORMAT(ts, 'HH') AS hr,
  DATE_FORMAT(ts, 'mm') AS mi
FROM (
  SELECT `before`, `after`, op, LOCALTIMESTAMP AS ts
  FROM ods_source_2hive_kafka_t_area
) AS src;

遇到的问题:

[hive@m764 lib]$ hadoop fs -ls -R /user/hive/warehouse/temp.db/ods_hive_t_area/
drwxrwxr-x   - hive hive          0 2021-05-06 17:27 /user/hive/warehouse/temp.db/ods_hive_t_area/mi=26
-rw-r--r--   1 hive hive          0 2021-05-06 17:27 /user/hive/warehouse/temp.db/ods_hive_t_area/mi=26/_SUCCESS
-rw-r--r--   1 hive hive       1156 2021-05-06 17:26 /user/hive/warehouse/temp.db/ods_hive_t_area/mi=26/part-f35d61fa-6a8d-4a51-a59f-83c597c6c42c-0-0
drwxrwxr-x   - hive hive          0 2021-05-06 17:29 /user/hive/warehouse/temp.db/ods_hive_t_area/mi=27
-rw-r--r--   1 hive hive          0 2021-05-06 17:29 /user/hive/warehouse/temp.db/ods_hive_t_area/mi=27/_SUCCESS
-rw-r--r--   1 hive hive        541 2021-05-06 17:27 /user/hive/warehouse/temp.db/ods_hive_t_area/mi=27/part-f35d61fa-6a8d-4a51-a59f-83c597c6c42c-0-1

显示已成功写入 Hive，分区目录下也有 _SUCCESS 文件，但在 Hive 中 select 不到数据。

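原因：当 'sink.partition-commit.policy.kind' 只包含 'success-file' 时，Flink 只会在分区目录写 _SUCCESS 文件，不会把分区添加到 Hive Metastore，所以 Hive 看不到这些分区。根本解决办法是把该属性设为 'metastore,success-file'，分区提交时会自动注册到 Metastore。对于已经写出的分区，可以手动刷新元数据：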

msck repair table  ods_hive_t_area;

然后可以查到hive中的数据了

上一篇:Flink通过Catalog连接hive,使用FlinkSQL进行读写


下一篇:DWS 层-关键词主题表(FlinkSQL)