Spark Sql Join操作类型

Spark DataFrame支持所有基本SQL Join类型的操作,如INNER,RIGHT OUTER,LEFT ANTI, LEFT SEMI, CROSS, SELF JOIN. Spark SQL Join操作是宽转换操作,结果数据会重组在网络中,因此当不仔细设计时,会有非常大的性能问题.

另外一方面,Spark SQL Join操作默认带更多优化(多亏DataFrame & DataSet), 虽然这样,当使用时需要考虑仍有一些性能问题.

在本文中,你会学到不同的Join语法并使用不同的Join 类型在DataFrame和DataSet上,例子用Scala.

SQL Join类型和语法

下边列出所有Spark SQL类型和语法

join(right: Dataset[_]): DataFrame
join(right: Dataset[_], usingColumn: String): DataFrame
join(right: Dataset[_], usingColumns: Seq[String]): DataFrame
join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame
join(right: Dataset[_], joinExprs: Column): DataFrame
join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame

剩下的教程解释6种语法 Join 类型,接收合适的DataFrame, Join表达式和Join类型用string.

对于语法4 & 5你可以使用"JoinType" 或者"Join 字符串"定义在上边的表用"JoinType"字符串参数.当你使用"JoinType",你应该import org.apache.spark.sql.catalyst.plans._ 作为定义JoinType对象.

JOINTYPE JOIN STRING EQUIVALENT SQL JOIN
Inner.sql inner INNER JOIN
FullOuter.sql outer, full, fullouter, full_outer FULL OUTER JOIN
LeftOuter.sql left, leftouter, left_outer LEFT JOIN
RightOuter.sql right, rightouter, right_outer RIGHT JOIN
Cross.sql cross
LeftAnti.sql anti, leftanti, left_anti
LeftSemi.sql semi, leftsemi, left_semi

所有Join对象定义joinType对象为了使用你需要导入org.apache.spark.sql.catalyst.plans.{LeftOuter,Inner,....}.

在你使用Spark SQL join例子之前,首先,先创建empdept DataFrame. 在这里emp_id 列在emp是唯一的, dept_id 在 dept数据集唯一, 并且在empemp_dept_id 指向dept数据集中dept_id.

val emp = Seq((1,"Smith",-1,"2018","10","M",3000),
    (2,"Rose",1,"2010","20","M",4000),
    (3,"Williams",1,"2010","10","M",1000),
    (4,"Jones",2,"2005","10","F",2000),
    (5,"Brown",2,"2010","40","",-1),
      (6,"Brown",2,"2010","50","",-1)
  )
  val empColumns = Seq("emp_id","name","superior_emp_id","year_joined",
       "emp_dept_id","gender","salary")
  import spark.sqlContext.implicits._
  val empDF = emp.toDF(empColumns:_*)
  empDF.show(false)

  val dept = Seq(("Finance",10),
    ("Marketing",20),
    ("Sales",30),
    ("IT",40)
  )

  val deptColumns = Seq("dept_name","dept_id")
  val deptDF = dept.toDF(deptColumns:_*)
  deptDF.show(false)
Emp Dataset
+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
|6     |Brown   |2              |2010       |50         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+

Dept Dataset
+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+

Inner Join

inner join为默认join,并且也是最常用的.被用来join两个DataFrame/Dataset 在指定列上,在两个数据集上没有匹配列上的数据会被丢弃.

empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"inner")
    .show(false)

当应用inner join在数据集上时,会把emp 中emp_dept_id=50和 dept中dept_id=30的数据丢弃.下边是上边的输出结果

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+

full outer join

outer也叫full, fullouter join返回Spark DataFrame/Dataset中所有的行, join表达式没有匹配上的用null来表示对象列.

empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"outer").show(false)
empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"full").show(false)
empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"fullouter").show(false)

Left Outer Join

Spark leftleft outer join返回所有在左边DataFrame/Dataset的所有列,忽略右边数据没有匹配上的数据,它被分配null.

empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"left")
  .show(false)
empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"leftouter")
  .show(false)

在empDF数据集emp_dept_id=50没有记录在dept中,因此数据集在dept列(dept_name & dept_id)为null,并且dept_id=30在dept中会被丢掉.下边是上边Join表达式的结果.

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
|6     |Brown   |2              |2010       |50         |      |-1    |null     |null   |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
Scala

Right Outer Join

Spark Right 同 Right Outer join 相对于left join是另外一方向,会返回所有右侧DataFrame/Dataset的行,并且忽略匹配在左侧数据集当没有匹配上时,会分区null为那些数据,并且丢掉左侧没有匹配上的数据.

empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"right")
   .show(false)
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"rightouter")
   .show(false)

例子中,右侧数据集dep_id=30没有在左侧数据集emp中,因此这条记录包含null在emp列中.并且,emp_dept_id=50的没有匹配上,会被丢掉.下边是输出

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|null  |null    |null           |null       |null       |null  |null  |Sales    |30     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+

Left Semi Join

Spark left Semi 和inner join相似,不同点leftsemi join返回在左数据集中所有列,并且忽略右数据集中所有列.换句话说,这个join返回的万仅仅是左数据匹配上右数据集的,没有匹配上的无论左右都被忽略.
相同的结果可以获得通过使用select在这个结果上,用inner join, 然而,使用这个join更高效.

empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"), "leftsemi")
    .show(false)

输出

leftsemi join
+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+

Left Anti Join

left anti join 做的和spark leftsemi刚好相反, leftanti join 返回仅仅是左侧列没有匹配上的.

 empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"leftanti")
    .show(false)

输出

+------+-----+---------------+-----------+-----------+------+------+
|emp_id|name |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+-----+---------------+-----------+-----------+------+------+
|6     |Brown|2              |2010       |50         |      |-1    |
+------+-----+---------------+-----------+-----------+------+------+

Self Join

Spark join没有自联接join是不完整的.虽然没有自连接join类型可用,我们可以使用任何上边解释的join类型 join到DataFrame自身.下边为一个inner self join

empDF.as("emp1").join(empDF.as("emp2"),
    col("emp1.superior_emp_id") === col("emp2.emp_id"),"inner")
    .select(col("emp1.emp_id"),col("emp1.name"),
      col("emp2.emp_id").as("superior_emp_id"),
      col("emp2.name").as("superior_emp_name"))
      .show(false)

我们可以为所有employees join emp数据集join自己来发现superior emp_id和名字

+------+--------+---------------+-----------------+
|emp_id|name    |superior_emp_id|superior_emp_name|
+------+--------+---------------+-----------------+
|2     |Rose    |1              |Smith            |
|3     |Williams|1              |Smith            |
|4     |Jones   |2              |Rose             |
|5     |Brown   |2              |Rose             |
|6     |Brown   |2              |Rose             |
+------+--------+---------------+-----------------+

使用SQL表达式

因为Spark SQL支持SQL原生语法,我们可以写join操作在创建一个临时表之后,并使用spark.sql()

empDF.createOrReplaceTempView("EMP")
  deptDF.createOrReplaceTempView("DEPT")
//SQL JOIN
  val joinDF = spark.sql("select * from EMP e, DEPT d where e.emp_dept_id == d.dept_id")
  joinDF.show(false)

  val joinDF2 = spark.sql("select * from EMP e INNER JOIN DEPT d ON e.emp_dept_id == d.dept_id")
  joinDF2.show(false)

源代码


package com.sparkbyexamples.spark.dataframe.join

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
object JoinExample extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  spark.sparkContext.setLogLevel("ERROR")

  val emp = Seq((1,"Smith",-1,"2018","10","M",3000),
    (2,"Rose",1,"2010","20","M",4000),
    (3,"Williams",1,"2010","10","M",1000),
    (4,"Jones",2,"2005","10","F",2000),
    (5,"Brown",2,"2010","40","",-1),
      (6,"Brown",2,"2010","50","",-1)
  )
  val empColumns = Seq("emp_id","name","superior_emp_id","year_joined","emp_dept_id","gender","salary")
  import spark.sqlContext.implicits._
  val empDF = emp.toDF(empColumns:_*)
  empDF.show(false)

  val dept = Seq(("Finance",10),
    ("Marketing",20),
    ("Sales",30),
    ("IT",40)
  )

  val deptColumns = Seq("dept_name","dept_id")
  val deptDF = dept.toDF(deptColumns:_*)
  deptDF.show(false)


  println("Inner join")
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"inner")
    .show(false)

  println("Outer join")
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"outer")
    .show(false)
  println("full join")
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"full")
    .show(false)
  println("fullouter join")
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"fullouter")
    .show(false)

  println("right join")
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"right")
    .show(false)
  println("rightouter join")
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"rightouter")
    .show(false)

  println("left join")
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"left")
    .show(false)
  println("leftouter join")
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"leftouter")
    .show(false)

  println("leftanti join")
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"leftanti")
    .show(false)

  println("leftsemi join")
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"leftsemi")
    .show(false)

  println("cross join")
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"cross")
    .show(false)

  println("Using crossJoin()")
  empDF.crossJoin(deptDF).show(false)

  println("self join")
  empDF.as("emp1").join(empDF.as("emp2"),
    col("emp1.superior_emp_id") === col("emp2.emp_id"),"inner")
    .select(col("emp1.emp_id"),col("emp1.name"),
      col("emp2.emp_id").as("superior_emp_id"),
      col("emp2.name").as("superior_emp_name"))
      .show(false)

  empDF.createOrReplaceTempView("EMP")
  deptDF.createOrReplaceTempView("DEPT")

  //SQL JOIN
  val joinDF = spark.sql("select * from EMP e, DEPT d where e.emp_dept_id == d.dept_id")
  joinDF.show(false)

  val joinDF2 = spark.sql("select * from EMP e INNER JOIN DEPT d ON e.emp_dept_id == d.dept_id")
  joinDF2.show(false)

}

结论

在本教程中,你已经学习了Spark SQL join 类型,INNER, LEFT OUTER, RIGHT OUTER, LEFT ANTI, LEFT SEMI, CROSS, SELF的使用,和对应SCALA代码.

上一篇:多线程012--线程的sleep、wait、join、yield如何使用


下一篇:对一元多项式的加减以及求导运算的实现