The two ways to use UDFs in Spark

See: https://www.cnblogs.com/itboys/p/9347403.html

1) If you run queries through spark.sql("...")

=> the query is handled by Spark's Hive-compatible SQL engine, so the UDF must first be registered by name with spark.udf.register(name, udf) before SQL can reference it.

For example:

import org.apache.spark.sql.functions._

// Min-max normalization: scale a value into [0, 1]
// using the column's max and min.
val maxandmin = udf {
  (cdata: Double, maxdata: Double, mindata: Double) =>
    (cdata - mindata) / (maxdata - mindata)
}
// Register it under a name so it can be called from SQL text.
spark.udf.register("maxandmin", maxandmin)

def getUserbaseinfo(spark: SparkSession) = {
  // Note: besides maxandmin, this SQL also calls a calcage UDF,
  // which must be registered the same way before this runs.
  val sql = s"""select
              |userid, locale, gender,
              |location,
              |maxandmin(cage, max_age, min_age) age,
              |maxandmin(timezone, max_timezone, min_timezone) timezone,
              |maxandmin(members, max_members, min_members) members
              |from
              |(select userid,
              |case when l.locale is null then 0 else l.localeid end locale,
              |gender, location,
              |calcage(birthyear) cage, min_age, max_age,
              |timezone, min_timezone, max_timezone,
              |members, min_members, max_members
              |from dwd_events.dwd_users u
              |left join dwd_events.dwd_locale l
              |on lower(u.locale) = lower(l.locale)
              |cross join (select min(calcage(birthyear)) min_age,
              |max(calcage(birthyear)) max_age, min(timezone) min_timezone,
              |max(timezone) max_timezone, min(members) min_members, max(members) max_members
              |from dwd_events.dwd_users) b ) c""".stripMargin
  spark.sql(sql)
}
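
The SQL above also depends on a `calcage` UDF whose definition is not shown in the post; from its usage, `calcage(birthyear)` apparently derives an age from a birth year. A minimal sketch, assuming it simply subtracts the birth year from the current year (the guard against invalid years is my assumption, not from the original):

```scala
import java.time.LocalDate
import org.apache.spark.sql.functions.udf

// Hypothetical calcage UDF: birth year -> age,
// treating non-positive years as 0. Assumed implementation.
val calcage = udf { (birthyear: Int) =>
  if (birthyear <= 0) 0 else LocalDate.now.getYear - birthyear
}
spark.udf.register("calcage", calcage)
```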

2) If you use the DataFrame API

=> wrapping the function with udf() is enough; no registration is needed, because you call the returned function directly on columns instead of referring to it by name in SQL text.
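
For comparison, here is the same min-max normalization done through the DataFrame API. The `udf()`-wrapped function is applied directly with `withColumn`, and no `spark.udf.register` call is involved (`users` and the column names are illustrative, not from the original post):

```scala
import org.apache.spark.sql.functions._

// Same UDF as before, but used as a plain Scala value.
val maxandmin = udf {
  (cdata: Double, maxdata: Double, mindata: Double) =>
    (cdata - mindata) / (maxdata - mindata)
}

// Apply it directly on columns -- no registration step.
val normalized = users
  .withColumn("age_norm", maxandmin(col("cage"), col("max_age"), col("min_age")))
  .withColumn("tz_norm",  maxandmin(col("timezone"), col("max_timezone"), col("min_timezone")))
```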
