0.前情提要
简略学习elasticsearch - 简书
并增加mapping字段timestamp
PUT /danmaku/_mapping
{
"properties": {
"timestamp": {
"type": "date",
"index": false
}
}
}
1.添加maven依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-20_2.11</artifactId>
<version>7.15.1</version>
</dependency>
2.获取kafka的数据
object KafkaExtractor {
def extract(ssc :StreamingContext,topic:String, groupId: String = "spark"): InputDStream[ConsumerRecord[String, String]] = {
val myConf = new MyConf()
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> myConf.getAppConf("bootstrap.servers"),
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> groupId,
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (true: java.lang.Boolean)
)
val topics = Array(topic)
val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent,
Subscribe[String, String](topics, kafkaParams))
return stream
}
}
3.往ES中写入数据
object KafkaToESLoader {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
val scf = new SparkConf().setMaster("local[3]").setAppName(f"Kafka${"danmaku"}ToES${"danmaku"}")
scf.set("es.nodes", "hdp21,hdp22,hdp23").set("es.port", "9200")
val sc = new SparkContext(scf)
val ssc = new StreamingContext(sc, Seconds(1))
val stream: InputDStream[ConsumerRecord[String, String]] = KafkaExtractor.extract(ssc, "danmaku_test", "printTest")
stream.map[ESObject](cr => {
val eso = BiliESObjectFactory.getESObjectFromKafka("danmaku", cr)
eso
}).saveToEs("/danmaku", Map("es.mapping.id" -> "id"))
ssc.start();
ssc.awaitTermination();
}
}
4.反序列化kafka的数据
object BiliESObjectFactory {
def getESObjectFromKafka(name: String,cr: ConsumerRecord[String, String]): ESObject ={
name match {
case "danmaku"=> ESDanmaku(cr)
case _ => throw new KeyException(f"${name} not in BiliESObjectFactory")
}
}
}
class ESObject
case class ESDanmaku(id: String, timestamp: Long,
room_id: String,
uname: String, uid: String,
msg: String, msg_type: String
) extends ESObject {
}
object ESDanmaku{
def apply(cr: ConsumerRecord[String, String]): ESDanmaku = {
val timestamp = cr.key().toLong
val json = JSON.parseObject(cr.value())
val danmakuES = new ESDanmaku(
id = f"${json.getString("room_id")}_${json.getString("uid")}_${cr.key()}",
timestamp = timestamp,
room_id = json.getString("room_id"),
uid = json.getString("uid"),
uname = json.getString("uname"),
msg = json.getString("msg"),
msg_type = json.getString("msg_type"))
danmakuES
}
}
总结
可以使用Case class 做DBO
ES指定id使用saveToEs("/danmaku", Map("es.mapping.id" -> "id")) 的第二个参数,如果不指定则不需要
参考资料
https://www.elastic.co/guide/en/elasticsearch/hadoop/7.15/spark.html
作者:RAmenL
链接:https://www.jianshu.com/p/ee94064104d5
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。