一、Flink1.13安装
1、官网下载链接 https://flink.apache.org/zh/downloads.html#apache-flink-1131
2、拷贝压缩包到服务器里解压 tar -xvf flink-1.13.1-bin-scala_2.12.tgz
3、进入flink的conf目录,对flink-conf.xml进行配置(本次测试以单机standalone部署)
rest.address = 你的IP
rest.port = 你的端口
jobmanager.rpc.address = 你的IP
二、依赖项安装
要与 Hive 集成,您需要在 Flink 下的/lib/目录中添加一些额外的依赖包, 以便通过 Table API 或 SQL Client 与 Hive 进行交互。
Apache Hive 是基于 Hadoop 之上构建的, 首先您需要 Hadoop 的依赖
- 执行export HADOOP_CLASSPATH=
hadoop classpath
把hadoop-mapreduce-client-core-3.0.0.jar包防到flink的lib目录下(版本试你的hadoop集群而定)
- 下载对应版本的 hive-connector
地址:https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/connectors/table/hive/overview/
下载后把flink-sql-connector-hive-2.2.0_2.11-1.13.0.jar放到flink的lib目录下
三、代码编写
public static void main(String[] args) throws Exception {
ParameterTool params = ParameterTool.fromArgs(args);
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner() // 使用BlinkPlanner
.inBatchMode() // Batch模式,默认为StreamingMode
.build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
String name = "myhive"; // Catalog名称,定义一个唯一的名称表示
String defaultDatabase = params.get("defaultDatabase"); // 默认数据库名称
String hiveConfDir = params.get("hiveConf"); // hive-site.xml路径
String version = "2.1.1"; // Hive版本号
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
tableEnv.registerCatalog("myhive", hive);
tableEnv.useCatalog("myhive");
TableResult result;
String SelectTables_sql ="select * from test.testdata";
result = tableEnv.executeSql(SelectTables_sql);
result.print();
}
注意:hiveConf为hadoop集群上hive-site.xml存在的目录
四、启动集群,提交任务
1、进入flink的目录下执行 bin/start-cluster.sh
2、登录rest.address : rest.port查看网页是否正常运行
3、执行命令提交任务
flink/flink-1.13.1/bin/flink run -c org.example.FlinkHiveIntegration flink/job/flinkcommonjob-1.1.jar -hiveConf /etc/hive/conf.cloudera.hive/ -defaultDatabase test
五、报错处理
Caused by: java.lang.NoClassDefFoundError: Lorg/apache/hadoop/mapred/JobConf;
解决方法:补充hadoop-mapreduce-client-core-3.0.0.jar包