import java.util

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Duration, StreamingContext}
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object KafkaDirectRedis {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("redis").setMaster("local[*]")
    val ssc = new StreamingContext(conf, new Duration(5000))
    val groupid = "GB01"   // consumer group name
    val topic = "topic_bc" // topic name
    // groupid/topic is the unique Redis key under which partition offsets are stored;
    // the offsets are kept in a Redis hash (partition -> offset)
    val gtKey = groupid + "/" + topic
    // set of topics to subscribe to
    val topics = Set(topic)
    // ZooKeeper quorum (not actually used by the direct API; kept from the original)
    val zkQuorum = "hadoop01:2181,hadoop02:2181,hadoop03:2181"
    // Kafka broker list
    val brokerList = "hadoop04:9092,hadoop05:9092,hadoop06:9092"

    val kafkaParams = Map(
      "metadata.broker.list" -> brokerList,
      "group.id" -> groupid,
      // start from the smallest available offset, i.e. consume from the beginning
      "auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString
    )
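    // Note: with the direct API, auto.offset.reset only takes effect when no
    // explicit offsets are supplied (the topics-based overload below); when the
    // stream is built from fromOffset, consumption starts exactly at those offsets.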
    // map of (topic, partition) -> offset, passed to createDirectStream so that
    // consumption resumes from the stored offsets
    var fromOffset = Map[TopicAndPartition, Long]()
    var kafkaDStream: InputDStream[(String, String)] = null
    // get a Jedis connection
    val conn = getConnection()
    // conn.flushDB() // uncomment to wipe stored offsets while testing
    // fetch all keys currently stored in Redis
    val values: util.Set[String] = conn.keys("*")
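    // (Design note: conn.keys("*") scans every key in the database, which can be
    // slow on a large instance; conn.exists(gtKey) would answer the same question
    // directly. The scan is kept here to match the original flow.)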
    // each key looks like "GB01/topic_bc"; the hash under it maps partition -> offset
    // if the keys contain gtKey, this group/topic has been consumed before
    if (values.contains(gtKey)) {
      // get all (field, value) pairs stored under gtKey, e.g. for "GB01/topic_bc":
      //   partition  offset
      //   1          888
      //   2          888
      //   3          888
      //   4          888
      val allKey: util.Map[String, String] = conn.hgetAll(gtKey)
      // after this import, Java collections can be used as Scala collections
      import scala.collection.JavaConversions._
      val list: List[(String, String)] = allKey.toList
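      // (scala.collection.JavaConversions is deprecated in newer Scala versions;
      // scala.collection.JavaConverters with allKey.asScala.toList is the modern
      // equivalent if you are on Scala 2.12 or later.)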
      // each element is a (partition, offset) pair
      for (key <- list) {
        // build a TopicAndPartition from the topic name and partition number
        val tp = TopicAndPartition(topic, key._1.toInt)
        // record the stored offset for this partition
        fromOffset += tp -> key._2.toLong
        println("partition " + key._1 + " offset " + key._2)
      }
      // messageHandler turns each record into (key, value); the Kafka key is
      // null by default, the value is the message payload
      val messageHandler = (mmd: MessageAndMetadata[String, String]) => {
        (mmd.key(), mmd.message())
      }
      // create an InputDStream that starts reading at the stored offsets
      kafkaDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
        ssc, kafkaParams, fromOffset, messageHandler)
    } else {
      // nothing stored yet: create a new InputDStream; auto.offset.reset decides
      // where it starts
      kafkaDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topics)
    }
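    // Both overloads yield a DStream of (key, value) pairs, so the code below
    // does not need to care which branch created it.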
    // holds the OffsetRange array (partition and offset info) for each batch
    var offsetRanges = Array[OffsetRange]()
    kafkaDStream.foreachRDD(kafkaRDD => {
      // the underlying RDD is a KafkaRDD, which implements HasOffsetRanges
      val ranges = kafkaRDD.asInstanceOf[HasOffsetRanges]
      // get the per-partition offset ranges of this batch
      offsetRanges = ranges.offsetRanges
      // keep only the value (the key defaults to null and is unused)
      val map: RDD[String] = kafkaRDD.map(_._2)
      // run an action so the batch is actually consumed
      map.foreach(x => print(""))
      // update the offsets
      for (o <- offsetRanges) {
        // untilOffset is the offset just past the last consumed message
        val offset = o.untilOffset
        // the partition this range belongs to
        val partition = o.partition
        println("partition: " + partition)
        println("offset: " + offset)
        // write the partition -> offset pair into the Redis hash via hset
        conn.hset(gtKey, partition.toString, offset.toString)
      }
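      // Note: offsets reach Redis only after the batch has been processed, so a
      // crash between processing and hset replays that batch on restart; this
      // yields at-least-once, not exactly-once, semantics.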
    })

    ssc.start()
    ssc.awaitTermination()
  }
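  // A possible refinement (not in the original): register a shutdown hook such as
  // sys.addShutdownHook { ssc.stop(stopSparkContext = true, stopGracefully = true) }
  // before awaitTermination, so in-flight batches finish and their offsets reach
  // Redis before the process exits.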
  // Jedis connection pool: configure and create it once, then borrow connections
  private lazy val pool: JedisPool = {
    val conf = new JedisPoolConfig()
    // maximum number of connections
    conf.setMaxTotal(20)
    // maximum number of idle connections
    conf.setMaxIdle(20)
    // the pool must be created after the config is set; the original created the
    // pool first, so these limits never took effect
    new JedisPool(conf, "192.168.121.12", 6379)
  }

  def getConnection(): Jedis = {
    val jedis = pool.getResource()
    // authenticate with the Redis password
    jedis.auth("test123")
    jedis
  }
}
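// Build assumption: this code targets the Kafka 0.8 direct integration
// (artifact spark-streaming-kafka, packages org.apache.spark.streaming.kafka and
// kafka.common) plus the Jedis client. The newer 0.10 integration
// (spark-streaming-kafka-0-10) exposes a different API and would need changes.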