本文基于 Apache Iceberg 0.9.0 最新分支,主要分析 Apache Iceberg 中使用 Spark 2.4.6 来写数据到 Iceberg 表中,也就是对应 iceberg-spark2 模块。当然,Apache Iceberg 也支持 Flink 来读写 Iceberg 表,其底层逻辑也 Spark 类似,感兴趣的同学可以去看看。
使用 Spark2 将数据写到 Apache Iceberg
在介绍下面文章之前,我们先来看下在 Apache Spark 2.4.6 中写数据到 Iceberg 的例子。这个版本的 Spark,Apache Iceberg 不支持使用 SQL 的形式来创建 Iceberg 表,所以只能使用 Iceberg 的 Java API 来进行,具体如下:
TableIdentifier name = TableIdentifier.of("default", "iteblog");Catalog catalog = new HiveCatalog(spark.sparkContext().hadoopConfiguration());Schema schema = new Schema( Types.NestedField.required(1, "id", Types.IntegerType.get()), Types.NestedField.optional(2, "name", Types.StringType.get()), Types.NestedField.required(3, "age", Types.IntegerType.get()), Types.NestedField.optional(4, "ts", Types.TimestampType.withZone()));PartitionSpec spec = PartitionSpec.builderFor(schema).year("ts").bucket("id", 2).build();// 创建 Iceberg 表Table table = catalog.createTable(name, schema, spec);List<Person> list = Lists.newArrayList( new Person(1, "iteblog", 100, Timestamp.valueOf("2020-11-01 00:00:00")), new Person(2, "iteblog2", 300, Timestamp.valueOf("2020-11-01 00:00:00")), new Person(3, "iteblog", 100, Timestamp.valueOf("2020-12-02 00:00:00")), new Person(4, "iteblog", 100, Timestamp.valueOf("2020-11-02 00:00:00")));JavaSparkContext javaSparkContext = new JavaSparkContext(spark.sparkContext());JavaRDD<Row> rowRDD = javaSparkContext.parallelize(list) .map((Function<Person, Row>) record -> RowFactory.create(record.getId(), record.getName(), record.getAge(), record.getTs()));StructType structType = new StructType() .add("id", "int", false) .add("name", "string", true) .add("age", "int", false) .add("ts", "timestamp", true);Dataset<Row> dataFrame = spark.createDataFrame(rowRDD, structType);//写数据到 Iceberg 表dataFrame.write().format("iceberg").mode("append").save("default.iteblog");
由于我们使用的是 HiveCatalog 来管理 Iceberg 表的元数据,所以上面代码会把表的元信息写到 Hive 的 MetaStore 中,同时也会在 hive.metastore.warehouse.dir
参数配置的路径下保存 Iceberg 的 meta 信息以及真正的数据,如下:
⇒ tree /data/hive/warehouse/iteblog/data/hive/warehouse/iteblog├── data│ └── ts_year=2020│ ├── id_bucket=0│ │ ├── 00000-0-108487fa-5418-409d-b1fb-9708523d4741-00001.parquet│ │ └── 00001-1-b51f4d71-53ff-4fb1-b7a5-f1b6a2ef8521-00002.parquet│ └── id_bucket=1│ └── 00001-1-b51f4d71-53ff-4fb1-b7a5-f1b6a2ef8521-00001.parquet└── metadata ├── 00000-e99dd798-afe8-4eaa-ab51-4ddf84c16832.metadata.json ├── 00001-474ed57f-8afc-4162-9e0d-37dce447fda6.metadata.json ├── 69b5d09e-725b-48f8-b778-e7a737b6abb2-m0.avro └── snap-8752398031282699586-1-69b5d09e-725b-48f8-b778-e7a737b6abb2.avro5 directories, 7 files
其中 metadata 目录下就是 Iceberg 表的元数据,其中以 metadata.json 结尾的文件里面保存了 Iceberg 表的表结构、分区信息、manifest list 路径以及快照相关的信息;data 目录下包含的就是表的真实数据。其他文件的目的和用途我将在 Apache Iceberg 读过程分析再介绍。
Apache Iceberg 数据写过程分析
Apache Iceberg 支持使用 Spark 计算引擎来写数据到对应表中,支持 Spark 2 和 Spark 3。本文介绍的是使用 Spark 2 来写数据的流程。这部分的代码在 Iceberg 代码的 iceberg-spark2 模块中。下面我们来详细介绍 Apache Iceberg 使用 Apache Spark 2 的写路径。主要分 Spark 的 Driver 端的执行和 Executor 端的执行。
Apache Iceberg 写数据在 Spark Driver 端的处理
在 iceberg-spark2 模块中,写数据到 Iceberg 使用了 Spark 的 DataSourceV2 相关读写 API,读写的入口类为 org.apache.iceberg.spark.source.IcebergSource,从代码实现可以看出,Iceberg 支持以批流的形式来写数据,为了简单起见,本文只介绍批处理的形式来写数据。
写数据之前需要先实现 org.apache.spark.sql.sources.v2.WriteSupport 接口,并创建一个 DataSourceWriter,具体如下:
public Optional<DataSourceWriter> createWriter(String jobId, StructType dsStruct, SaveMode mode, DataSourceOptions options) { Preconditions.checkArgument(mode == SaveMode.Append || mode == SaveMode.Overwrite, "Save mode %s is not supported", mode); Configuration conf = new Configuration(lazyBaseConf()); // 从 HiveCatalog 或者 HadoopTables 里面获取 Iceberg 表 Table table = getTableAndResolveHadoopConfiguration(options, conf); // 将 Spark 的 StructType 转换成 Iceberg 内部的 Schema Schema writeSchema = SparkSchemaUtil.convert(table.schema(), dsStruct); // 这里主要是做一些校验 TypeUtil.validateWriteSchema(table.schema(), writeSchema, checkNullability(options), checkOrdering(options)); // 校验分区的 Transform 是否合法 SparkUtil.validatePartitionTransforms(table.spec()); String appId = lazySparkSession().sparkContext().applicationId(); String wapId = lazySparkSession().conf().get("spark.wap.id", null); boolean replacePartitions = mode == SaveMode.Overwrite; // 将 Iceberg 表的信息广播到 Executor Broadcast<FileIO> io = lazySparkContext().broadcast(SparkUtil.serializableFileIO(table)); // Iceberg 支持对写的数据进行加密 Broadcast<EncryptionManager> encryptionManager = lazySparkContext().broadcast(table.encryption()); return Optional.of(new Writer( table, io, encryptionManager, options, replacePartitions, appId, wapId, writeSchema, dsStruct));}
从上面可以看出,Iceberg 只支持 Append 或 Overwrite 两种形式的 SaveMode,其他类型的将会抛异常。在创建 Writer 之前,需要先获取表的信息,然后校验表的模式是否和写的数据模式一样,包括数据是否可以为空以及数据的字段是否和表的顺序一致,不满足则抛出异常。紧接着会校验分区的 Transform 是否合法,在 Iceberg 中,支持隐藏分区(hidden partitioning)的功能,分区的信息并没有写到元数据里面,而只是保存在 Iceberg 的 metadata 里面。Iceberg 支持的 Transform 包括 truncate、bucket、identity、year、month 以及 day 等,其他类型的分区 Transform 是不合法的。
由于数据写入工作是在 Executor 端进行的,所以需要将 Iceberg 表的信息广播到 Executor 端。同时,Iceberg 也支持对写入的数据进行加密,这个功能在 Delta Lake 以及 Apache Hudi 是没有提供的,当前 Iceberg 写入的数据默认是没有加密的,而且好像没看到有加密的类实现,所以如果要使用这个功能是需要我们实现的。
最后就是创建一个 Writer 实例,其实现 org.apache.spark.sql.sources.v2.writer.DataSourceWriter 接口,内部实现了 createWriterFactory 、commit 以及 abort 方法,具体如下:
class Writer implements DataSourceWriter { ...... @Override public DataWriterFactory<InternalRow> createWriterFactory() { return new WriterFactory( table.spec(), format, table.locationProvider(), table.properties(), io, encryptionManager, targetFileSize, writeSchema, dsSchema); } @Override public void commit(WriterCommitMessage[] messages) { if (replacePartitions) { replacePartitions(messages); } else { append(messages); } } @Override public void abort(WriterCommitMessage[] messages) { Map<String, String> props = table.properties(); Tasks.foreach(files(messages)) .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT)) .exponentialBackoff( PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) .throwFailureWhenFinished() .run(file -> { io.value().deleteFile(file.path().toString()); }); }....}
上面 Writer 类我只列出了实现 DataSourceWriter 类里面相关的方法。在 DataSourceV2,写数据的第二步就是实现 createWriterFactory 方法来创建具体的 Writer,这个 Factory 里面会创建出具体写数据的类,我们后面介绍。
commit 方法主要是在 Executor 端全部的 Task 都成功写完数据,并执行完 Task 的 commit 操作进行的,Writer 里面的 commit 方法是作业级别的。这个函数接收的参数是 messages,其存储了本次写数据产生的文件(由 org.apache.iceberg.GenericDataFile 类实现)路径,各种详细的统计信息(记录条数、文件大小、最大最小值以及空值总数等),这样我们就知道产生了哪些新文件。
commit 会生成快照文件(snap- 开头的文件)以及 manifest-list 文件,如下:
⇒ ll /data/hive/warehouse/iteblog/metadata/total 56-rw-r--r-- 1 iteblog wheel 1.4K 11 9 19:14 00000-3fc4c48e-32b8-4e93-b42d-d318b2b9009d.metadata.json-rw-r--r-- 1 iteblog wheel 1.4K 11 9 19:21 00000-a0558729-67a4-4f91-a19f-cd8b42a52589.metadata.json-rw-r--r-- 1 iteblog wheel 1.4K 11 9 17:28 00000-e99dd798-afe8-4eaa-ab51-4ddf84c16832.metadata.json-rw-r--r-- 1 iteblog wheel 2.2K 11 9 19:24 00001-912e676a-fa08-4650-abaf-05e8df1a0c8d.metadata.json-rw-r--r-- 1 iteblog wheel 5.1K 11 9 19:21 00dfb9b4-1f13-4fd1-abe0-4cbd4c5ec876-m0.avro-rw-r--r-- 1 iteblog wheel 3.0K 11 9 19:21 snap-5890630393462353077-1-00dfb9b4-1f13-4fd1-abe0-4cbd4c5ec876.avro
snap-5890630393462353077-1-00dfb9b4-1f13-4fd1-abe0-4cbd4c5ec876.avro 文件是这张 Iceberg 表的最新快照相关信息,内容如下:
{"manifest_path":"file:/data/hive/warehouse/iteblog/metadata/00dfb9b4-1f13-4fd1-abe0-4cbd4c5ec876-m0.avro","manifest_length":5262,"partition_spec_id":0,"added_snapshot_id":{"long":5890630393462353077},"added_data_files_count":{"int":3},"existing_data_files_count":{"int":0},"deleted_data_files_count":{"int":0},"partitions":{"array":[{"contains_null":false,"lower_bound":{"bytes":"2\u0000\u0000\u0000"},"upper_bound":{"bytes":"2\u0000\u0000\u0000"}},{"contains_null":false,"lower_bound":{"bytes":"\u0000\u0000\u0000\u0000"},"upper_bound":{"bytes":"\u0001\u0000\u0000\u0000"}}]},"added_rows_count":{"long":4},"existing_rows_count":{"long":0},"deleted_rows_count":{"long":0}}
其中存储了 manifest list 文件的路径,以及其他一些统计信息。现在我们来看下 00dfb9b4-1f13-4fd1-abe0-4cbd4c5ec876-m0.avro 这个 manifest 文件的内容:
{"status":1,"snapshot_id":{"long":5890630393462353077},"data_file":{"file_path":"file:/data/hive/warehouse/iteblog/data/ts_year=2020/id_bucket=0/00000-0-b9ec5fd7-8784-489d-99df-fdb160d0e1b1-00001.parquet","file_format":"PARQUET","partition":{"ts_year":{"int":50},"id_bucket":{"int":0}},"record_count":2,"file_size_in_bytes":1188,"block_size_in_bytes":67108864,"column_sizes":{"array":[{"key":1,"value":49},{"key":2,"value":62},{"key":3,"value":51},{"key":4,"value":98}]},"value_counts":{"array":[{"key":1,"value":2},{"key":2,"value":2},{"key":3,"value":2},{"key":4,"value":2}]},"null_value_counts":{"array":[{"key":1,"value":0},{"key":2,"value":0},{"key":3,"value":0},{"key":4,"value":0}]},"lower_bounds":{"array":[{"key":1,"value":"\u0001\u0000\u0000\u0000"},{"key":2,"value":"iteblog"},{"key":3,"value":"d\u0000\u0000\u0000"},{"key":4,"value":"\u0000öù²\u0005\u0000"}]},"upper_bounds":{"array":[{"key":1,"value":"\u0002\u0000\u0000\u0000"},{"key":2,"value":"iteblog2"},{"key":3,"value":",\u0001\u0000\u0000"},{"key":4,"value":"\u0000öù²\u0005\u0000"}]},"key_metadata":null,"split_offsets":{"array":[4]}}}{"status":1,"snapshot_id":{"long":5890630393462353077},"data_file":{"file_path":"file:/data/hive/warehouse/iteblog/data/ts_year=2020/id_bucket=1/00001-1-606ce3a6-ca71-4179-be95-b08fc5c65734-00001.parquet","file_format":"PARQUET","partition":{"ts_year":{"int":50},"id_bucket":{"int":1}},"record_count":1,"file_size_in_bytes":1145,"block_size_in_bytes":67108864,"column_sizes":{"array":[{"key":1,"value":47},{"key":2,"value":58},{"key":3,"value":47},{"key":4,"value":57}]},"value_counts":{"array":[{"key":1,"value":1},{"key":2,"value":1},{"key":3,"value":1},{"key":4,"value":1}]},"null_value_counts":{"array":[{"key":1,"value":0},{"key":2,"value":0},{"key":3,"value":0},{"key":4,"value":0}]},"lower_bounds":{"array":[{"key":1,"value":"\u0003\u0000\u0000\u0000"},{"key":2,"value":"iteblog"},{"key":3,"value":"d\u0000\u0000\u0000"},{"key":4,"value":"\u0000 \u000B8iµ\u0005\u0000"}]},"upper_bounds":{"array":[{"key":1,"value":"\u0003\u0000\u0000\u0000"},{"key":2,"value":"iteblog"},{"key":3,"value":"d\u0000\u0000\u0000"},{"key":4,"value":"\u0000 \u000B8iµ\u0005\u0000"}]},"key_metadata":null,"split_offsets":{"array":[4]}}}{"status":1,"snapshot_id":{"long":5890630393462353077},"data_file":{"file_path":"file:/data/hive/warehouse/iteblog/data/ts_year=2020/id_bucket=0/00001-1-606ce3a6-ca71-4179-be95-b08fc5c65734-00002.parquet","file_format":"PARQUET","partition":{"ts_year":{"int":50},"id_bucket":{"int":0}},"record_count":1,"file_size_in_bytes":1145,"block_size_in_bytes":67108864,"column_sizes":{"array":[{"key":1,"value":47},{"key":2,"value":58},{"key":3,"value":47},{"key":4,"value":57}]},"value_counts":{"array":[{"key":1,"value":1},{"key":2,"value":1},{"key":3,"value":1},{"key":4,"value":1}]},"null_value_counts":{"array":[{"key":1,"value":0},{"key":2,"value":0},{"key":3,"value":0},{"key":4,"value":0}]},"lower_bounds":{"array":[{"key":1,"value":"\u0004\u0000\u0000\u0000"},{"key":2,"value":"iteblog"},{"key":3,"value":"d\u0000\u0000\u0000"},{"key":4,"value":"\u0000à͸\r³\u0005\u0000"}]},"upper_bounds":{"array":[{"key":1,"value":"\u0004\u0000\u0000\u0000"},{"key":2,"value":"iteblog"},{"key":3,"value":"d\u0000\u0000\u0000"},{"key":4,"value":"\u0000à͸\r³\u0005\u0000"}]},"key_metadata":null,"split_offsets":{"array":[4]}}}
这个文件就三行内容,分别对应本次写操作产生的文件路径,以及这个文件里面的一些统计信息。
最后再生成 xxx.metadata.json 文件,记录了本次写操作的信息,包括表的 Schema、分区 Schema、本次快照的 ID、本次和历史快照文件的路径等信息。有了这些元数据,在读的时候利用这些元数据可以知道哪些文件是最新快照的东西,这个我们在后面再介绍。
当然,写数据的时候也可能失败,这时候就得执行 abort 操作。abort 操作很简单,就是删除由 Task 产生的数据文件。
好了,到这里,上面所有的步骤都是在 Spark 的 Driver 端执行的。
Apache Iceberg 写数据在 Spark Executor 端的处理
现在我们来看下在 Spark 的 Executor 端执行的东西,也就是从 WriterFactory 开始。下面是 WriterFactory 代码的主要实现,为了分析的方便,我删除了一些无关的代码,处理后的代码如下:
static class WriterFactory implements DataWriterFactory<InternalRow> { @Override public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) { OutputFileFactory fileFactory = new OutputFileFactory( spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId); SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, writeSchema, dsSchema); if (spec.fields().isEmpty()) { return new Unpartitioned24Writer(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize); } else { return new Partitioned24Writer(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize, writeSchema, dsSchema); } }}
Iceberg 的 WriterFactory 主要实现 org.apache.spark.sql.sources.v2.writer.DataWriterFactory 类,其中主要任务就是实现 createDataWriter 函数。通过 Spark 写数据到 Iceberg 表里面最终是通过一个个 Task 实现真正的写逻辑的,每个 Task 在写数据之前会调用这个方法来创建 DataWriter,所以 createDataWriter 代码是在 Spark 的 Executor 端执行的。由上面代码可以看出,根据用户是否指定分区信息(PartitionSpec)来分别初始化 Unpartitioned24Writer 或 Partitioned24Writer。这两个类的实现如下:
private static class Unpartitioned24Writer extends UnpartitionedWriter<InternalRow> implements DataWriter<InternalRow> { Unpartitioned24Writer(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory, OutputFileFactory fileFactory, FileIO fileIo, long targetFileSize) { super(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize); } @Override public WriterCommitMessage commit() throws IOException { close(); return new TaskCommit(complete()); }}private static class Partitioned24Writer extends SparkPartitionedWriter implements DataWriter<InternalRow> { Partitioned24Writer(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory, OutputFileFactory fileFactory, FileIO fileIo, long targetFileSize, Schema schema, StructType sparkSchema) { super(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize, schema, sparkSchema); } @Override public WriterCommitMessage commit() throws IOException { close(); return new TaskCommit(complete()); }}
Unpartitioned24Writer 和 Partitioned24Writer 都是实现 org.apache.spark.sql.sources.v2.writer.DataWriter 接口,并都实现了 DataWriter 接口的 commit 方法,这个方法的目的就是把对应的 Task 生成的新文件返回到 Driver 端。在 Driver 端,这些新生成的文件会传递到 DataSourceWriter#commit 函数,也就是上面介绍的 Writer 类的 commit 函数接收到。
Unpartitioned24Writer 和 Partitioned24Writer 再根据是否分区实现其特有的功能。为了让大家看清楚这个继承关系,我把这两个类的继承关系图画出来了,如下所示。
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:iteblog_hadoop
从上图可以看出,Unpartitioned24Writer 和 Partitioned24Writer 继承/实现的类很多都一样,区别在于 Partitioned24Writer 需要处理一些分区 Key 生成的逻辑,这部分实现是在 SparkPartitionedWriter 里面实现的。仔细的同学肯定发现了,上面图中分别使用了四种颜色来区分不同类所在的包。这里解释一下:
•灰色部分:也就是 org.apache.iceberg.io 这个包,这里面的类或接口都是在 iceberg-core 模块中,里面的 PartitionedWriter 和 UnpartitionedWriter 在 Flink、Spark3 的代码里面都有使用,这里面定义了 Iceberg 写数据的公共逻辑;•绿色部分:也就是 org.apache.spark.sql.sources.v2.writer 这个包,其是 Spark 的代码。所有实现 Spark DataSourceV2 的写逻辑都必须实现 DataWriter 里面的 write、commit 以及 abort 三个方法;•粉色部分:也就是 org.apache.iceberg.spark.source 这个包,这部分代码是在 iceberg-spark 模块里面,其里面定义了 Spark 2 和 Spark 3 的公共逻辑, iceberg-spark2 和 iceberg-spark3 都依赖了它。其中的 SparkPartitionedWriter 类实现了 Iceberg 的分区 Key 在 Spark 中的实现。•白色部分:也就是 org.apache.iceberg.spark.source.Writer 这个,注意,org.apache.iceberg.spark.source.Writer 里面的 Writer 是一个类,上面已经介绍过,Unpartitioned24Writer 和 Partitioned24Writer 是定义在 org.apache.iceberg.spark.source.Writer 类里面的。
为了介绍清楚每个 Task 写 Iceberg 数据的流程,我把 Spark 中调用 Unpartitioned24Writer 或 Partitioned24Writer 的逻辑贴出来了,如下(已经删了不需要的部分):
object DataWritingSparkTask extends Logging { def run( // 这个 writeTask 就是 org.apache.iceberg.spark.source.Writer.WriterFactory writeTask: DataWriterFactory[InternalRow], context: TaskContext, iter: Iterator[InternalRow], useCommitCoordinator: Boolean): WriterCommitMessage = { // 调用 org.apache.iceberg.spark.source.Writer.WriterFactory 的 createDataWriter 方法 // 获得 Unpartitioned24Writer 或 Partitioned24Writer val dataWriter = writeTask.createDataWriter(partId, taskId, epochId.toLong) // 下面就是写数据到 Iceberg 的 Utils.tryWithSafeFinallyAndFailureCallbacks(block = { // iter 就是我们要写的数据,因为每个 Task 要写入的数据可能有很多条,所以这里需要循环 while (iter.hasNext) { // 调用 Unpartitioned24Writer 或 Partitioned24Writer 的 write 方法来写数据 dataWriter.write(iter.next()) } val msg = if (useCommitCoordinator) { if (commitAuthorized) { // 如果上面 Task 循环写数据都成功了, // 说明可以执行 Unpartitioned24Writer 或 Partitioned24Writer 的 commit 方法。 // Unpartitioned24Writer 或 Partitioned24Writer 的 commit 方法 上面已经列出来了, // 其实就是把本次 Task 新创建的文件路径以及统计信息封装到 org.apache.iceberg.GenericDataFile 里面, // 这些文件最终会返回到 Spark 的 Driver 端的 dataWriter.commit() } else { throw new CommitDeniedException(message, stageId, partId, attemptId) } } else { dataWriter.commit() } msg })(catchBlock = { // 如果上面 Task 写数据出现异常,那么需要调用 Unpartitioned24Writer 或 Partitioned24Writer 的 abort 方法, // 其实就是删除刚刚新增的文件 dataWriter.abort() }) }}
关于 Spark 的 Task 写 Iceberg 的大致流程上面代码已经注释的很清楚了。下面我们来看下 Unpartitioned24Writer 和 Partitioned24Writer 的 write 方法是如何实现的。
Iceberg 未分区的表在 Spark 中的实现
public class UnpartitionedWriter<T> extends BaseTaskWriter<T> { private final RollingFileWriter currentWriter; public UnpartitionedWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory, OutputFileFactory fileFactory, FileIO io, long targetFileSize) { super(spec, format, appenderFactory, fileFactory, io, targetFileSize); currentWriter = new RollingFileWriter(null); } @Override public void write(T record) throws IOException { currentWriter.add(record); } @Override public void close() throws IOException { currentWriter.close(); }}
上面其实是 Iceberg 未分区表的实现逻辑,这部分代码是在 iceberg-core 模块中,也就是说 Flink、Spark2 以及 Spark 3 都是调用这个的。由于 Unpartitioned24Writer 是继承 UnpartitionedWriter 类的,在初始化 Unpartitioned24Writer 的时候同时也会初始化 UnpartitionedWriter,也就是会创建出一个 名为 currentWriter 的 RollingFileWriter 实例。RollingFileWriter 类看名字就知道是一个可以滚动切割文件的 Writer。
然后大家可以看到里面有个 write 函数的实现,上面我们已经贴出 Spark 的 DataWritingSparkTask 类,其中提到了这个类里面会循环写数据(也就是 dataWriter.write(iter.next())),其实调用的就是这个函数,其实现是通过 currentWriter.add(record) 来做的。
close 方法其实就是关闭当前正在写的文件,并将关闭的文件路径和统计信息封装到 GenericDataFile 里面,并加到新增文件列表里面,这些文件列表最后会返回到 Driver 端。
Iceberg 分区的表在 Spark 中的实现
Iceberg 分区写数据稍微复杂一些,得先定义出 partitionKey,这部分逻辑是在 SparkPartitionedWriter 实现的,如下(已经删除了部分代码):
public class SparkPartitionedWriter extends PartitionedWriter<InternalRow> { private final PartitionKey partitionKey; @Override protected PartitionKey partition(InternalRow row) { partitionKey.partition(internalRowWrapper.wrap(row)); return partitionKey; }}
定义好 partitionKey 之后,我们来看下分区表的数据如何写的:
public abstract class PartitionedWriter<T> extends BaseTaskWriter<T> { private final Set<PartitionKey> completedPartitions = Sets.newHashSet(); private PartitionKey currentKey = null; private RollingFileWriter currentWriter = null; ...... protected abstract PartitionKey partition(T row); @Override public void write(T row) throws IOException { PartitionKey key = partition(row); if (!key.equals(currentKey)) { if (currentKey != null) { // if the key is null, there was no previous current key and current writer. currentWriter.close(); completedPartitions.add(currentKey); } ...... currentKey = key.copy(); currentWriter = new RollingFileWriter(currentKey); } currentWriter.add(row); } @Override public void close() throws IOException { if (currentWriter != null) { currentWriter.close(); } }}
可以看到,分区的 write 方法稍微复杂一些。得先判断写入的这条数据分区值是否和上一条写入数据的分区值是否一样;
•如果一样,则直接调用 currentWriter.add(row); 把这条数据和上一条数据写到同一个分区文件里面。•如果不一样,需要关闭上一个分区文件,然后根据当前的分区值创建一个新的 RollingFileWriter 对象。最后把这条数据写到新的分区文件里面。
分区文件的 close 逻辑和 未分区一样,这里不再介绍了。
RollingFileWriter 的实现
大家已经看到了,Unpartitioned24Writer 和 Partitioned24Writer 其实都是调用 RollingFileWriter 来真正写数据的,这个也是写 Iceberg 数据的核心所在。Unpartitioned24Writer 和 Partitioned24Writer 都会初始化 RollingFileWriter,我们来看下 RollingFileWriter 的初始化所做的事情:
public RollingFileWriter(PartitionKey partitionKey) { this.partitionKey = partitionKey; openCurrent();}private void openCurrent() { if (partitionKey == null) { // unpartitioned currentFile = fileFactory.newOutputFile(); } else { // partitioned currentFile = fileFactory.newOutputFile(partitionKey); } currentAppender = appenderFactory.newAppender(currentFile.encryptingOutputFile(), format); currentRows = 0;}
如果是分区表的写入,则 partitionKey 是不等于 null 的,那么上面的 currentFile 路径是带分区路径的,比如本文开头的路径为 /data/hive/warehouse/iteblog/data/ts_year=2020/id_bucket=0/xxxx.parquet。如果不是分区表,那么写入的文件路径大概为 /data/hive/warehouse/iteblog/data/xxxx.parquet。这样就确定了数据需要写到哪个文件里面。
接下来是要决定怎么写了,这就是 currentAppender,appenderFactory 其实是 FileAppenderFactory 的实例,其主要有 org.apache.iceberg.flink.sink.RowDataTaskWriterFactory.FlinkFileAppenderFactory、org.apache.iceberg.spark.source.SparkAppenderFactory 以及 org.apache.iceberg.data.GenericAppenderFactory 三个实现。由于我们这里是介绍 Spark 的写入,所以我们来看下 SparkAppenderFactory。关键代码如下:
class SparkAppenderFactory implements FileAppenderFactory<InternalRow> { .... @Override public FileAppender<InternalRow> newAppender(OutputFile file, FileFormat fileFormat) { MetricsConfig metricsConfig = MetricsConfig.fromProperties(properties); try { switch (fileFormat) { case PARQUET: return Parquet.write(file) .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(dsSchema, msgType)) .setAll(properties) .metricsConfig(metricsConfig) .schema(writeSchema) .overwrite() .build(); case AVRO: return Avro.write(file) .createWriterFunc(ignored -> new SparkAvroWriter(dsSchema)) .setAll(properties) .schema(writeSchema) .overwrite() .build(); case ORC: return ORC.write(file) .createWriterFunc(SparkOrcWriter::new) .setAll(properties) .metricsConfig(metricsConfig) .schema(writeSchema) .overwrite() .build(); default: throw new UnsupportedOperationException("Cannot write unknown format: " + fileFormat); } } catch (IOException e) { throw new RuntimeIOException(e); } }}
可以看到,是根据 fileFormat 来决定创建哪个 FileAppender,这个 fileFormat 在 Spark 可以通过 write-format 或者表属性 write.format.default 来决定,默认是写 parquet 文件。所以这里调用的是初始化出一个 ParquetWriteAdapter,这个类就是底层写 Parquet 数据的封装,其实现是包装了 org.apache.parquet.hadoop.ParquetWriter 类的。我们前面介绍的写一行数据底层其实调用的是 ParquetWriter 的 write 方法。
好了,到这里使用 Spark 写 Iceberg 表的流程已经介绍的差不多了。当然,由于篇幅有限,其实还有很多逻辑没介绍清楚的,大家有时间可以自己去看下代码。
Spark 写 Iceberg 表时序图
下面我们来总结一下 Spark 写 Iceberg 表的过程。为了表述清楚,我这里画了两幅时序图,图中黄色部分是 Spark 的代码,绿色部分是 Iceberg 的代码,大家可以参考。可以到 https://www.iteblog.com/archives/9888.html [1] 获取原图。
总结
上面我们已经分析了 Iceberg 在 Spark 的写路径。值得注意的是,当前最新版本的 Iceberg 还只支持 overwrite 或者 append 的写模式,DELETE/UPDATE/MERGE INTO 这三种当前还不支持,不过 Iceberg 社区正在添加这个功能,可以参见 Updates/Deletes/Upserts in Iceberg[2]、Update the Iceberg spec for row-level deletes[3]。好了,关于 Iceberg 的写路径就分享到这里,后面有时间我会再写一篇读路径的文章,欢迎关注。