from pyflink.table import BatchTableEnvironment, EnvironmentSettings, SqlDialect
from pyflink.table.catalog import HiveCatalog
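# Blink planner in batch mode; use_blink_planner() and BatchTableEnvironment are the
# Flink 1.11/1.12-era API (newer releases use TableEnvironment.create(EnvironmentSettings.in_batch_mode()))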
env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = BatchTableEnvironment.create(environment_settings=env_settings)
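# HiveCatalog(catalog name, default Hive database, directory containing hive-site.xml)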
catalog = HiveCatalog("myhive", "ods", "/home/hadoop/hive-3.1.2/conf")
# Register the catalog
t_env.register_catalog("myhive", catalog)
# Set the HiveCatalog as the current catalog of the session
t_env.use_catalog("myhive")
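# From here on, unqualified table names resolve against myhive.ods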
t_env.get_config().set_sql_dialect(SqlDialect.HIVE)
# Create the sink table in the Hive catalog (Hive DDL requires the HIVE dialect set above)
t_env.execute_sql("""CREATE TABLE IF NOT EXISTS sink_parent_info(
etl_date STRING
,id BIGINT
,user_id BIGINT
,height DECIMAL(5,2)
,weight DECIMAL(5,2)
)
""")
# Switch back to the default dialect for the JDBC source DDL and the INSERT
t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)
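# The 'connector.*' keys below are the legacy JDBC descriptor properties
# (Flink 1.11+ also accepts 'connector' = 'jdbc', 'url' = ..., 'table-name' = ...);
# 'connector.write.flush.interval' only takes effect when the table is written to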
t_env.execute_sql("""
CREATE TEMPORARY TABLE source_parent_info(
id BIGINT
,user_id BIGINT
,height DECIMAL(5,2)
,weight DECIMAL(5,2)
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://xxxx:3306/xxxx',
'connector.driver'= 'com.mysql.cj.jdbc.Driver',
'connector.table' = 'parent_info',
'connector.username' = 'root',
'connector.password' = 'xxxx',
'connector.write.flush.interval' = '1s')
""")
t_env.execute_sql("""
INSERT INTO sink_parent_info
SELECT
DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd') AS etl_date
,id
,user_id
,height
,weight
FROM source_parent_info
""").wait()