Using Spark to Read Data from Kafka and Write It to ES

0. Background

This builds on the earlier post 简略学习elasticsearch - 简书, which walks through basic Elasticsearch setup. On top of that, add a timestamp field to the mapping:

PUT /danmaku/_mapping
{
  "properties": {
    "timestamp": {
      "type": "date",
      "index": false
    }
  }
}
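The danmaku index itself must already exist (the earlier post covers creating it). After the update, you can confirm the new field is in place with:

GET /danmaku/_mapping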

1. Add the Maven dependencies

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>7.15.1</version>
</dependency>
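These two artifacts only pull in the Kafka connector and the ES-Hadoop Spark connector. The code below additionally assumes Spark Streaming itself and fastjson (used later for JSON parsing) are on the classpath; the versions here are only examples chosen to match the ones above:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.78</version>
</dependency>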

2. Read the data from Kafka

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object KafkaExtractor {
    def extract(ssc: StreamingContext, topic: String, groupId: String = "spark"): InputDStream[ConsumerRecord[String, String]] = {
        // MyConf is the author's own config helper; it supplies bootstrap.servers from the app config
        val myConf = new MyConf()
        val kafkaParams = Map[String, Object](
            "bootstrap.servers" -> myConf.getAppConf("bootstrap.servers"),
            "key.deserializer" -> classOf[StringDeserializer],
            "value.deserializer" -> classOf[StringDeserializer],
            "group.id" -> groupId,
            "auto.offset.reset" -> "earliest",
            "enable.auto.commit" -> (true: java.lang.Boolean)
        )
        val topics = Array(topic)
        // Direct stream: each Kafka partition maps to one Spark partition
        KafkaUtils.createDirectStream[String, String](
            ssc,
            PreferConsistent,
            Subscribe[String, String](topics, kafkaParams)
        )
    }
}
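MyConf is not a Spark or Kafka class; it is the author's own configuration helper and its source is not shown in the post. A minimal sketch that would satisfy the getAppConf call above, assuming an app.properties file on the classpath, might look like this:

import java.util.Properties

// Hypothetical stand-in for the author's MyConf: loads key/value pairs from app.properties on the classpath
class MyConf {
    private val props = new Properties()
    props.load(getClass.getClassLoader.getResourceAsStream("app.properties"))

    // e.g. getAppConf("bootstrap.servers") -> "hdp21:9092,hdp22:9092,hdp23:9092"
    def getAppConf(key: String): String = props.getProperty(key)
}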

3. Write the data into ES

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
// brings saveToEs into scope for DStreams
import org.elasticsearch.spark.streaming._

object KafkaToESLoader {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.ERROR)
        val scf = new SparkConf().setMaster("local[3]").setAppName("KafkaDanmakuToES")
        // ES-Hadoop connection settings
        scf.set("es.nodes", "hdp21,hdp22,hdp23").set("es.port", "9200")
        val sc = new SparkContext(scf)
        val ssc = new StreamingContext(sc, Seconds(1))
        val stream: InputDStream[ConsumerRecord[String, String]] = KafkaExtractor.extract(ssc, "danmaku_test", "printTest")
        // turn each Kafka record into an ESObject and index it;
        // es.mapping.id uses the object's id field as the document _id
        stream.map[ESObject](cr => BiliESObjectFactory.getESObjectFromKafka("danmaku", cr))
            .saveToEs("/danmaku", Map("es.mapping.id" -> "id"))
        ssc.start()
        ssc.awaitTermination()
    }
}
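Once the job is running and data is flowing into the danmaku_test topic, a quick way to confirm that documents are actually landing in the index is a plain search (in Kibana or via curl):

GET /danmaku/_search
{
  "size": 1
}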

4. Deserialize the Kafka data

import java.security.KeyException

import com.alibaba.fastjson.JSON
import org.apache.kafka.clients.consumer.ConsumerRecord

object BiliESObjectFactory {
    // pick the right ESObject subtype for a given data source name
    def getESObjectFromKafka(name: String, cr: ConsumerRecord[String, String]): ESObject = {
        name match {
            case "danmaku" => ESDanmaku(cr)
            case _ => throw new KeyException(f"${name} not in BiliESObjectFactory")
        }
    }
}

// common base type for everything written to ES
class ESObject

case class ESDanmaku(id: String, timestamp: Long,
                     room_id: String,
                     uname: String, uid: String,
                     msg: String, msg_type: String
                    ) extends ESObject

object ESDanmaku {
    // the Kafka record key carries the timestamp; the value is a JSON string
    def apply(cr: ConsumerRecord[String, String]): ESDanmaku = {
        val timestamp = cr.key().toLong
        val json = JSON.parseObject(cr.value())
        new ESDanmaku(
            id = f"${json.getString("room_id")}_${json.getString("uid")}_${cr.key()}",
            timestamp = timestamp,
            room_id = json.getString("room_id"),
            uid = json.getString("uid"),
            uname = json.getString("uname"),
            msg = json.getString("msg"),
            msg_type = json.getString("msg_type"))
    }
}
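The post does not spell out the message format, but from the fields ESDanmaku.apply reads, each Kafka record is assumed to carry the event timestamp (in milliseconds) as its key and a JSON object as its value. A small sketch that builds such a record by hand and runs it through the factory (all field values are made up):

import org.apache.kafka.clients.consumer.ConsumerRecord

object ESDanmakuExample extends App {
    // Assumed payload shape, inferred from the fields read in ESDanmaku.apply
    val value = """{"room_id":"9522","uid":"12345","uname":"someone","msg":"hello","msg_type":"danmaku"}"""
    // ConsumerRecord(topic, partition, offset, key, value); the key is the timestamp as a string
    val cr = new ConsumerRecord[String, String]("danmaku_test", 0, 0L, "1634567890123", value)

    val eso = BiliESObjectFactory.getESObjectFromKafka("danmaku", cr)
    println(eso) // ESDanmaku(9522_12345_1634567890123,1634567890123,9522,someone,12345,hello,danmaku)
}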

Summary

A case class works well as the data object (DBO) that gets written to ES.
To control the document id, pass the second argument of saveToEs("/danmaku", Map("es.mapping.id" -> "id")), which maps the case class's id field to the ES _id; if auto-generated ids are fine, the second argument can simply be omitted.

References

https://www.elastic.co/guide/en/elasticsearch/hadoop/7.15/spark.html


