Kafka-producer

 Kafka-producer 用完后不要忘了调用 close()

package com.ws.kafka

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

/**
 * Minimal Kafka producer demo: sends ten string messages ("value1" .. "value10")
 * to the "wordcount" topic and then closes the producer.
 */
object KProducer {
  /**
   * Entry point.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val prop = new Properties()
    // Kafka broker addresses
    prop.setProperty("bootstrap.servers","dream1:9092,dream2:9092,dream3:9092")
    // Serializers for key and value; the class can be given either via
    // classOf[...].getName or as a fully-qualified name string
    prop.setProperty("key.serializer",classOf[StringSerializer].getName)
    prop.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer")
    // Topic name
    val topic = "wordcount"
    // Create the producer
    val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](prop)
    try {
      for (i <- 1 to 10) {
        // Other ways to address a record (shown for reference, not sent here):
        //   fixed partition number:
        //     new ProducerRecord[String,String](topic, 1, System.currentTimeMillis(), "key"+i, "value"+i)
        //   rotate over partitions by index (assumes the topic has 3 partitions):
        //     new ProducerRecord[String,String](topic, i % 3, System.currentTimeMillis(), "key"+i, "value"+i)
        //   partition derived from the key:
        //     new ProducerRecord[String,String](topic, "key", "value"+i)
        //
        // No partition and no key: the partition is chosen by the producer's
        // default partitioning strategy.
        val record = new ProducerRecord[String, String](topic, "value" + i)
        // Send the message
        producer.send(record)
      }
    } finally {
      // Always release the producer, even if send() throws
      producer.close()
    }
  }
}

 

上一篇:3.__setProperty设置全局属性


下一篇:Properties