Flink通过Catalog连接hive,使用FlinkSQL进行读写

一、Flink1.13安装

1、官网下载链接 https://flink.apache.org/zh/downloads.html#apache-flink-1131

2、拷贝压缩包到服务器里解压 tar -xvf flink-1.13.1-bin-scala_2.12.tgz

3、进入flink的conf目录,对flink-conf.xml进行配置(本次测试以单机standalone部署)

rest.address = 你的IP

rest.port = 你的端口

jobmanager.rpc.address = 你的IP


二、依赖项安装

要与 Hive 集成,您需要在 Flink 下的/lib/目录中添加一些额外的依赖包, 以便通过 Table API 或 SQL Client 与 Hive 进行交互。

Apache Hive 是基于 Hadoop 之上构建的, 首先您需要 Hadoop 的依赖

  • 执行export HADOOP_CLASSPATH=hadoop classpath

把hadoop-mapreduce-client-core-3.0.0.jar包防到flink的lib目录下(版本试你的hadoop集群而定)

  • 下载对应版本的 hive-connector

地址:https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/connectors/table/hive/overview/

下载后把flink-sql-connector-hive-2.2.0_2.11-1.13.0.jar放到flink的lib目录下


三、代码编写

public static void main(String[] args) throws Exception {

    ParameterTool params = ParameterTool.fromArgs(args);
    EnvironmentSettings settings = EnvironmentSettings
            .newInstance()
            .useBlinkPlanner() // 使用BlinkPlanner
            .inBatchMode() // Batch模式,默认为StreamingMode
            .build();

    TableEnvironment tableEnv = TableEnvironment.create(settings);
    String name = "myhive";                                                             // Catalog名称,定义一个唯一的名称表示
    String defaultDatabase = params.get("defaultDatabase");                             // 默认数据库名称
    String hiveConfDir = params.get("hiveConf");                                        // hive-site.xml路径
    String version = "2.1.1";                                                           // Hive版本号
    HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
    tableEnv.registerCatalog("myhive", hive);
    tableEnv.useCatalog("myhive");

    TableResult result;
    String SelectTables_sql ="select * from test.testdata";
    result = tableEnv.executeSql(SelectTables_sql);
    result.print();
}

注意:hiveConf为hadoop集群上hive-site.xml存在的目录


四、启动集群,提交任务

1、进入flink的目录下执行 bin/start-cluster.sh

2、登录rest.address : rest.port查看网页是否正常运行

3、执行命令提交任务

flink/flink-1.13.1/bin/flink run -c org.example.FlinkHiveIntegration flink/job/flinkcommonjob-1.1.jar -hiveConf /etc/hive/conf.cloudera.hive/ -defaultDatabase test


五、报错处理

Caused by: java.lang.NoClassDefFoundError: Lorg/apache/hadoop/mapred/JobConf;

解决方法:补充hadoop-mapreduce-client-core-3.0.0.jar包

上一篇:FlinkSQL演进过程,解析原理及一些优化策略


下一篇:FlinkSQL写入hive