A recent requirement had Spark reading a data source dynamically: the column names are passed in via JSON, and the business scenario only works through the DataFrame API, so handling it with SQL was not a good fit.
The initial, incorrect code looked like this:
import sparkSession.implicits._ // needed for map/toDF on Dataset[String]

val ds: Dataset[String] = sparkSession.read.textFile("input/user.txt", "input/traffics.txt")
ds.show(20)
val result = ds.map(x => {
  val str = x.split(" ")
  (str(0), str(1))
}).toDF("id", "month")
result.select("id", "month").show() // works fine
// simulate the column names passed in at runtime
val list = List("id", "month")
result.select(list.mkString(",")) // fails
The error:
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.CommandLineWrapper.main(CommandLineWrapper.java:64)
Caused by: org.apache.spark.sql.AnalysisException: cannot resolve '`id,month`' given input columns: [id, month];;
'Project ['id,month]
+- Project [_1#16 AS id#19, _2#17 AS month#20]
......
In other words, `id,month` was treated as one single column name, and since the DataFrame has no column by that name, the call fails.
A look at the `select` API shows that it expects a varargs list of column names:
/**
* Selects a set of columns. This is a variant of `select` that can only select
* existing columns using column names (i.e. cannot construct expressions).
*
* {{{
* // The following two are equivalent:
* ds.select("colA", "colB")
* ds.select($"colA", $"colB")
* }}}
*
* @group untypedrel
* @since 2.0.0
*/
@scala.annotation.varargs
def select(col: String, cols: String*): DataFrame = select((col +: cols).map(Column(_)) : _*)
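Note that `Dataset` also provides a `Column`-based varargs overload (signature excerpted from the Spark API; the fix below resolves to this one, since it passes `Column` values rather than strings):

```scala
@scala.annotation.varargs
def select(cols: Column*): DataFrame
```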
So the fix is:
// convert the List[String] into a List[Column]
import org.apache.spark.sql.functions.col
val list = List("id", "month").map(col(_))
// expand the List[Column] as varargs with `: _*`, matching select(cols: Column*)
result.select(list: _*).show()
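The `: _*` ascription is plain Scala, not Spark-specific. A minimal sketch with an ordinary varargs method of the same shape (names here are illustrative; no Spark required):

```scala
object VarargsDemo {
  // Same shape as Dataset.select(col: String, cols: String*)
  def select(col: String, cols: String*): Seq[String] = col +: cols

  def main(args: Array[String]): Unit = {
    val list = List("id", "month")
    // Passing list.mkString(",") supplies ONE argument, the string "id,month"
    println(select(list.mkString(","))) // List(id,month) -- a single element
    // Expanding with `: _*` passes each element as its own argument
    println(select(list.head, list.tail: _*)) // List(id, month)
  }
}
```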
This is just a personal learning note; in practice you could also sidestep the problem entirely by concatenating a SQL string instead.
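For reference, a hedged sketch of that SQL route: splice the dynamic column list into a query string and run it against a temp view (the view name "users" and the helper `buildQuery` are assumptions for illustration, not part of the original code):

```scala
object SqlAlternative {
  // Hypothetical helper: build a projection from the dynamic column list
  def buildQuery(cols: List[String], view: String): String =
    s"SELECT ${cols.mkString(", ")} FROM $view"

  def main(args: Array[String]): Unit = {
    val query = buildQuery(List("id", "month"), "users")
    println(query) // SELECT id, month FROM users
    // With the DataFrame from above you would then run:
    // result.createOrReplaceTempView("users") // "users" is a hypothetical view name
    // sparkSession.sql(query).show()
  }
}
```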