在 《一条数据在 Apache Iceberg 之旅:写过程分析》 这篇文章中我们分析了 Apache Iceberg 写数据的源码。如下是我们使用 Spark 写两次数据到 Iceberg 表的数据目录布局(测试代码在 这里[1]):
/data/hive/warehouse/default.db/iteblog├── data│ └── ts_year=2020│ ├── id_bucket=0│ │ ├── 00000-0-19603f5a-d38a-4106-aeb9-47285d15a6bd-00001.parquet│ │ ├── 00000-0-491c447b-e05f-40ac-8c32-44d5ef39353b-00001.parquet│ │ ├── 00001-1-4181e872-94fa-4699-ab5d-81dffa92de0c-00002.parquet│ │ └── 00001-1-9b5f5291-af3b-4edc-adff-fbddf3e53709-00002.parquet│ └── id_bucket=1│ ├── 00001-1-4181e872-94fa-4699-ab5d-81dffa92de0c-00001.parquet│ └── 00001-1-9b5f5291-af3b-4edc-adff-fbddf3e53709-00001.parquet└── metadata ├── 00000-0934ba0e-8ed9-48e3-9db2-25dca1f896f2.metadata.json ├── 00001-454ee707-dbc0-4fd8-8c64-0910d8d6315b.metadata.json ├── 00002-7bdd0e6b-ec2b-4b1a-b355-6a81a3f5a0ae.metadata.json ├── 2170dbc3-334e-41ad-ac2c-68dd7470f001-m0.avro ├── 22e1674c-71ea-4d84-8982-276c3370d5c0-m0.avro ├── snap-5353330338887146702-1-2170dbc3-334e-41ad-ac2c-68dd7470f001.avro └── snap-6683043578305788250-1-22e1674c-71ea-4d84-8982-276c3370d5c0.avro5 directories, 13 files
因为我们每次写入的数据就几条,Iceberg 每个分区写文件的时候都是产生新的文件,这就导致底层文件系统里面产生了很多大小才几KB的文件。如果我们是使用 Spark Streaming 的方式7*24小时不断地往 Apache Iceberg 里面写数据,这将产生大量的小文件。
使用 Iceberg 来压缩文件
值得高兴的是,Apache Iceberg 给我们提供了相关 Actions API 来合并这些小文件,具体如下:
Configuration conf = new Configuration();conf.set(METASTOREURIS.varname, "thrift://localhost:9083");Map<String, String> maps = Maps.newHashMap();maps.put("path", "default.iteblog");DataSourceOptions options = new DataSourceOptions(maps);Table table = findTable(options, conf);SparkSession.builder() .master("local[2]") .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic") .config("spark.hadoop." + METASTOREURIS.varname, "thrift://localhost:9083") .config("spark.executor.heartbeatInterval", "100000") .config("spark.network.timeoutInterval", "100000") .enableHiveSupport() .getOrCreate();Actions.forTable(table).rewriteDataFiles() .targetSizeInBytes(10 * 1024) // 10KB .execute();
运行完上面代码之后,可以将 Iceberg 的小文件进行合并,得到的新数据目录如下:
⇒ tree /data/hive/warehouse/default.db/iteblog/data/hive/warehouse/default.db/iteblog├── data│ └── ts_year=2020│ ├── id_bucket=0│ │ ├── 00000-0-19603f5a-d38a-4106-aeb9-47285d15a6bd-00001.parquet│ │ ├── 00000-0-491c447b-e05f-40ac-8c32-44d5ef39353b-00001.parquet│ │ ├── 00001-1-4181e872-94fa-4699-ab5d-81dffa92de0c-00002.parquet│ │ ├── 00001-1-9b5f5291-af3b-4edc-adff-fbddf3e53709-00002.parquet│ │ └── 00001-1-da4e85fa-096d-4b74-9b3c-a260e425385d-00001.parquet│ └── id_bucket=1│ ├── 00000-0-1d5871d5-08ac-4e43-a589-70a5d50dd4d2-00001.parquet│ ├── 00001-1-4181e872-94fa-4699-ab5d-81dffa92de0c-00001.parquet│ └── 00001-1-9b5f5291-af3b-4edc-adff-fbddf3e53709-00001.parquet└── metadata ├── 00000-0934ba0e-8ed9-48e3-9db2-25dca1f896f2.metadata.json ├── 00001-454ee707-dbc0-4fd8-8c64-0910d8d6315b.metadata.json ├── 00002-7bdd0e6b-ec2b-4b1a-b355-6a81a3f5a0ae.metadata.json ├── 00003-d987d15f-2c7c-427c-849e-b8842d77d28e.metadata.json ├── 2170dbc3-334e-41ad-ac2c-68dd7470f001-m0.avro ├── 22e1674c-71ea-4d84-8982-276c3370d5c0-m0.avro ├── 25126b97-5a87-42b7-b45a-499aa41e7359-m0.avro ├── 25126b97-5a87-42b7-b45a-499aa41e7359-m1.avro ├── 25126b97-5a87-42b7-b45a-499aa41e7359-m2.avro ├── snap-3634417817414108593-1-25126b97-5a87-42b7-b45a-499aa41e7359.avro ├── snap-5353330338887146702-1-2170dbc3-334e-41ad-ac2c-68dd7470f001.avro └── snap-6683043578305788250-1-22e1674c-71ea-4d84-8982-276c3370d5c0.avro5 directories, 20 files
对比最新的结果可以得出:
•ts_year=2020/id_bucket=0 新增了名为 00001-1-da4e85fa-096d-4b74-9b3c-a260e425385d-00001.parquet 的数据文件,这个其实就是把之前四个文件进行和合并得到的新文件;•ts_year=2020/id_bucket=1 新增了名为 00000-0-1d5871d5-08ac-4e43-a589-70a5d50dd4d2-00001.parquet 的数据文件,这个其实就是把之前两个文件进行和合并得到的新文件。
Iceberg 小文件合并原理
Iceberg 小文件合并是在 org.apache.iceberg.actions.RewriteDataFilesAction 类里面实现的。小文件合并其实是通过 Spark 并行计算的,这也就是上面 DEMO 初始化了一个 SparkSession 的原因。我们可以通过 RewriteDataFilesAction 类的 targetSizeInBytes 方法来设置输出的合并文件大小。
注意:合并最终的文件并不是都小于或等于 targetSizeInBytes,甚至会出现文件根本没合并的情况。
当我们调用了 execute() 方法,RewriteDataFilesAction 类会先创建出一个 org.apache.iceberg.DataTableScan,然后会把对应表的最新快照(Snapshot)拿出来,紧接着拿出这个快照对应的底层所有数据文件。然后按照分区 Key 进行分组(group),同一个分区的文件放到一起,并将这些信息放到 Map<StructLikeWrapper, Collection> groupedTasks 的结果里面,groupedTasks 的 Key 就是分区信息,如果表不是分区表,那就是空分区;groupedTasks 的 value 就是对应分区底下的文件列表。
由于分区里面可能存在一个文件,这时候就没必要去执行文件合并,这时候可以去掉这部分分区,得到了 Map<StructLikeWrapper, Collection<FileScanTask>> filteredGroupedTasks
。如果 filteredGroupedTasks 里面没有需要合并的分区那就直接返回了。
如果 filteredGroupedTasks 不为空,则对每个分区里面的文件进行 split 和 combine 操作,如下:
// Split and combine tasks under each partitionList<CombinedScanTask> combinedScanTasks = filteredGroupedTasks.values().stream() .map(scanTasks -> { CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles( CloseableIterable.withNoopClose(scanTasks), targetSizeInBytes); return TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost); }) .flatMap(Streams::stream) .collect(Collectors.toList());
combinedScanTasks 结构如下:
Apache iceberg write path 如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:iteblog_hadoop combinedScanTasks 里面其实就是封装了 BaseCombinedScanTask 类,这个类里面的 task 就是标识哪些 Iceberg 的数据文件需要合并到新文件里面。得到 combinedScanTasks 之后会构造出一个 RDD:
JavaRDD<CombinedScanTask> taskRDD = sparkContext.parallelize(combinedScanTasks, combinedScanTasks.size());
然后最终会调用 taskRDD 的 map 方法,遍历 combinedScanTasks 里面的 task,将 task 里面对应的 Iceberg 读出来,再写到新文件里面:
public List<DataFile> rewriteDataForTasks(JavaRDD<CombinedScanTask> taskRDD) { JavaRDD<TaskResult> taskCommitRDD = taskRDD.map(this::rewriteDataForTask); return taskCommitRDD.collect().stream() .flatMap(taskCommit -> Arrays.stream(taskCommit.files())) .collect(Collectors.toList());}
rewriteDataForTask 的实现如下:
private TaskResult rewriteDataForTask(CombinedScanTask task) throws Exception { TaskContext context = TaskContext.get(); int partitionId = context.partitionId(); long taskId = context.taskAttemptId(); RowDataReader dataReader = new RowDataReader( task, schema, schema, nameMapping, io.value(), encryptionManager.value(), caseSensitive); SparkAppenderFactory appenderFactory = new SparkAppenderFactory( properties, schema, SparkSchemaUtil.convert(schema)); OutputFileFactory fileFactory = new OutputFileFactory( spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId); BaseWriter writer; if (spec.fields().isEmpty()) { writer = new UnpartitionedWriter(spec, format, appenderFactory, fileFactory, io.value(), Long.MAX_VALUE); } else { writer = new PartitionedWriter(spec, format, appenderFactory, fileFactory, io.value(), Long.MAX_VALUE, schema); } try { while (dataReader.next()) { InternalRow row = dataReader.get(); writer.write(row); } dataReader.close(); dataReader = null; return writer.complete(); } catch (Throwable originalThrowable) { ...... }}
rewriteDataForTasks 执行完会返回新创建文件的路径,最后会写到新的快照里面。在快照里面会将新建的文件表示为 org.apache.iceberg.ManifestEntry.Status#ADDED,上一个快照里面的文件标记为 org.apache.iceberg.ManifestEntry.Status#DELETED。
好了,本文就分享到这里。