1、Flink基本操作
1.1、配置参数和jar包
Flink1.11开始就不在提供flink-shaded-hadoop-2-uber的支持,所以如果需要flink支持hadoop得配置环境变量HADOOP_CLASSPATH
[root@hadoop1 flink-1.11.0]# vim bin/config.sh
export HADOOP_COMMON_HOME=/opt/module/hadoop-3.1.3
export HADOOP_HDFS_HOME=/opt/module/hadoop-3.1.3
export HADOOP_YARN_HOME=/opt/module/hadoop-3.1.3
export HADOOP_MAPRED_HOME=/opt/module/hadoop-3.1.3
export HADOOP_CLASSPATH=`hadoop classpath`
export PATH=$PATH:$HADOOP_CLASSPATH
目前Iceberg只支持flink1.11.x的版本,所以我这使用flink1.11.0,将构建好的Iceberg的jar包复制到flink下
[root@hadoop1 libs]# cd /opt/module/iceberg-apache-iceberg-0.11.1/flink-runtime/build/libs/
[root@hadoop1 libs]# cp *.jar /opt/module/flink-1.11.0/lib/
1.2、Flink SQL Client
1、启动flink集群,并启动flink sql client
bin/sql-client.sh embedded shell
2、使用 Catalogs 创建目录
CREATE CATALOG hadoop_catalog WITH (
'type'='iceberg',
'catalog-type'='hadoop',
'warehouse'='hdfs://mycluster/flink/warehouse/',
'property-version'='1'
);
或者修改 sql-client-defaults.yaml,添加以下内容
[root@hadoop103 conf]# vim sql-client-defaults.yaml catalogs:
- name: hadoop_catalog
type: iceberg
catalog-type: hadoop
warehouse: hdfs://mycluster/flink/warehouse/
3、使用当前 catalog
use catalog hadoop_catalog;
4、建库建表
建库可以直接使用create database;建表需要指定分区,使用flink对接iceberg不能使用iceberg的隐藏分区。
5、写入与修改数据
flink默认使用流的方式插入数据,这个时候流的插入是不支持overwrite操作的。需要将插入模式进行修改SET execution.type = batch;,改成批的插入方式,再次使用overwrite插入数据。如需要改回流式操作参数设置为 SET execution.type = streaming;会根据分区进行覆盖操作。
2、Flink API操作
1、需要引入相关依赖包
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink-runtime</artifactId>
<version>0.11.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
2.1、读操作
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://mycluster/flink/warehouse/iceberg/testA");
batchRead(env, tableLoader);
streamingRead(env, tableLoader);
env.execute();
}
// 通过batch的方式去读取数据
public static void batchRead(StreamExecutionEnvironment env, TableLoader tableLoader) {
DataStream<RowData> batch = FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build();
batch.map(item -> item.getInt(0) + "\t" + item.getString(1) + "\t" + item.getInt(2) + "\t" + item.getString(3)).print();
}
// 通过streaming的方式去读取数据,启动之后程序不会立马停止
public static void streamingRead(StreamExecutionEnvironment env, TableLoader tableLoader)
{
DataStream<RowData> stream = FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(true).build();
stream.print();
}
2.2、 写操作
// 采用的是batch批处理
public static void appendingData(StreamExecutionEnvironment env, TableLoader tableLoader) {
DataStream<RowData> batch = FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build();
TableLoader tableB = TableLoader.fromHadoopTable("hdfs://mycluster/flink/warehouse/iceberg/testB");
FlinkSink.forRowData(batch).tableLoader(tableB).build();
}
// 根据分区将数据进行覆盖操作
public static void overwriteData(StreamExecutionEnvironment env, TableLoader tableLoader) {
DataStream<RowData> batch = FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build();
TableLoader tableB = TableLoader.fromHadoopTable("hdfs://mycluster/flink/warehouse/iceberg/testB");
FlinkSink.forRowData(batch).tableLoader(tableB).overwrite(true).build();
}
3、读写Flink存在的问题
- Flink 不支持 Iceberg 隐藏分区
- 不支持通过计算列根据case class创建表
- 不支持创建带水位线的表
- 不支持添加列、删除列、重命名列
- Flink写iceberg需要使用checkpoint