介绍Spark SQL的JSON支持,这是我们在Databricks中开发的一个功能,可以在Spark中更容易查询和创建JSON数据。随着网络和移动应用程序的普及,JSON已经成为Web服务API以及长期存储的常用的交换格式。使用现有的工具,用户通常会使用复杂的管道来在分析系统中读取和写入JSON数据集。在Apache Spark 1.1中发布Spark SQL的JSON支持,在Apache Spark 1.2中增强,极大地简化了使用JSON数据的端到端体验。
现有做法
实际上,用户经常面临使用现代分析系统处理JSON数据的困难。要将数据集写入JSON格式,用户首先需要编写逻辑将其数据转换为JSON。要阅读和查询JSON数据集,通常的做法是使用ETL流水线将JSON记录转换为预定义的结构。在这种情况下,用户必须等待该进程完成才能使用其数据。对于写作和阅读,定义和维护模式定义通常会使ETL任务更加繁重,并消除了半结构化JSON格式的许多优点。如果用户想要使用新的数据,则他们在创建外部表时必须费力定义模式,然后使用自定义的JSON序列化/反序列化库,或者使用JSON UDF的组合来查询数据。
例如,考虑具有以下JSON模式的数据集:
{"name":"Yin", "address":{"city":"Columbus","state":"Ohio"}}
{"name":"Michael", "address":{"city":null, "state":"California"}}
在像Hive这样的系统中,JSON对象通常存储为单列的值。要访问此数据,将使用UDF提取和展平JSON对象中的字段。在下面显示的SQL查询中,提取外部字段(名称和地址),然后进一步提取嵌套地址字段。
在下面的示例中,假设上面显示的JSON数据集存储在名为people的表中,JSON对象存储在名为jsonObject的列中:
SELECT
v1.name, v2.city, v2.state
FROM people
LATERAL VIEW json_tuple(people.jsonObject, 'name', 'address') v1
as name, address
LATERAL VIEW json_tuple(v1.address, 'city', 'state') v2
as city, state;
JSON support in Spark SQL
Spark SQL提供了一种用于查询JSON数据的自然语法,以及用于读取和写入数据的JSON模式的自动推断。Spark SQL了解JSON数据中的嵌套字段,并允许用户直接访问这些字段,而无需任何明确的转换。Spark SQL中的上述查询如下所示:
SELECT name, age, address.city, address.state FROM people
在Spark SQL中加载和保存JSON数据集
要查询Spark SQL中的JSON数据集,只需要将Spark SQL指向数据的位置。在没有任何用户规范的情况下,自动进行数据集格式推断。在编程API中,可以通过SQLContext提供的jsonFile和jsonRDD方法来完成。使用这两种方法,您可以为给定的JSON数据集创建一个SchemaRDD,然后可以将SchemaRDD注册为表格。这是一个例子:
// Create a SQLContext (sc is an existing SparkContext)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Suppose that you have a text file called people with the following content:
// {"name":"Yin", "address":{"city":"Columbus","state":"Ohio"}}
// {"name":"Michael", "address":{"city":null, "state":"California"}}
// Create a SchemaRDD for the JSON dataset.
val people = sqlContext.jsonFile("[the path to file people]")
// Register the created SchemaRDD as a temporary table.
people.registerTempTable("people")
也可以使用纯SQL API创建JSON数据集。例如,对于通过JDBC服务器连接到Spark SQL的用户,他们可以使用:
CREATE TEMPORARY TABLE people
USING org.apache.spark.sql.json
OPTIONS (path '[the path to the JSON dataset]')
在上述示例中,由于未提供数据结构,Spark SQL将通过扫描JSON数据集自动推断模式。当一个字段是JSON对象或数组时,Spark SQL将使用STRUCT类型和ARRAY类型来表示此字段的类型。由于JSON是半结构化的,不同的元素可能具有不同的模式,Spark SQL也将解决字段数据类型的冲突。要了解JSON数据集的架构是什么,用户可以使用编程API中返回的SchemaRDD提供的printSchema()方法或SQL中使用DESCRIBE [table name]来显示模式。例如,通过people.printSchema()可视化的人的模式将是:
root
|-- address: struct (nullable = true)
| |-- city: string (nullable = true)
| |-- state: string (nullable = true)
|-- name: string (nullable = true)
或者,当使用jsonFile和jsonRDD创建表时,用户可以将模式应用于JSON数据集。在这种情况下,Spark SQL将将提供的模式绑定到JSON数据集,并且不会推断模式。用户不需要知道JSON数据集中出现的所有字段。指定的模式可以是出现在数据集中的字段的子集,也可以是不存在的字段。
创建表示JSON数据集的表后,用户可以轻松地在JSON数据集上编写SQL查询,就像常规表一样。与Spark SQL中的所有查询一样,查询的结果由另一个SchemaRDD表示。例如:
val nameAndAddress = sqlContext.sql("SELECT name, address.city, address.state FROM people")
nameAndAddress.collect.foreach(println)
SQL查询的结果可以由其他数据分析任务直接和立即使用,例如机器学习管道。此外,JSON数据集可以轻松地缓存在Spark SQL内置的内存列存储中,并以其他格式保存,如Parquet或Avro。
将SchemaRDD保存为JSON文件
在Spark SQL中,SchemaRDD可以通过toJSON方法以JSON格式输出。由于SchemaRDD始终包含模式(包括对嵌套和复杂类型的支持),Spark SQL可以自动将数据集转换为JSON,而不需要用户定义的格式。SchemaRDD本身可以从许多类型的数据源创建,包括Apache Hive表,Parquet文件,JDBC,Avro文件,或者是对现有SchemaRDD的查询结果。这种组合意味着无论数据源的来源如何,用户都可以以最小的努力将数据迁移到JSON格式。
What’s next?
处理具有大量字段的JSON数据集
JSON数据通常是半结构化、非固定结构的。将来,我们将扩展Spark SQL对JSON支持,以处理数据集中的每个对象可能具有相当不同的结构的情况。例如,考虑使用JSON字段来保存表示HTTP标头的键/值对的数据集。每个记录可能会引入新的标题类型,并为每个记录使用一个不同的列将产生一个非常宽的模式。我们计划支持自动检测这种情况,而是使用map类型。因此,每行可以包含Map,使得能够查询其键/值对。这样,Spark SQL将处理具有更少结构的JSON数据集,推动了基于SQL的系统可以处理的那种查询的边界。