dataframe 基本操作

package com.jason.example

import org.apache.spark.sql.functions.broadcast

class DFTest extends SparkInstance {

  import spark.implicits._

  val df = Seq(
    ("jason", 1, "理想",0),
    (null, 2, "理想",1),
    ("mac", 3, "理想",2),
    ("mac", 4, "理想",2)
  ).toDF("name", "depid", "company","groupid").repartition(3)
  val df3 = Seq(
    ("jason", 1, "理想",0),
    ("dong", 2, "理想",1),
    ("mac", 3, "理想",2)
  ).toDF("name", "depid", "company","groupid").repartition(3)
  val df2 = Seq(
    (3,"周浦",2),
    (4,"孙桥",0),
    (5,"金桥",1)
  ).toDF("depid","addr","gid").repartition(3)
  def ff(): Unit = {
    println(df.toString())//[name: string, depid: int ... 1 more field]
    println(df.schema)
    df.printSchema()
    df.explain(true)//Prints the plans (logical and physical) to the console for debugging purposes.
    println(df.dtypes.mkString(","))//(name,StringType),(depid,IntegerType),(company,StringType)
    println(df.columns.mkString(","))//
    //df.withWatermark()  ???
    df.show(30,false)
    df.na.drop("any"/*"all"*/).show(false) //删除df中包含null 或NaN 的记录,如果为any 则只要有有一列为
    //null 或NaN 则删除整行,如果是all 则所有列是null ho NaN 时才删除整行
    df.na.fill("xxx",Seq("name")).show()//缺失值填充,把null 或 NaN 替换为所需要的值
    df.na.replace("name",Map("jason"->"abc","dong"->"def")).show()//将字段name 中 的值按照map 内容进行更改
    //df.stat.xxx  ???
    df.join(df2,(df("depid")===df2("depid")).and(df("groupid")===df2("gid")),"right").show()
    df.join(df2,(df("depid")===df2("depid")).and(df("groupid")===df2("gid")),"left").show()

    df.join(df2,(df("depid")===df2("depid")).and(df("groupid")===df2("gid")),"left").show()
    println("="*40)
    df.join(df2.hint("broadcast"),(df("depid")===df2("depid")).and(df("groupid")===df2("gid")),"left").show()
    df.join(broadcast(df2),(df("depid")===df2("depid")).and(df("groupid")===df2("gid")),"left").show()//spark 默认广播10MB的小表
    //df2.hint("broadcast")  和 broadcast(df2) 是等同的
    df.crossJoin(df2).show()//笛卡尔积
    df.sort($"name".desc,$"depid".asc).show()
    df.select("name","depid").show()
    df.selectExpr("name as nm","depid as id").show()
    df.filter(s"""name='jason'""").show()
    df.where(s"""name='jason'""").select("name","depid").show
    df.rollup("name","depid").count().show()
    df.cube("name","depid").count().show()
    df.groupBy("name","depid").count().show()
    df.agg("name"->"max","depid"->"avg").show()
    df.groupBy("name","depid").agg("name"->"max","depid"->"avg").show()
    df.limit(2).show()
    df.union(df3).show()
    df.unionByName(df3).show()
    df.intersect(df3).show()//交集
    df.except(df3).show() //差集
    df.sample(0.5).show()
    df.randomSplit(Array(0.4,0.6)).apply(0).show()
    df.withColumn("depid",$"depid".<=(2)).show() // 该方法可以替换或增加一列到原df, 第二个参数中的col必须时df中的元素
    df.withColumnRenamed("name","姓名").show()
    df.drop("name","depid")//舍弃某几列
    df.distinct()
    df.dropDuplicates("name").show() //根据某几列去重,会保留最后一条数据
    df.describe().show() //count,mean,min,max
    df.summary().show()//count,min,25%,50%,max
    df.head() //所有的数据会被collect到driver
    df.toLocalIterator()

    spark.stop()
  }
}

object DFTest {
  def main(args: Array[String]): Unit = {
    val dt = new DFTest
    dt.ff()
  }
}

 

上一篇:thinkjs的select,find,getField


下一篇:laravel 注入那点事