pyflink实时接收kafka数据至mysql

from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, EnvironmentSettings
from pyflink.table.catalog import HiveCatalog
from pyflink.table import SqlDialect

env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = BatchTableEnvironment.create(environment_settings=env_settings)

catalog = HiveCatalog("myhive", "ods", "/home/hadoop/hive-3.1.2/conf")

# Register the catalog
t_env.register_catalog("myhive", catalog)
# set the HiveCatalog as the current catalog of the sessionT_env.use_catalog("myhive")
t_env.use_catalog("myhive")
t_env.get_config().set_sql_dialect(SqlDialect.HIVE)
# Create a catalog table
t_env.execute_sql("""CREATE TABLE IF NOT EXISTS sink_parent_info(
 etl_date STRING
,id                       BIGINT
,user_id                  BIGINT
,height                   DECIMAL(5,2)
,weight                   DECIMAL(5,2)
)
""")

# should return the tables in current catalog and database.
t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)
t_env.execute_sql(f"""
CREATE TEMPORARY TABLE source_parent_info(
 id                       bigint
,user_id                  bigint
,height                   decimal(5,2)
,weight                   decimal(5,2)
) with (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://xxxx:3306/xxxx',
'connector.driver'= 'com.mysql.cj.jdbc.Driver',
'connector.table' = 'parent_info',
'connector.username' = 'root',
'connector.password' = 'xxxx',
'connector.write.flush.interval' = '1s')
""")

t_env.execute_sql("""
INSERT INTO sink_parent_info
SELECT
id
,user_id
,height
,weight

FROM source_parent_info
""").wait()

参考文档:
https://help.aliyun.com/document_detail/181568.html
https://blog.csdn.net/chenshijie2011/article/details/117399883
https://blog.csdn.net/chenshijie2011/article/details/117401621
https://www.cnblogs.com/maoxiangyi/p/13509782.html
https://www.cnblogs.com/Springmoon-venn/p/13726089.html
https://www.jianshu.com/p/295066a24092
https://blog.csdn.net/m0_37592814/article/details/108044830

上一篇:ambari集群添加hive服务遇到的问题


下一篇:关于Mac宿主机无法ping通Docker容器的问题