【flink】问题整理

一、关于TableException的问题

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Sink `catalog_1`.`mydb`.`region_sales_sink` does not exists

这类问题要建一个表region_sales_sink,这是flink写到mysql

CREATE TABLE region_sales_sink (
region_name VARCHAR(30),
buy_cnt BIGINT
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://192.168.9.103:3306/mydw?useUnicode=true&characterEncoding=utf-8',
'connector.table' = 'top_region', -- MySQL中的待插入数据的表
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.username' = 'root',
'connector.password' = '000000',
'connector.write.flush.interval' = '1s'
);

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException:in the classpath not find

这类问题需要导入包:(我使用的flink1.11,自带flink-json1.11.0,flink-connnector-kafka1.11.0)

flink-connector-hive_2.11-1.11.0.jar

flink-connector-jdbc_2.12-1.11.3.jar

flink-table-planner_2.11-1.11.3.jar

flink-table-planner-blink_2.11-1.11.3.jar

hive-exec-3.1.2.jar

mysql-connector-java-8.0.15.jar

有了这些包分发到flink集群,flink就可以和mysql,hive,kafka互通

 

type类型错误

flink/lib下的jar包冲突,删除冲突jar

query[source] 

query[sink]字段不匹配

添加的字段类型不同,desc tablename,查看表字段类型

 

 

 

二、配置flink的元数据到hive,不然每次重启flink-cli,之前建的表都丢失了

 在这个默认的sql-client-defaults.yaml修改

catalogs: 
  - name: catalog_1
  type: hive
  hive-conf-dir: /opt/module/hive/conf
  default-database: mydb

execution:

  type: streaming

  result-mode: tableau  #查询出来的数据表格化,更好看

三、flink-cli启动flink-sql>建库,建表

建事实行为表:行为数据存储在kafka

CREATE TABLE user_behavior (
user_id BIGINT, -- 用户id
item_id BIGINT, -- 商品id
cat_id BIGINT, -- 品类id
action STRING, -- 用户行为
province INT, -- 用户所在的省份
ts BIGINT, -- 用户行为发生的时间戳
proctime as PROCTIME(), -- 通过计算列产生一个处理时间列
eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件时间
WATERMARK FOR eventTime as eventTime - INTERVAL '5' SECOND
) WITH (
'connector.type' = 'kafka', -- 使用 kafka connector
'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本
'connector.topic' = 'user_behavior', -- kafka主题
'connector.startup-mode' = 'earliest-offset', -- 偏移量,从起始 offset 开始读取
'connector.properties.group.id' = 'group1', -- 消费者组
'connector.properties.zookeeper.connect' = 'hadoop101:2181', -- zookeeper 地址
'connector.properties.bootstrap.servers' = 'hadoop101:9092', -- kafka broker 地址
'format.type' = 'json' -- 数据源格式为 json
);

kafka相关操作补充:

kafka-topics.sh --zookeeper 192.168.9.101:2181 --list

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic user_behavior

kafka-topics.sh --zookeeper 192.168.9.101:2181 --delete --topic flinksql2

--一个是kafka端口
kafka-console-producer.sh --broker-list 192.168.9.101:9092 --topic user_behavior
--一个是zookeeper端口
kafka-console-consumer.sh --zookeeper 192.168.9.101:2181 --topic student --from-beginning

{"user_id":63401,"item_id":6244,"cat_id":143,"action":"pv","province":1,"ts":1573445919}
{"user_id":9164,"item_id":2817,"cat_id":611,"action":"fav","province":2,"ts":1573420486}
{"user_id":91640,"item_id":2817,"cat_id":611,"action":"fav","province":3,"ts":1573420486}
{"user_id":91640,"item_id":2817,"cat_id":611,"action":"buy","province":3,"ts":1573420486}

建维度表:维度数据存储在mysql

CREATE TABLE dim_province (
province_id BIGINT,
province_name VARCHAR,
region_name VARCHAR
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://192.168.9.103:3306/mydw?useUnicode=true&characterEncoding=utf-8',
'connector.table' = 'dim_province',
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.username' = 'root',
'connector.password' = '000000',
'connector.lookup.cache.max-rows' = '5000',
'connector.lookup.cache.ttl' = '10min'
);

 

mysql库mydw建表

CREATE TABLE dim_province (
province_id BIGINT,
province_name VARCHAR,
region_name VARCHAR) ;

并插入数据:

insert into dim_province (province_id,province_name,region_name) values (1,'广东','华南');
insert into dim_province (province_id,province_name,region_name) values (2,'湖北','华中');
insert into dim_province (province_id,province_name,region_name) values (3,'山东','华北');

 

上一篇:sql基本操作


下一篇:pip install mysql_python报错解决办法