package com.jason.example import org.apache.spark.sql.functions.broadcast class DFTest extends SparkInstance { import spark.implicits._ val df = Seq( ("jason", 1, "理想",0), (null, 2, "理想",1), ("mac", 3, "理想",2), ("mac", 4, "理想",2) ).toDF("name", "depid", "company","groupid").repartition(3) val df3 = Seq( ("jason", 1, "理想",0), ("dong", 2, "理想",1), ("mac", 3, "理想",2) ).toDF("name", "depid", "company","groupid").repartition(3) val df2 = Seq( (3,"周浦",2), (4,"孙桥",0), (5,"金桥",1) ).toDF("depid","addr","gid").repartition(3) def ff(): Unit = { println(df.toString())//[name: string, depid: int ... 1 more field] println(df.schema) df.printSchema() df.explain(true)//Prints the plans (logical and physical) to the console for debugging purposes. println(df.dtypes.mkString(","))//(name,StringType),(depid,IntegerType),(company,StringType) println(df.columns.mkString(","))// //df.withWatermark() ??? df.show(30,false) df.na.drop("any"/*"all"*/).show(false) //删除df中包含null 或NaN 的记录,如果为any 则只要有有一列为 //null 或NaN 则删除整行,如果是all 则所有列是null ho NaN 时才删除整行 df.na.fill("xxx",Seq("name")).show()//缺失值填充,把null 或 NaN 替换为所需要的值 df.na.replace("name",Map("jason"->"abc","dong"->"def")).show()//将字段name 中 的值按照map 内容进行更改 //df.stat.xxx ??? df.join(df2,(df("depid")===df2("depid")).and(df("groupid")===df2("gid")),"right").show() df.join(df2,(df("depid")===df2("depid")).and(df("groupid")===df2("gid")),"left").show() df.join(df2,(df("depid")===df2("depid")).and(df("groupid")===df2("gid")),"left").show() println("="*40) df.join(df2.hint("broadcast"),(df("depid")===df2("depid")).and(df("groupid")===df2("gid")),"left").show() df.join(broadcast(df2),(df("depid")===df2("depid")).and(df("groupid")===df2("gid")),"left").show()//spark 默认广播10MB的小表 //df2.hint("broadcast") 和 broadcast(df2) 是等同的 df.crossJoin(df2).show()//笛卡尔积 df.sort($"name".desc,$"depid".asc).show() df.select("name","depid").show() df.selectExpr("name as nm","depid as id").show() df.filter(s"""name='jason'""").show() df.where(s"""name='jason'""").select("name","depid").show df.rollup("name","depid").count().show() df.cube("name","depid").count().show() df.groupBy("name","depid").count().show() df.agg("name"->"max","depid"->"avg").show() df.groupBy("name","depid").agg("name"->"max","depid"->"avg").show() df.limit(2).show() df.union(df3).show() df.unionByName(df3).show() df.intersect(df3).show()//交集 df.except(df3).show() //差集 df.sample(0.5).show() df.randomSplit(Array(0.4,0.6)).apply(0).show() df.withColumn("depid",$"depid".<=(2)).show() // 该方法可以替换或增加一列到原df, 第二个参数中的col必须时df中的元素 df.withColumnRenamed("name","姓名").show() df.drop("name","depid")//舍弃某几列 df.distinct() df.dropDuplicates("name").show() //根据某几列去重,会保留最后一条数据 df.describe().show() //count,mean,min,max df.summary().show()//count,min,25%,50%,max df.head() //所有的数据会被collect到driver df.toLocalIterator() spark.stop() } } object DFTest { def main(args: Array[String]): Unit = { val dt = new DFTest dt.ff() } }