package test

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

/**
 * Reads records from Kafka, counts occurrences of each value, and upserts the
 * running counts into MySQL via a ForeachWriter sink.
 *
 * @author gaowei
 * @date 2020-04-13 17:59
 */
object kafkaToMysqlTest {

  /**
   * Writes each row of the streaming aggregation to MySQL with REPLACE INTO,
   * so repeated keys overwrite the previous count (pid must be a primary or
   * unique key in the target table).
   */
  class MysqlSink(url: String, user: String, pwd: String) extends ForeachWriter[Row] {
    var conn: Connection = _
    var stmt: PreparedStatement = _

    override def open(partitionId: Long, epochId: Long): Boolean = {
      Class.forName("com.mysql.jdbc.Driver")
      conn = DriverManager.getConnection(url, user, pwd)
      // Prepare the statement once per partition/epoch instead of once per row.
      stmt = conn.prepareStatement("REPLACE INTO test(pid, pv) VALUES (?, ?)")
      true
    }

    override def process(value: Row): Unit = {
      stmt.setString(1, value.getString(0))
      stmt.setLong(2, value.getLong(1))
      stmt.execute()
    }

    override def close(errorOrNull: Throwable): Unit = {
      if (stmt != null) stmt.close()
      if (conn != null) conn.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafkaToMysqlTest").getOrCreate()

    val brokers = "localhost:9092"
    val topics = "test1"

    // Subscribe to the Kafka topic; the record value arrives as a binary column.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("subscribe", topics)
      .load()

    import spark.implicits._

    // Cast the Kafka value to a string and count how many times each value occurs.
    val kafkaDf = df.selectExpr("CAST(value AS STRING)").as[String]
    val dataFrame = kafkaDf.groupBy("value").count().toDF("pid", "pv")

    // Write the aggregated counts to MySQL; complete mode emits the full result table each trigger.
    val mysqlSink = new MysqlSink("jdbc:mysql://localhost:3306/warehouse", "root", "410410410")
    val query = dataFrame.writeStream
      .outputMode("complete")
      .foreach(mysqlSink)
      .start()

    query.awaitTermination()
  }
}