Iceberg(三)对接Flink

1、Flink基本操作

1.1、配置参数和jar包

        Flink1.11开始就不在提供flink-shaded-hadoop-2-uber的支持,所以如果需要flink支持hadoop得配置环境变量HADOOP_CLASSPATH

[root@hadoop1 flink-1.11.0]# vim bin/config.sh 
export HADOOP_COMMON_HOME=/opt/module/hadoop-3.1.3 
export HADOOP_HDFS_HOME=/opt/module/hadoop-3.1.3 
export HADOOP_YARN_HOME=/opt/module/hadoop-3.1.3 
export HADOOP_MAPRED_HOME=/opt/module/hadoop-3.1.3 
export HADOOP_CLASSPATH=`hadoop classpath`
export PATH=$PATH:$HADOOP_CLASSPATH

        目前Iceberg只支持flink1.11.x的版本,所以我这使用flink1.11.0,将构建好的Iceberg的jar包复制到flink下

[root@hadoop1 libs]# cd  /opt/module/iceberg-apache-iceberg-0.11.1/flink-runtime/build/libs/
[root@hadoop1 libs]# cp *.jar /opt/module/flink-1.11.0/lib/

1.2、Flink SQL Client

        1、启动flink集群,并启动flink sql client

bin/sql-client.sh embedded shell

         2、使用 Catalogs 创建目录

CREATE CATALOG hadoop_catalog WITH ( 
    'type'='iceberg',
    'catalog-type'='hadoop', 
    'warehouse'='hdfs://mycluster/flink/warehouse/', 
    'property-version'='1'
);

         或者修改 sql-client-defaults.yaml,添加以下内容

[root@hadoop103 conf]# vim sql-client-defaults.yaml catalogs:
- name: hadoop_catalog
  type: iceberg 
  catalog-type: hadoop
  warehouse: hdfs://mycluster/flink/warehouse/

        3、使用当前 catalog

use catalog hadoop_catalog;

        4、建库建表

        建库可以直接使用create database;建表需要指定分区,使用flink对接iceberg不能使用iceberg的隐藏分区。

        5、写入与修改数据

        flink默认使用流的方式插入数据,这个时候流的插入是不支持overwrite操作的。需要将插入模式进行修改SET execution.type = batch;,改成批的插入方式,再次使用overwrite插入数据。如需要改回流式操作参数设置为 SET execution.type = streaming;会根据分区进行覆盖操作。

2、Flink API操作

        1、需要引入相关依赖包

<dependency>
	<groupId>org.apache.iceberg</groupId>
	<artifactId>iceberg-flink-runtime</artifactId>
	<version>0.11.1</version>
</dependency>
<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-client</artifactId>
	<version>${hadoop.version}</version>
</dependency>

2.1、读操作

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://mycluster/flink/warehouse/iceberg/testA");
    batchRead(env, tableLoader);
    streamingRead(env, tableLoader);
    env.execute();
}

// 通过batch的方式去读取数据
public static void batchRead(StreamExecutionEnvironment env, TableLoader tableLoader) {
    DataStream<RowData> batch = FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build();
    batch.map(item -> item.getInt(0) + "\t" + item.getString(1) + "\t" + item.getInt(2) + "\t" + item.getString(3)).print();
}

// 通过streaming的方式去读取数据,启动之后程序不会立马停止
public static void streamingRead(StreamExecutionEnvironment env, TableLoader tableLoader)
{
    DataStream<RowData>	stream	= FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(true).build();
    stream.print();
}

2.2、 写操作

// 采用的是batch批处理
public static void appendingData(StreamExecutionEnvironment env, TableLoader tableLoader) {
    DataStream<RowData> batch = FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build();
    TableLoader tableB = TableLoader.fromHadoopTable("hdfs://mycluster/flink/warehouse/iceberg/testB");
    FlinkSink.forRowData(batch).tableLoader(tableB).build();
}

// 根据分区将数据进行覆盖操作
public static void overwriteData(StreamExecutionEnvironment env, TableLoader tableLoader) {
    DataStream<RowData> batch = FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build();
    TableLoader tableB = TableLoader.fromHadoopTable("hdfs://mycluster/flink/warehouse/iceberg/testB");
    FlinkSink.forRowData(batch).tableLoader(tableB).overwrite(true).build();
}

3、读写Flink存在的问题

  1. Flink 不支持 Iceberg 隐藏分区
  2. 不支持通过计算列根据case class创建表
  3. 不支持创建带水位线的表
  4. 不支持添加列、删除列、重命名列
  5. Flink写iceberg需要使用checkpoint

   

上一篇:Flink任务调度原理


下一篇:flink的伪分布式搭建