Kafka -> Structured Streaming reads the Kafka log -> custom sink to MySQL

package test
import org.apache.spark.sql.{DataFrame, ForeachWriter, Row, SparkSession}
import java.sql.{Connection, DriverManager, PreparedStatement}
/**
  * @author gaowei
  * @Date 2020-04-13 17:59
  */
object kafkaToMysqlTest {
  /**
    * Custom ForeachWriter that upserts each result row into MySQL.
    * open/process/close are invoked once per partition for every micro-batch.
    */
  class MysqlSink(url: String, user: String, pwd: String) extends ForeachWriter[Row] {
    var conn: Connection = _
    var stmt: PreparedStatement = _

    override def open(partitionId: Long, epochId: Long): Boolean = {
      Class.forName("com.mysql.jdbc.Driver")
      conn = DriverManager.getConnection(url, user, pwd)
      // Prepare the upsert once per partition instead of once per row
      stmt = conn.prepareStatement("replace into test(pid,pv) values(?,?)")
      true
    }

    override def process(value: Row): Unit = {
      stmt.setString(1, value(0).toString)
      stmt.setLong(2, value(1).toString.toLong)
      stmt.executeUpdate()
    }

    override def close(errorOrNull: Throwable): Unit = {
      if (stmt != null) stmt.close()
      if (conn != null) conn.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafkaToMysqlTest").getOrCreate()
    val brokers = "localhost:9092"
    val topics = "test1"

    // Read the topic as a streaming DataFrame; Kafka delivers key/value as binary columns
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("subscribe", topics)
      .load()

    import spark.implicits._
    // Keep only the message payload as a string
    val kafkaDf = df.selectExpr("CAST(value AS STRING)").as[String]
    // Running count per distinct value, renamed to (pid, pv)
    val dataFrame = kafkaDf.groupBy("value").count().toDF("pid", "pv")


    // Write the aggregated counts to MySQL through the custom sink
    val mysqlSink = new MysqlSink("jdbc:mysql://localhost:3306/warehouse", "root", "410410410")

    // Complete mode re-emits the full aggregate every trigger; "replace into" keeps the write idempotent
    val query = dataFrame.writeStream.outputMode("complete").foreach(mysqlSink).start()

    query.awaitTermination()
  }

}
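
Two details the listing above leaves implicit are the target MySQL table that "replace into test(pid,pv)" expects, and the trigger/checkpoint settings a streaming query would normally carry. A minimal sketch follows; the table definition, column types, checkpoint path and trigger interval are assumptions for illustration, not part of the original job.

package test

import java.sql.DriverManager

/**
  * One-off setup sketch: creates the table the MysqlSink writes to.
  * "replace into" behaves as an upsert only if pid is the primary key.
  */
object CreateTestTableSketch {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/warehouse", "root", "410410410")
    val stmt = conn.createStatement()
    stmt.execute(
      """create table if not exists test (
        |  pid varchar(255) primary key,
        |  pv  bigint
        |)""".stripMargin)
    stmt.close()
    conn.close()
  }
}

In the streaming job itself, a trigger interval and a checkpoint location can be added when the query is started (the path and interval below are placeholders):

import org.apache.spark.sql.streaming.Trigger

val query = dataFrame.writeStream
  .outputMode("complete")
  .foreach(mysqlSink)
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .option("checkpointLocation", "/tmp/kafkaToMysqlTest/checkpoint")
  .start()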

 
