Kafka-producer 不要忘了 close()
package com.ws.kafka
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
/**
 * Minimal Kafka producer demo.
 *
 * Builds a String/String [[KafkaProducer]] from hard-coded properties, sends
 * ten messages ("value1" .. "value10") to the `wordcount` topic and always
 * closes the producer afterwards, even if sending fails.
 *
 * Inside the loop four [[ProducerRecord]] variants are constructed to
 * illustrate the different ways of choosing a partition; only the last
 * variant (no key, no partition) is actually sent.
 */
object KProducer {

  /**
   * Program entry point.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val prop = new Properties()
    // Kafka broker addresses used for the initial connection.
    prop.setProperty("bootstrap.servers", "dream1:9092,dream2:9092,dream3:9092")
    // Key/value serializers. Both ways of naming the class are equivalent:
    // via classOf[...].getName or via the fully-qualified class name literal.
    prop.setProperty("key.serializer", classOf[StringSerializer].getName)
    prop.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    // Target topic name.
    val topic = "wordcount"

    // Create the producer.
    val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](prop)

    // try/finally guarantees close() runs even when a send throws; otherwise
    // the producer would be leaked on the failure path.
    try {
      for (i <- 1 to 10) {
        val key = s"key$i"
        val value = s"value$i"

        // Variant 1: explicit, fixed partition number (1) plus timestamp and key.
        val fixedPartitionRecord =
          new ProducerRecord[String, String](topic, 1, System.currentTimeMillis(), key, value)

        // Variant 2: rotate over partitions by computing the partition number.
        // NOTE(review): assumes the topic has at least 3 partitions - confirm.
        val partition = i % 3
        val rotatingPartitionRecord =
          new ProducerRecord[String, String](topic, partition, System.currentTimeMillis(), key, value)

        // Variant 3: only a key is given; the partitioner derives the
        // partition from a hash of the key. The key is the constant "key",
        // so every such record would map to the same partition.
        val keyedRecord = new ProducerRecord[String, String](topic, "key", value)

        // Variant 4: neither key nor partition; the producer's default
        // partitioning strategy decides (presumably round-robin or sticky,
        // depending on the client version - verify against the client in use).
        val defaultRecord = new ProducerRecord[String, String](topic, value)

        // Only the default-strategy record is actually sent.
        producer.send(defaultRecord)
      }
    } finally {
      // Per the Kafka client docs, close() waits for previously sent records
      // to complete before releasing the producer's resources.
      producer.close()
    }
  }
}