package TEST
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util
import java.util.Properties
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
/**
* description: spark 读、写、删除 postgressql数据库 代码测试
* author: 徐国胜
* since: 2021/11/26 15:37
* version: 1.0
*/
object PostgresqlTest {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder().appName("PostgresqlTest")
.config("spark.ui.showConsoleProgress", "true")
.config("spark.shuffle.reduceLocality.enabled", "false")
.enableHiveSupport()
.getOrCreate()
//造数据
val dataList = new util.ArrayList[Row]()
dataList.add(Row("1", "徐国胜1", "22"))
dataList.add(Row("2", "邱良东2", "22"))
val schema = StructType(List(StructField("id", StringType, true), StructField("name", StringType, true), StructField("age", StringType, true)))
val DF: DataFrame = spark.createDataFrame(dataList, schema)
//DataFrame写入PG库
WriteDfToPG(DF, "testdb", "bigdata.student")
/**
* 查询PG库中某个表的数据
* 如果需要查询某个的字段,直接在后面接select就行
*/
val dataDF: DataFrame = QueryPG(spark, "testdb", "bigdata.student")
//按条件筛选,删除数据库中的数据;如果需要删除全表,可以把删选条件写成 1=1
DeleteFromPG("testdb", "bigdata.student", "name='邱良东'")
}
def QueryPG(spark: SparkSession, databaseName: String, tableName: String): DataFrame = {
val url: String = s"jdbc:postgresql://172.16.221.208:15432/$databaseName"
val prop: Properties = new Properties()
prop.put("user", "postgres")
prop.put("password", "bm@123")
prop.put("driver", "org.postgresql.Driver")
spark.read.jdbc(url, tableName, prop)
}
def WriteDfToPG(df: DataFrame, databaseName: String, tableName: String): Unit = {
val url: String = s"jdbc:postgresql://172.16.221.208:15432/$databaseName"
val prop: Properties = new Properties()
prop.put("user", "postgres")
prop.put("password", "bm@123")
prop.put("driver", "org.postgresql.Driver")
df.write.mode("Append").jdbc(url, tableName, prop)
}
def DeleteFromPG(databaseName: String, tableName: String, condition: String): Unit = {
Class.forName("org.postgresql.Driver")
val url: String = s"jdbc:postgresql://172.16.221.208:15432/$databaseName"
val prop: Properties = new Properties()
prop.put("user", "postgres")
prop.put("password", "bm@123")
prop.put("driver", "org.postgresql.Driver")
val connection: Connection = DriverManager.getConnection(url, prop)
val delSql = s"delete from $tableName where $condition"
val delPS: PreparedStatement = connection.prepareStatement(delSql)
delPS.execute()
delPS.close()
println("delete query :" + delSql)
}
}