kafka生产者和消费者

在使用kafka时,有时候为验证应用程序,需要手动读取消息或者手动生成消息。这个时候可以借助kafka-console-consumer.sh和kafka-console-producer.sh 这两个工具,它们包装了java客户端,让用户不需要编写整个应用程序就可以与kafka主题发生交互。

生产者

kafka-console-consumer.sh工具可以用于向kafka主题写入消息。默认情况下,该工具将命令行的每一行视为一个消息,消息的键和值以tab字符分隔,如果没有出现table字符,那么键就是null。

控制台生产者有两个参数是必须指定的: --broker-list参数指定了一个或多个broker,它们以逗号峰,格式为hostname:port;另一个参数是--topic指定了生成消息的目标主题。在生成完消息之后,需要发送一个EOF字符来关闭客户端。

[root@test3 bin]# ./kafka-console-producer.sh --broker-list 10.0.102.204:9092 --topic science
sample messsage 1

在控制台使用生产者也是可以指定配置文件的,有两种形式!

  • 通过--producer.config  configfile指定消费者的配置文件,其中configfile是配置文件的全路径。
  • 另一种方式是直接在命令行以--producer-property  key=value的格式传递一个或多个参数,key是参数名字,value是参数的值。

生产者有都许多命令行参数,可以调整其行为:

--key-serializer  classname:指定消息键的编码器的类名,默认是Kafka.serializer.DefaultEncoder.
--value-serializer  classname: 指定消息值的编码器的类名,默认是Kafka.serializer.DefaultEncoder.
--compress-codec  string:指定生成消息所使用的压缩类型,可以是none, gzip, snappy或lz4,默认是gzip。
--sync       :指定以同步的方式生成消息,也就是说,在发送下一个消息之前会等待当前消息得到确认。

消费者控制台

kafka-console-consumer.sh工具提供了一种从一个或多个主题以上读取消息的方式。消息被打印在标准输出上,消息之间以空行分割。默认情况下,它会打印没有经过格式化的原始消息字节。它有很多可选参数,其中有一些参数是必选的。

第一:要指定是否使用新版本的消费者,并指定kafka集群的地址。如果使用的是旧版本的消费者,只需要提供--zookeeper参数。如果使用了新版本的消费者,必须使用--new-consumer和--borker-list, --borker-list后面需要跟上以逗号分割的broker地址列表。

第二:指定要读取的主题。有3个参数可以选择,分别是--topic, --whitelist和 --blacklist。此处运行只指定一个参数。--topic用于指定单个待读取的主题,--whitelist和 --blacklist后面跟着一个正则表达式(在命令行可能需要转义)。与白名单匹配的主题将会被读取,与黑名单匹配的主题不会被读取。

[root@test3 bin]# ./kafka-console-consumer.sh --zookeeper 10.0.102.204:2181  --topic science
test 1

除了基本的命令行参数外,也可以把消费者的其他的配置参数传递给控制台消费者。可以通过两种方式来达到这个目的。第一种方式将配置参数写在一个文件里,然后通过--consumer.config 指定配置文件。另一种是在命令行以--consumer-property  key=value的格式传递一个或多个参数,其中key指定参数的名字,value指定参数的值。这种方式在设置消费者属性时会很有用,比如设置群组的ID。

控制台消费者和控制台生产者有一个共同的参数--property,这个参数用于向消息格式化器传递配置信息,而不是给客户端本身传递配置信息。

控制台消费者常用的配置如下:

--formatter classname :指定消息格式化器的类名,用于解码消息,它的默认值是kafka.tools.DefaultFormatter.
--from-beginning: 指定从最旧的偏移量开始读取数据,否则就从最新的偏移量开始读取。
--max-messages num: 指定在退出之前最多读取NUM个消息
--partition num: 指定只读ID为num的分区(需要新版本的消费者)


消息格式器选项:
除了默认的消息格式器之外,还有其他3中可用的格式化器。
kafka.tools.loggingMessageFormatter: 将消息输出到日志,而不是输出到标准的输出设备。日志级别为INFO,并且包含了时间戳,键和值。
kafka.tools.ChecksumMessageFormatter: 只打印消息的校验和
kafka.tools.NoOpMessageFormatter: 读取消息但不打印消息


kafka.tools.DefaultFormatter有一些非常有用的配置选项,这些选项可以通过--property命令行参数指定。
print.timestamp:  设置为true, 就会打印每个消息的时间戳。
print.key :设置为true,除了打印消息的值外,还会打印消息的键。
key.separator: 指定打印消息的键和消息的值所用的分割符。
line.separator: 指定消息之间的分割符。
key.deserializer: 指定打印消息的键所用的发序列化器的类名。
value.deserializer: 指定打印消息的值所用的发序列化器的类名。

读取偏移量主题

【有点问题】

有时候,我们需要知道提交的消费者群组的偏移量是多少,比如某个特定的群组是否在提交偏移量,或者偏移量提交的频度。这个可以通过让控制台消费者读取一个特殊的内部主题__consumer_offsets来实现。所有消费者的偏移量都以消息的形式写到这个主题上。为了解码这个主题的消息,需要使用Kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter这个格式化器。

[root@test3 bin]# ./kafka-console-consumer.sh --zookeeper 10.0.102.204:2181  --topic __consumer_offsets --formatter "Kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter" --max-messages 1

 

上一篇:kafka系列二:架构设计


下一篇:SQL语句大全