【译】Apache Spark 数据建模之时间维度(一)

编译:诚历,阿里巴巴计算平台事业部 EMR 技术专家,Apache Sentry PMC,Apache Commons Committer,目前从事开源大数据存储和优化方面的工作。

原文链接http://blog.madhukaraphatak.com/data-modeling-spark-part-1/


数据建模是数据分析重要的组成之一,正确的建立模型有助于用户更好地解答业务相关的问题。在过去几十年中,数据建模技术也一直是SQL数据仓库的基础。

Apache Spark作为新一代的数仓技术的代表,我们能够在 Spark 中使用早期的数据建模技术。这使得Spark data pineline 更加有效。

在本系列文章中,我将讨论spark中不同的数据建模。本系列的第一篇文章中将讨论如何使用日期维度。

数据分析中数据和时间的重要性

我们分析的大多数数据通常都包含日期或时间戳。例如,它可能是

  • 股票的交易日期
  • POS系统的交易时间

我们所做的很多分析通常都是关于日期或时间的。我们通常希望使用相同的方法对数据进行切分。

使用内置的Spark进行数据分析

本节讨论如何使用内置的spark日期函数进行数据分析。

苹果股票数据

在本例中,我们将使用苹果股票数据。以下是样本数据

Date Open High Low Close Volume AdjClose
2013-12-31 00:00:00 554.170013 561.279976 554.000023 561.019997 55771100 76.297771
2013-12-30 00:00:00 557.460022 560.089989 552.319984 554.519981 63407400 75.41378

加载到Spark Dataframe

下面的代码将数据加载到spark dataframe中。

val appleStockDf = sparkSession.read.format("csv").
      option("header","true")
      .option("inferSchema","true")
      .load("src/main/resources/applestock_2013.csv")

分析日期

在本节中,让我们看看如何回答与日期相关的问题。

  • 有属于周末的记录吗?

这种分析通常是为了确保数据的质量。周末应该不会有任何数据,因为周末不会有交易。

assert(sparkSession.sql
       ("select * from stocks where dayofweek(Date)==1 or 
       dayofweek(Date)==7").count() == 0)

在上述代码中,1表示星期天,7表示星期六。我们可以看到,代码是不可读的,除非我们知道如何解码这些神奇的数字。

  • 显示季度最高价格

这个分析找到了给定季度的最大值。

appleStockDf.groupBy(year($"Date"),quarter($"Date")).
      avg("Close").
      sort("year(Date)","quarter(Date)")
      .show()

使用Spark日期函数进行数据分析的挑战

尽管我们可以使用spark builtin数据函数来完成上面的分析,但是编写它们是很困难的。此外,从外部BI解决方案很难表达这些需求,通常业务分析师用户是最终用户。因此,我们需要一种更简单、更好的方法来实现上述目标。

日期维度

日期维是一个静态数据集,它在列中列出给定日期的所有不同属性。这个示例数据集模式如下所示

t
 |-- date_key: integer (nullable = true)
 |-- full_date: string (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- day_num_in_month: integer (nullable = true)
 |-- day_num_overall: integer (nullable = true)
 |-- day_name: string (nullable = true)
 |-- day_abbrev: string (nullable = true)
 |-- weekday_flag: string (nullable = true)
 |-- week_num_in_year: integer (nullable = true)
 |-- week_num_overall: integer (nullable = true)
 |-- week_begin_date: string (nullable = true)
 |-- week_begin_date_key: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- month_num_overall: integer (nullable = true)
 |-- month_name: string (nullable = true)
 |-- month_abbrev: string (nullable = true)
 |-- quarter: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- yearmo: integer (nullable = true)
 |-- fiscal_month: integer (nullable = true)
 |-- fiscal_quarter: integer (nullable = true)
 |-- fiscal_year: integer (nullable = true)
 |-- last_day_in_month_flag: string (nullable = true)
 |-- same_day_year_ago: string (nullable = true)

在上面的格式中,一些重要的列是

* full_date - Timestamp for given day
* year - year in the date
* quarter - quarter the given date belongs
etc.

这个静态数据集可以生成多年并保持可用。我们在示例中使用的示例可以从下面的链接下载。

https://www.kimballgroup.com/data-warehouse-business-intelligence-resources/books/microsoft-data-warehouse-dw-toolkit/.

使用日期维度进行数据分析

本节讨论如何使用日期维度进行上述分析。

加载数据以触发Dataframe

我们可以为我们的数据集创建一个dataframe,如下所示

val originalDf = sparkSession.read.format("csv").
      option("header","true")
      .option("inferSchema","true")
      .load(dataPath)

    //replace space in the column names
    val new_columns = originalDf.schema.fields
      .map(value => value.copy(name = value.name.replaceAll("\\s+","_")))

    val newSchema = StructType(new_columns)
    val newNameDf = sparkSession.createDataFrame(originalDf.rdd, newSchema)

    import org.apache.spark.sql.functions._
    val dateDf = newNameDf.withColumn("full_date_formatted",
      to_date(newNameDf.col("full_date"),"dd/MM/yy"))

在上面的代码中,进行了预处理,将字符串转换为spark date数据类型。

与股票数据连接

我们可以使用spark连接将股票数据与日期结合起来

val joinedDF = appleStockDf.join(dateDf, appleStockDf.col("Date") ===
      dateDf.col("full_date_formatted"))
This join doesn’t increase size of the data as it’s an inner join.

这个连接并不会增加数据的大小,因为它是一个内部连接。

分析

本节介绍如何在不使用复杂的spark函数的情况下进行分析

  • 有属于周末的记录吗?
assert(joinedDF.filter("weekday_flag != 'y'").count()==0)
  • 显示季度最高价格
joinedDF.groupBy("year","quarter").
      avg("Close").
      sort("year","quarter")
      .show()

日期维度的优点

本节讨论日期维度的优点。

跨不同分析的重用

相同的数据集可以用于不同的数据分析。与在查询中编写特殊函数或在数据集本身上添加这些列不同,拥有标准的日期维度有助于使所有的数据分析标准化。

Scalable

用户可以在日期维度上添加更多的属性,如区域假日等。这将丰富每个人的分析。这里不需要额外的查询。

用户友好

使用日期维度生成的查询更容易理解。

引用

https://www.kimballgroup.com/data-warehouse-business-intelligence-resources/kimball-techniques/dimensional-modeling-techniques/calendar-date-dimension/.

代码

文本示例代码可以在 github 上查看
https://github.com/phatak-dev/spark2.0-examples/blob/2.4/src/main/scala/com/madhukaraphatak/examples/sparktwo/datamodeling/DateHandlingExample.scala


阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区近万人Spark技术同学在线提问答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!

【译】Apache Spark 数据建模之时间维度(一)

对开源大数据和感兴趣的同学可以加小编微信(下图二维码,备注“进群”)进入技术交流微信群。

【译】Apache Spark 数据建模之时间维度(一)

上一篇:[AR]Vumark(下一代条形码)


下一篇:C++ STL之vector用法总结