SparkSQL DataFrame Select时传入可变参数

最近有个新需求用到spark动态读取数据源,因数据源列名通过json传入,而且业务场景是只用DataFrame做不好用SQL处理。

一开始错误代码示例:

   val ds: Dataset[String] = sparkSession.read.textFile("input/user.txt","input/traffics.txt")
    ds.show(20)
    val result = ds.map(x=>{
      val str = x.split(" ")

     (str(0),str(1))
    }).toDF("id","month")

    result.select("id","month").show()//能正常执行
	//模拟传入的列名
    val list = List("id","month") 
    result.select(list.mkString(","))//出错

报错如下:

Exception in thread "main" java.lang.reflect.InvocationTargetException
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at com.intellij.rt.execution.CommandLineWrapper.main(CommandLineWrapper.java:64)
Caused by: org.apache.spark.sql.AnalysisException: cannot resolve '`id,month`' given input columns: [id, month];;
'Project ['id,month]
+- Project [_1#16 AS id#19, _2#17 AS month#20]
......

大概意思就是把 id,month 作为了一个字符串了,而DF中没有这个列名,故出错,
查看select api 可知需要的是一个可变参数列表

/**
   * Selects a set of columns. This is a variant of `select` that can only select
   * existing columns using column names (i.e. cannot construct expressions).
   *
   * {{{
   *   // The following two are equivalent:
   *   ds.select("colA", "colB")
   *   ds.select($"colA", $"colB")
   * }}}
   *
   * @group untypedrel
   * @since 2.0.0
   */
  @scala.annotation.varargs
  def select(col: String, cols: String*): DataFrame = select((col +: cols).map(Column(_)) : _*)

所以解决如下:

// List[String] 转化为 List[Column] 类型即可
 val list = List("id","month").map(col(_)) 
 // list[Column]作为可变参传入即可
 result.select(list: _*).show()

仅作为个人学习记录,其实开发中绕过这个问题拼接SQL也是同样能处理的。

上一篇:SparkSQL和IDEA整合Hive详解


下一篇:SparkSQL统一数据的加载与落地