kafka go producer 启动基本配置

  1.官网上下载kafka安装包:http://kafka.apache.org/downloads.html

  2.执行命令运行zookeeper 实例(单点):

   bin/zookeeper-server-start.sh config/zookeeper.properties
 3. 启动kafka broker 服务:
  
bin/kafka-server-start.sh config/server.properties
  其中的
server.properties 有些配置需要修改:
  listeners=PLAINTEXT://hostName:9092
  如果是远程producer,hostname设置为ip,这样远程机器无需设置host.
  log.dir 是broker的日志地址。
 4.在使用go的客户端 Shopify/sarama 包的操作过程:
   (1) go get "github.com/Shopify/sarama"
   (2) 修改config 中的配置:
      c.Version = V0_10_0_0 //使用的是kafka 0.10.0.0的版本
   (3) producer测试代码如下:
package main

import (
"github.com/Shopify/sarama"
"log"
"os"
"strings"
) var logger = log.New(os.Stderr, "[TEST]", log.LstdFlags) func main(){
sarama.Logger = logger config := sarama.NewConfig()
config.ClientID = "newsDataSource"
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Partitioner = sarama.NewRandomPartitioner msg := &sarama.ProducerMessage{}
msg.Topic = "hello"
msg.Partition = int32(-1)
msg.Key = sarama.StringEncoder("key")
msg.Value = sarama.ByteEncoder("hello") producer, err := sarama.NewSyncProducer(strings.Split("localhost:9092", ","), config)
if err != nil {
logger.Printf("Failed to produce message :%s", err )
os.Exit(500)
} defer producer.Close() partition, offset, err := producer.SendMessage(msg)
if err != nil {
logger.Printf("Failed to produce message :%s", err )
}
logger.Printf("partition:%d, offset: %d\n", partition, offset )
}

  


  
 
 
上一篇:Java--FutureTask原理与使用(FutureTask可以被Thread执行,可以被线程池submit方法执行,并且可以监控线程与获取返回值)


下一篇:mysql数据库导出模型到powerdesigner,PDM图形窗口中显示数据列的中文注释