1.schema校验其实,通过前面几节的案例会发现,spark sql的dataframe往delta lake写数据的时候schema是不需要用户进行schema校验的。Delta lake会自动校验DataFrame和delta lake 表的schema兼容性。Delta lake使用以下规则来校验Dataframe和 delta 表是否兼容:1).Dataframe列必须在目标delta 表中存在。假设dataframe的列在delta表中没有,就会抛出一个异常。在delta 表中出现的列,但是dataframe中没有的列,那就是delta列中设为null。2).dataframe中的字段类型,与delta表中对应的字段类型强一致。假设不匹配,就会抛出异常。3).DataFrame列名称不能仅仅用大小写区分。这意味着不能在同一表中定义诸如“ Foo”和“ foo”之类的列。虽然spark的dataframe可以有大小写敏感或大小写不敏感(默认)。在存储和返回列信息时,Parquet区分大小写。Delta Lake保留大小写,但在存储schema时不敏感,因此要避免相同名称,大小写不同的列名出现,以避免潜在的错误,数据损坏或丢失问题。Delta Lake支持DDL显式添加新列,并具有自动更新schema的功能。如果指定一些配置,如partitionBy,和append模式组合使用,Delta lake会校验是否匹配,并且在不匹配的时候抛出异常。如果partitionBy没有出现,append模式会自动利用已有数据的分区。2.更新schemadelta lake支持更新表的schema信息。支持如下变更:1).在任意位置增加新的列。2).对已有的列进行排序。可以直接使用ddl做上面两个变更,也支持DML。注意:如果delta lake表的schema被更新了,写入delta表的流处理程序就会终止。所以需要重新启动,假如想要流处理继续工作的话。2.1 显示更新schema直接使用ddl更新delta lake的schema信息。a).增加列
ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)默认,支持字段值为null。为嵌套的数据结构增加一个字段,sql表达如下:
ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)假如一张delta表,增加列之前的schema如下:
- root | - colA | - colB | +-field1 | +-field2为嵌套的字段colB增加一个nested字段,并且位置在field1和field2之间。sql语句如下:
ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1)变更之后的schema如下:
- root | - colA | - colB | +-field1 | +-nested | +-field2默认回述出如下日志:
Adding nested columns is supported only for structs. Arrays and maps are not supported.b).变更列的注释或者顺序delta支持对列的描述信息进行变更,也支持改变列的顺序。
ALTER TABLE table_name CHANGE [COLUMN] col_name col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name]对于嵌套结构的列变更,sql表达如下:
ALTER TABLE table_name CHANGE [COLUMN] col_name.nested_col_name col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name]例如,一张表,colB是嵌套格式,schema变更之前如下:
想要交换嵌套结构colB字段field1,field2的顺序,sql表达如下:
ALTER TABLE boxes CHANGE COLUMN colB.field2 field2 STRING FIRST执行之后,结果如下:
c).替换schema标准语法如下:
ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)假如,执行替换schema sql之前的schema如下:
执行,替换sql语句:
ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)替换之后的schema如下:
d).更改列的类型及名称可以使用overwriteSchema配置来,在重写一张表的时候改变列的类型,名称或者删除一个列。注意:需要重跑整张表的。改变列的类型表达式如下:
spark.read.table("temp") .withColumn("date", col("date").cast("date")) .write .format("delta") .mode("overwrite") .option("overwriteSchema", "true") .saveAsTable("test")改变列名称,表达式如下:
spark.read.table("temp") .withColumnRenamed("date", "date_created") .write .format("delta") .mode("overwrite") .option("overwriteSchema", "true") .saveAsTable("schemaTests")2.2 自动更新schema无论是append模式还是overwrite模式,delta lake都支持自动更新表的schema,这样就可以保证schema与写入的数据相互兼容。a).增加列delta 支持dataframe内部有的列,而目标表中不存在的列自动加入到目标表,随着写入事务完成整个操作。需要开启下面的配置:
-
mergeSchema 设置为true。
-
spark.databricks.deltaschema.autoMerge.enabled 设置为true
新列会被自动添加到的schema指定结构的末尾。b).NullType类型列Parquet本身是不支持parquet的,NullType在dataframe写入delta表的时候会自动删除,但是依然是保留schema信息的。当该列位置出现新的类型的时候,delta会自动将新类型与该schema的NullType类型合并,保留新列类型。当delta lake的一个已有列接收到一个NullType类型时,会保留已有列的类型,自动删除新的列。streaming中是不支持NullType的。对于ArrayType和MapType等复杂类型,也不接受NullType。2.3 替换表schema默认情况下,在overwrite一张表的时候并不会overwrite表的schema。想在覆盖数据的同时覆盖schema,必须将overwriteSchema参数设置为true。
df.write.option("overwriteSchema", "true")2.4 表实图Delta Lake支持在Delta表之上创建视图,就像使用数据源表一样。
使用视图进行操作时的核心挑战是解决schema冲突。 如果更改Delta表schema,则必须重新创建派生视图。 例如,如果将新列添加到Delta表中,则必须确保该列在该基表上构建的视图中可用。3. 表属性操作在创建一张表或者对表进行alter操作的时候,可以使用TABLEPROPERTIES来设置你自己的元数据作为表的属性。TABLEPROPERTIES是以delta table元数据的形式存储。如果给定位置中已经存在Delta表,则不能在CREATE语句中定义新的TBLPROPERTIES。此外,为了优化行为和性能,Delta Lake支持某些Delta表特别属性:
-
可以禁止对delta表的删除和更新:delta.appendOnly = true。
-
配置时间旅行保留属性:delta.logRetentionDuration = <间隔字符串>和delta.deletedFileRetentionDuration = <间隔字符串>。
注意:
-
目前只支持delta.前缀的属性。
-
修改delta表属性是一种写操作,它将于其他并发写的操作冲突,从而导致失败。所以,建议在没有并发写的时候区修改表的属性。
还可以使用Spark配置在第一次提交Delta表的过程中设置delta.前缀的属性。例如,要使用属性delta.appendOnly = true初始化Delta表,可将Spark配置spark.databricks.delta.properties.defaults.appendOnly设置为true。sql表达式如下:
spark.sql("SET spark.databricks.delta.properties.defaults.appendOnly = true")
scala表达式如下:
spark.conf.set("spark.databricks.delta.properties.defaults.appendOnly", "true")