(3) Create the table that stores the aggregated results per trademark
2.3.3 Create the Related Utility Classes
(1) Create MySQLUtil, a utility class for querying the MySQL database

package com.atguigu.gmall.realtime.utils

import java.sql.{Connection, DriverManager, ResultSet, ResultSetMetaData, Statement}
import com.alibaba.fastjson.JSONObject
import scala.collection.mutable.ListBuffer

/**
 * Author: Felix
 * Desc: Utility class for querying MySQL
 */
object MySQLUtil {
  def main(args: Array[String]): Unit = {
    val list: List[JSONObject] = queryList("select * from offset_2020")
    println(list)
  }

  // Run a query and return every row as a JSONObject (column name -> value)
  def queryList(sql: String): List[JSONObject] = {
    Class.forName("com.mysql.jdbc.Driver")
    val resultList: ListBuffer[JSONObject] = new ListBuffer[JSONObject]()
    val conn: Connection = DriverManager.getConnection(
      "jdbc:mysql://hadoop202:3306/gmall2020_rs?characterEncoding=utf-8&useSSL=false",
      "root",
      "123456")
    val stat: Statement = conn.createStatement
    println(sql)
    val rs: ResultSet = stat.executeQuery(sql)
    val md: ResultSetMetaData = rs.getMetaData
    // Read the column names from the metadata so the method works for any query
    while (rs.next) {
      val rowData = new JSONObject()
      for (i <- 1 to md.getColumnCount) {
        rowData.put(md.getColumnName(i), rs.getObject(i))
      }
      resultList += rowData
    }
    rs.close()
    stat.close()
    conn.close()
    resultList.toList
  }
}
(2) Create OffsetManagerM, a utility class that reads offsets from MySQL

package com.atguigu.gmall.realtime.utils

import com.alibaba.fastjson.JSONObject
import org.apache.kafka.common.TopicPartition

/**
 * Author: Felix
 * Desc: Utility class that reads offsets from MySQL
 */
object OffsetManagerM {
  /**
   * Read the saved offsets from MySQL
   * @param topic           the Kafka topic
   * @param consumerGroupId the consumer group id
   * @return a map from TopicPartition to the saved offset
   */
  def getOffset(topic: String, consumerGroupId: String): Map[TopicPartition, Long] = {
    val sql = " select group_id,topic,topic_offset,partition_id from offset_2020 " +
      " where topic='" + topic + "' and group_id='" + consumerGroupId + "'"
    val jsonObjList: List[JSONObject] = MySQLUtil.queryList(sql)
    // Convert each result row into a (TopicPartition, offset) pair
    val topicPartitionList: List[(TopicPartition, Long)] = jsonObjList.map {
      jsonObj => {
        val topicPartition: TopicPartition =
          new TopicPartition(topic, jsonObj.getIntValue("partition_id"))
        val offset: Long = jsonObj.getLongValue("topic_offset")
        (topicPartition, offset)
      }
    }
    val topicPartitionMap: Map[TopicPartition, Long] = topicPartitionList.toMap
    topicPartitionMap
  }
}
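For context, below is a minimal sketch of how the map returned by getOffset is typically fed into the Kafka direct stream (spark-streaming-kafka-0-10 API) so that consumption resumes from the saved positions. The group id, kafkaParams, and the surrounding ssc (StreamingContext) are assumptions for illustration; dws_order_wide is the topic written in 2.3.4 below. If no offsets have been saved yet, getOffset returns an empty map and the stream simply starts from the group's default position.

import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

val topic = "dws_order_wide"
val groupId = "ads_trademark_stat_group" // assumed group id, for illustration only
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "hadoop202:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> groupId,
  "enable.auto.commit" -> (false: java.lang.Boolean) // offsets are managed manually in MySQL
)
// Read the offsets saved in MySQL and start the stream from exactly those positions
val offsetMap: Map[TopicPartition, Long] = OffsetManagerM.getOffset(topic, groupId)
val recordDstream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParams, offsetMap)
)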
2.3.4 Write the Data Back to Kafka in OrderWideApp

import sparkSession.implicits._

orderWideWithSplitDstream.foreachRDD {
  rdd => {
    // Cache the RDD: it is consumed twice below (ClickHouse write and Kafka send)
    rdd.cache()
    // Save the data to ClickHouse
    val df: DataFrame = rdd.toDF()
    df.write.mode(SaveMode.Append)
      .option("batchsize", "100")
      .option("isolationLevel", "NONE") // transaction isolation: ClickHouse has no transactions
      .option("numPartitions", "4")     // write parallelism
      .option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
      .jdbc("jdbc:clickhouse://hadoop202:8123/default", "t_order_wide_2020", new Properties())
    // Write the data back to Kafka
    rdd.foreach { orderWide =>
      MyKafkaSink.send("dws_order_wide", JSON.toJSONString(orderWide, new SerializeConfig(true)))
    }
    // Commit the offsets
    OffsetManagerUtil.saveOffset(orderInfoTopic, orderInfoGroupId, orderInfoOffsetRanges)
    OffsetManagerUtil.saveOffset(orderDetailTopic, orderDetailGroupId, orderDetailOffsetRanges)
  }
}
ssc.awaitTermination()
2.3.5 Saving to MySQL with Local Transactions
For transaction handling we use a Scala JDBC library: ScalikeJDBC.
(1) Read the configuration file
By default, ScalikeJDBC reads application.conf from the classpath to obtain the database connection information, so create an application.conf file under resources.
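A minimal application.conf might look as follows, reusing the connection information from MySQLUtil above; the db.default.* keys are ScalikeJDBC's standard configuration keys:

db.default.driver="com.mysql.jdbc.Driver"
db.default.url="jdbc:mysql://hadoop202:3306/gmall2020_rs?characterEncoding=utf-8&useSSL=false"
db.default.user="root"
db.default.password="123456"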
(2) Load the configuration in the program: DBs.setup()
(3) Commit data within a local transaction
Every SQL statement executed inside DB.localTx( implicit session => { ... } ) is bound to the same local transaction: if any one statement fails, all of them are rolled back.
DB.localTx( implicit session => {
  SQL 1
  SQL 2
} )
2.3.6 Create TrademarkStatApp (ADS layer)
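TrademarkStatApp uses exactly the mechanism from 2.3.5: the per-trademark aggregation results and the consumed Kafka offsets are written to MySQL in one local transaction, so a failure can never leave results and offsets inconsistent. Below is a minimal, simplified sketch of that save step, not the full application. The result table trademark_amount_stat and the variables statTime, trademarkAmountList, groupId, topic, and offsetRanges are assumptions for illustration; offset_2020 and its columns come from the utility classes above.

import scalikejdbc._
import scalikejdbc.config.DBs

DBs.setup() // load db.default.* from application.conf (once, at startup)

DB.localTx { implicit session =>
  // 1. Save the aggregation results, one row per trademark
  //    (trademark_amount_stat is an assumed schema for illustration)
  for (((tmId, tmName), amount) <- trademarkAmountList) {
    SQL("insert into trademark_amount_stat(stat_time, trademark_id, trademark_name, amount) values(?,?,?,?)")
      .bind(statTime, tmId, tmName, amount)
      .update().apply()
  }
  // throw new RuntimeException("test") // uncomment to verify the rollback in 2.3.7
  // 2. Save the consumed offsets into the same MySQL database, in the same transaction
  for (offsetRange <- offsetRanges) {
    SQL("replace into offset_2020(group_id, topic, partition_id, topic_offset) values(?,?,?,?)")
      .bind(groupId, topic, offsetRange.partition, offsetRange.untilOffset)
      .update().apply()
  }
}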
2.3.7 Testing
➢ Make sure the required services are running: HDFS, ZK, Kafka, Redis, HBase, Maxwell
➢ Run BaseDBMaxwellApp, OrderInfoApp, OrderDetailApp, OrderWideApp, and TrademarkStatApp, then run the jar that simulates the generation of business data
➢ Throw an exception and check the result data; then comment the exception out and check the database tables (on failure the local transaction should roll everything back, and on success both the results and the offsets should be committed)