Hadoop Ecosystem: Summary of Common Kafka Commands

                Common Commands for Apache Kafka Operations

                                        Author: 尹正杰 (Yin Zhengjie)

Copyright notice: original work; reproduction is not permitted, and violations will be pursued legally.

I. Commands for managing the Kafka service

1>. Start the Kafka service

[root@node106.yinzhengjie.org.cn ~]# kafka-server-start.sh /home/softwares/kafka_2.11-0.10.2.1/config/server.properties >> /dev/null &

2>. Stop the Kafka service (flushes logs to disk and moves leadership of every partition this broker leads to other brokers)

[root@node106.yinzhengjie.org.cn ~]# kafka-server-stop.sh         

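Tip:
  When stopping a Kafka cluster whose partitions have a replication factor of 2, do not stop two brokers at the same time. Also, in production the broker process may take a while to exit completely after you run this script; during that time, avoid force-killing it with kill.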
  
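Graceful shutdown:
  A Kafka broker may go down because of a failure, but it is often stopped for planned maintenance. In that case, shut the broker process down gracefully instead of simply killing it. A graceful shutdown has two benefits:
    1>. It flushes all logs to disk, so the broker can skip log recovery on restart. Log recovery is time-consuming, so this speeds up planned restarts.
    2>. Before shutting down, it moves leadership of every partition this broker leads to replicas on other brokers. This makes the leadership transfer faster and cuts each partition's unavailability to a few milliseconds.
  The graceful-shutdown script is kafka-server-stop.sh in the bin directory of the Kafka installation. If you read it, you can see it still stops the broker with kill, using a plain SIGTERM rather than kill -9. The first benefit needs no extra configuration. The second requires controlled.shutdown.enable=true, which is already the default, so we don't need to change it.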
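The tip above says to wait for the broker to exit instead of killing it. Below is a minimal bash sketch of such a wait loop. It is not part of Kafka, and the 300-second timeout in the usage comment is an arbitrary assumption. The `pgrep -f 'kafka\.Kafka'` check mirrors the pattern that kafka-server-stop.sh itself greps for.

```shell
#!/usr/bin/env bash
# wait_until_gone TIMEOUT CMD [ARGS...]
# Re-runs CMD until it fails (i.e. the process is no longer found) or TIMEOUT
# seconds have passed. Prints "stopped" and returns 0 once CMD fails;
# returns 1 on timeout. POLL_INTERVAL (seconds, default 5) sets the poll rate.
wait_until_gone() {
  local timeout=$1
  shift
  local interval=${POLL_INTERVAL:-5} waited=0
  while "$@" >/dev/null 2>&1; do
    if (( waited >= timeout )); then
      echo "still running after ${timeout}s; check the broker log before resorting to kill -9" >&2
      return 1
    fi
    sleep "$interval"
    waited=$((waited + interval))
  done
  echo "stopped"
}

# Usage on a broker host (timeout value is a hypothetical choice):
#   kafka-server-stop.sh
#   wait_until_gone 300 pgrep -f 'kafka\.Kafka'
```

Passing the check as a command keeps the loop independent of how the process is found. You can substitute `jps | grep -q Kafka` wrapped in a small function.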

3>. Check the Kafka process ID

[root@node106.yinzhengjie.org.cn ~]# jps | grep Kafka
Kafka
[root@node106.yinzhengjie.org.cn ~]#

II. kafka-topics.sh usage examples

1>. Create a topic (kafka-topics.sh)

[root@node106.yinzhengjie.org.cn ~]# kafka-topics.sh --zookeeper node107.yinzhengjie.org.cn:2181,node108.yinzhengjie.org.cn:2181 --create --partitions  --replication-factor  --topic yinzhengjie-kafka
Created topic "yinzhengjie-kafka".
[root@node106.yinzhengjie.org.cn ~]#

2>. List topics

[root@node106.yinzhengjie.org.cn ~]# kafka-topics.sh --zookeeper node107.yinzhengjie.org.cn:2181,node108.yinzhengjie.org.cn:2181 --list
yinzhengjie-kafka
[root@node106.yinzhengjie.org.cn ~]#
[root@node106.yinzhengjie.org.cn ~]# ansible kafka -m shell -a 'ls -l /home/data/kafka/logs'
node107.yinzhengjie.org.cn | SUCCESS | rc=0 >>
total
-rw-r--r-- root root Jul : cleaner-offset-checkpoint
-rw-r--r-- root root Jul : meta.properties
-rw-r--r-- root root Jul : recovery-point-offset-checkpoint
-rw-r--r-- root root Jul : replication-offset-checkpoint

node106.yinzhengjie.org.cn | SUCCESS | rc=0 >>
total
-rw-r--r-- root root Jul : cleaner-offset-checkpoint
-rw-r--r-- root root Jul : meta.properties
-rw-r--r-- root root Jul : recovery-point-offset-checkpoint
-rw-r--r-- root root Jul : replication-offset-checkpoint
drwxr-xr-x root root Jul : yinzhengjie-kafka-
drwxr-xr-x root root Jul : yinzhengjie-kafka-
drwxr-xr-x root root Jul : yinzhengjie-kafka-

node108.yinzhengjie.org.cn | SUCCESS | rc=0 >>
total
-rw-r--r-- root root Jul : cleaner-offset-checkpoint
-rw-r--r-- root root Jul : meta.properties
-rw-r--r-- root root Jul : recovery-point-offset-checkpoint
-rw-r--r-- root root Jul : replication-offset-checkpoint
drwxr-xr-x root root Jul : yinzhengjie-kafka-
drwxr-xr-x root root Jul : yinzhengjie-kafka-

[root@node106.yinzhengjie.org.cn ~]#

(I configured three data directories, so the topic's partition directories can appear in any of the three; logs2 and logs3 are checked next.)

[root@node106.yinzhengjie.org.cn ~]# ansible kafka -m shell -a 'ls -l /home/data/kafka/logs2'
node107.yinzhengjie.org.cn | SUCCESS | rc=0 >>
total
-rw-r--r-- root root Jul : cleaner-offset-checkpoint
-rw-r--r-- root root Jul : meta.properties
-rw-r--r-- root root Jul : recovery-point-offset-checkpoint
-rw-r--r-- root root Jul : replication-offset-checkpoint

node108.yinzhengjie.org.cn | SUCCESS | rc=0 >>
total
-rw-r--r-- root root Jul : cleaner-offset-checkpoint
-rw-r--r-- root root Jul : meta.properties
-rw-r--r-- root root Jul : recovery-point-offset-checkpoint
-rw-r--r-- root root Jul : replication-offset-checkpoint
drwxr-xr-x root root Jul : yinzhengjie-kafka-
drwxr-xr-x root root Jul : yinzhengjie-kafka-

node106.yinzhengjie.org.cn | SUCCESS | rc=0 >>
total
-rw-r--r-- root root Jul : cleaner-offset-checkpoint
-rw-r--r-- root root Jul : meta.properties
-rw-r--r-- root root Jul : recovery-point-offset-checkpoint
-rw-r--r-- root root Jul : replication-offset-checkpoint
drwxr-xr-x root root Jul : yinzhengjie-kafka-
drwxr-xr-x root root Jul : yinzhengjie-kafka-

[root@node106.yinzhengjie.org.cn ~]#


[root@node106.yinzhengjie.org.cn ~]# ansible kafka -m shell -a 'ls -l /home/data/kafka/logs3'
node108.yinzhengjie.org.cn | SUCCESS | rc=0 >>
total
-rw-r--r-- root root Jul : cleaner-offset-checkpoint
-rw-r--r-- root root Jul : meta.properties
-rw-r--r-- root root Jul : recovery-point-offset-checkpoint
-rw-r--r-- root root Jul : replication-offset-checkpoint
drwxr-xr-x root root Jul : yinzhengjie-kafka-
drwxr-xr-x root root Jul : yinzhengjie-kafka-

node107.yinzhengjie.org.cn | SUCCESS | rc=0 >>
total
-rw-r--r-- root root Jul : cleaner-offset-checkpoint
-rw-r--r-- root root Jul : meta.properties
-rw-r--r-- root root Jul : recovery-point-offset-checkpoint
-rw-r--r-- root root Jul : replication-offset-checkpoint

node106.yinzhengjie.org.cn | SUCCESS | rc=0 >>
total
-rw-r--r-- root root Jul : cleaner-offset-checkpoint
-rw-r--r-- root root Jul : meta.properties
-rw-r--r-- root root Jul : recovery-point-offset-checkpoint
-rw-r--r-- root root Jul : replication-offset-checkpoint
drwxr-xr-x root root Jul : yinzhengjie-kafka-
drwxr-xr-x root root Jul : yinzhengjie-kafka-

[root@node106.yinzhengjie.org.cn ~]#

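With several data directories, counting a topic's partition directories is less error-prone than reading `ls -l` output by eye. Below is a small bash sketch to run on each broker. It assumes the usual `<topic>-<partition>` directory naming seen above and takes the data directories as arguments.

```shell
#!/usr/bin/env bash
# count_partition_dirs TOPIC DIR...: print "DIR COUNT" for each data directory,
# where COUNT is the number of subdirectories named exactly "<TOPIC>-<number>".
count_partition_dirs() {
  local topic=$1
  shift
  local dir entry name suffix n
  for dir in "$@"; do
    n=0
    for entry in "$dir"/*; do
      [[ -d $entry ]] || continue          # skip checkpoint files and empty globs
      name=${entry##*/}
      suffix=${name#"$topic"-}
      # "yinzhengjie-kafka001-0" does not count for topic "yinzhengjie-kafka".
      if [[ $name == "$topic"-* && $suffix =~ ^[0-9]+$ ]]; then
        n=$((n + 1))
      fi
    done
    printf '%s %d\n' "$dir" "$n"
  done
}

# Usage on a broker (paths as in the listings above):
#   count_partition_dirs yinzhengjie-kafka /home/data/kafka/logs /home/data/kafka/logs2 /home/data/kafka/logs3
```

Running it again after deleting the topic should report 0 for every directory once the data has actually been removed.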

3>. Delete a topic

[root@node106.yinzhengjie.org.cn ~]# kafka-topics.sh --zookeeper node107.yinzhengjie.org.cn:2181,node108.yinzhengjie.org.cn:2181 --delete --topic yinzhengjie-kafka
Topic yinzhengjie-kafka is marked for deletion.                    #At this point the topic "yinzhengjie-kafka" has only been marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.     #To actually delete the topic, delete.topic.enable must be set to true. When the data is really removed also depends on log.retention.check.interval.ms, the interval at which the deletion policy is checked. For experiments I suggest setting it to 10000 (10 seconds); the cluster must be restarted for the change to take effect.
[root@node106.yinzhengjie.org.cn ~]#
[root@node106.yinzhengjie.org.cn ~]# kafka-topics.sh --zookeeper node107.yinzhengjie.org.cn:2181,node108.yinzhengjie.org.cn:2181 --list
yinzhengjie-kafka - marked for deletion
[root@node106.yinzhengjie.org.cn ~]#
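Both settings from the Note above go into server.properties on every broker, followed by a broker restart. Below is a minimal bash sketch of an idempotent way to set them: replace the key if it is present, otherwise append it. The config path in the usage comment is assumed from the start command in section I.

```shell
#!/usr/bin/env bash
# set_prop FILE KEY VALUE: replace the line "KEY=..." in a .properties file,
# or append "KEY=VALUE" if the key is not present yet. Keys are compared
# literally (no regex), so the dots in Kafka property names are safe.
set_prop() {
  local file=$1 key=$2 value=$3 tmp
  tmp=$(mktemp) || return 1
  awk -v k="$key" -v v="$value" '
    index($0, k "=") == 1 { print k "=" v; found = 1; next }
    { print }
    END { if (!found) print k "=" v }
  ' "$file" > "$tmp" || { rm -f "$tmp"; return 1; }
  # Copy back instead of mv so the original file keeps its permissions.
  cat "$tmp" > "$file"
  rm -f "$tmp"
}

# Usage (path assumed; repeat on every broker, then restart the brokers):
#   conf=/home/softwares/kafka_2.11-0.10.2.1/config/server.properties
#   set_prop "$conf" delete.topic.enable true
#   set_prop "$conf" log.retention.check.interval.ms 10000
```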
[root@node106.yinzhengjie.org.cn ~]# ansible kafka -m shell -a 'ls -l /home/data/kafka/logs'
node107.yinzhengjie.org.cn | SUCCESS | rc=0 >>
total
-rw-r--r-- root root Jul : cleaner-offset-checkpoint
-rw-r--r-- root root Jul : meta.properties
-rw-r--r-- root root Jul : recovery-point-offset-checkpoint
-rw-r--r-- root root Jul : replication-offset-checkpoint

node108.yinzhengjie.org.cn | SUCCESS | rc=0 >>
total
-rw-r--r-- root root Jul : cleaner-offset-checkpoint
-rw-r--r-- root root Jul : meta.properties
-rw-r--r-- root root Jul : recovery-point-offset-checkpoint
-rw-r--r-- root root Jul : replication-offset-checkpoint

node106.yinzhengjie.org.cn | SUCCESS | rc=0 >>
total
-rw-r--r-- root root Jul : cleaner-offset-checkpoint
-rw-r--r-- root root Jul : meta.properties
-rw-r--r-- root root Jul : recovery-point-offset-checkpoint
-rw-r--r-- root root Jul : replication-offset-checkpoint

[root@node106.yinzhengjie.org.cn ~]#

(Because I configured three Kafka data directories, check all three to confirm that the partition directories were deleted.)

[root@node106.yinzhengjie.org.cn ~]# ansible kafka -m shell -a 'ls -l /home/data/kafka/logs2'
node108.yinzhengjie.org.cn | SUCCESS | rc=0 >>
total
-rw-r--r-- root root Jul : cleaner-offset-checkpoint
-rw-r--r-- root root Jul : meta.properties
-rw-r--r-- root root Jul : recovery-point-offset-checkpoint
-rw-r--r-- root root Jul : replication-offset-checkpoint

node107.yinzhengjie.org.cn | SUCCESS | rc=0 >>
total
-rw-r--r-- root root Jul : cleaner-offset-checkpoint
-rw-r--r-- root root Jul : meta.properties
-rw-r--r-- root root Jul : recovery-point-offset-checkpoint
-rw-r--r-- root root Jul : replication-offset-checkpoint

node106.yinzhengjie.org.cn | SUCCESS | rc=0 >>
total
-rw-r--r-- root root Jul : cleaner-offset-checkpoint
-rw-r--r-- root root Jul : meta.properties
-rw-r--r-- root root Jul : recovery-point-offset-checkpoint
-rw-r--r-- root root Jul : replication-offset-checkpoint

[root@node106.yinzhengjie.org.cn ~]#


[root@node106.yinzhengjie.org.cn ~]# ansible kafka -m shell -a 'ls -l /home/data/kafka/logs3'
node107.yinzhengjie.org.cn | SUCCESS | rc=0 >>
total
-rw-r--r-- root root Jul : cleaner-offset-checkpoint
-rw-r--r-- root root Jul : meta.properties
-rw-r--r-- root root Jul : recovery-point-offset-checkpoint
-rw-r--r-- root root Jul : replication-offset-checkpoint

node108.yinzhengjie.org.cn | SUCCESS | rc=0 >>
total
-rw-r--r-- root root Jul : cleaner-offset-checkpoint
-rw-r--r-- root root Jul : meta.properties
-rw-r--r-- root root Jul : recovery-point-offset-checkpoint
-rw-r--r-- root root Jul : replication-offset-checkpoint

node106.yinzhengjie.org.cn | SUCCESS | rc=0 >>
total
-rw-r--r-- root root Jul : cleaner-offset-checkpoint
-rw-r--r-- root root Jul : meta.properties
-rw-r--r-- root root Jul : recovery-point-offset-checkpoint
-rw-r--r-- root root Jul : replication-offset-checkpoint

[root@node106.yinzhengjie.org.cn ~]#


4>. View the details of an existing topic

[root@node106.yinzhengjie.org.cn ~]# kafka-topics.sh --create --zookeeper node106.yinzhengjie.org.cn:2181/kafka01 --partitions 5 --replication-factor 2 --topic yinzhengjie-kafka001
Created topic "yinzhengjie-kafka001".
[root@node106.yinzhengjie.org.cn ~]#
[root@node106.yinzhengjie.org.cn ~]# kafka-topics.sh --zookeeper node108.yinzhengjie.org.cn:2181/kafka01 --describe --topic yinzhengjie-kafka001
Topic:yinzhengjie-kafka001 PartitionCount:5 ReplicationFactor:2 Configs:
Topic: yinzhengjie-kafka001 Partition: Leader: Replicas: , Isr: ,
Topic: yinzhengjie-kafka001 Partition: Leader: Replicas: , Isr: ,
Topic: yinzhengjie-kafka001 Partition: Leader: Replicas: , Isr: ,
Topic: yinzhengjie-kafka001 Partition: Leader: Replicas: , Isr: ,
Topic: yinzhengjie-kafka001 Partition: Leader: Replicas: , Isr: ,
[root@node106.yinzhengjie.org.cn ~]#
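When a topic has many partitions, the --describe output is easier to judge in summary form. Below is a small awk filter that counts, for each broker id, how many partitions it leads and how many replicas it hosts. It is a sketch that assumes the whitespace-separated `Topic: ... Partition: ... Leader: ... Replicas: ...` row layout shown above.

```shell
#!/usr/bin/env bash
# leader_summary: read "kafka-topics.sh --describe" output on stdin and print
# one line per broker: how many partitions it leads and how many replicas it holds.
leader_summary() {
  awk '
    # Per-partition rows start with "Topic:" as a separate field; the header
    # row ("Topic:name PartitionCount:...") does not, so it is skipped.
    $1 == "Topic:" && $3 == "Partition:" {
      for (i = 1; i < NF; i++) {
        if ($i == "Leader:") leaders[$(i + 1)]++
        if ($i == "Replicas:") {
          n = split($(i + 1), ids, ",")
          for (j = 1; j <= n; j++) replicas[ids[j]]++
        }
      }
    }
    END {
      for (b in replicas)
        printf "broker %s: leader=%d replicas=%d\n", b, leaders[b] + 0, replicas[b]
    }
  ' | sort
}

# Usage:
#   kafka-topics.sh --zookeeper node108.yinzhengjie.org.cn:2181/kafka01 --describe --topic yinzhengjie-kafka001 | leader_summary
```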

5>. Change a topic's partition count and add a topic-level config

[root@node106.yinzhengjie.org.cn ~]# kafka-topics.sh --zookeeper node108.yinzhengjie.org.cn:2181/kafka01 --alter --topic yinzhengjie-kafka001 --partitions 10 --config max.message.bytes=10000000
WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
Going forward, please use kafka-configs.sh for this functionality
Updated config for topic "yinzhengjie-kafka001".                                           
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
[root@node106.yinzhengjie.org.cn ~]#
[root@node106.yinzhengjie.org.cn ~]# kafka-topics.sh --zookeeper node108.yinzhengjie.org.cn:2181/kafka01 --describe --topic yinzhengjie-kafka001
Topic:yinzhengjie-kafka001 PartitionCount:10 ReplicationFactor:2 Configs:max.message.bytes=10000000        #Note: this is the config we just set with the --config option
Topic: yinzhengjie-kafka001 Partition: Leader: Replicas: , Isr: ,
Topic: yinzhengjie-kafka001 Partition: Leader: Replicas: , Isr: ,
Topic: yinzhengjie-kafka001 Partition: Leader: Replicas: , Isr: ,
Topic: yinzhengjie-kafka001 Partition: Leader: Replicas: , Isr: ,
Topic: yinzhengjie-kafka001 Partition: Leader: Replicas: , Isr: ,
Topic: yinzhengjie-kafka001 Partition: Leader: Replicas: , Isr: ,
Topic: yinzhengjie-kafka001 Partition: Leader: Replicas: , Isr: ,
Topic: yinzhengjie-kafka001 Partition: Leader: Replicas: , Isr: ,
Topic: yinzhengjie-kafka001 Partition: Leader: Replicas: , Isr: ,
Topic: yinzhengjie-kafka001 Partition: Leader: Replicas: , Isr: ,106              #The partition count has grown from 5 to 10.
[root@node106.yinzhengjie.org.cn ~]#
[root@node106.yinzhengjie.org.cn ~]#
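The WARNING above matters because a key-hashing partitioner places a keyed message at hash(key) mod (number of partitions). The bash sketch below illustrates what happens when the count goes from 5 to 10. It uses `cksum` (a CRC) as a stand-in hash, while Kafka's default partitioner uses murmur2, so the concrete partition numbers will differ from Kafka's. The keys are hypothetical.

```shell
#!/usr/bin/env bash
# partition_for KEY N: hash(KEY) mod N, the rule a key-hashing partitioner applies.
# Stand-in hash: POSIX cksum (CRC). Kafka's default partitioner uses murmur2,
# so these numbers are illustrative only.
partition_for() {
  local hash
  hash=$(printf '%s' "$1" | cksum | awk '{print $1}')
  echo $(( hash % $2 ))
}

# compare_placement KEY...: show each key's partition with 5 and with 10 partitions.
compare_placement() {
  local key before after note
  for key in "$@"; do
    before=$(partition_for "$key" 5)
    after=$(partition_for "$key" 10)
    note=""
    if (( before != after )); then note="  (moved)"; fi
    printf '%s %d -> %d%s\n' "$key" "$before" "$after" "$note"
  done
}

# Hypothetical keys:
compare_placement user-1 user-2 user-3 order-42
```

Because 10 is a multiple of 5, each key either stays on its old partition or moves to its old partition number plus 5, so roughly half of the keys change partition. Messages written for a moved key before the change remain in the old partition. That is why per-key ordering is no longer guaranteed across the change.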

6>. Remove a config from a topic

[root@node106.yinzhengjie.org.cn ~]# kafka-topics.sh --zookeeper node108.yinzhengjie.org.cn:2181/kafka01 --describe --topic yinzhengjie-kafka001
Topic:yinzhengjie-kafka001 PartitionCount:10 ReplicationFactor:2 Configs:max.message.bytes=10000000
Topic: yinzhengjie-kafka001 Partition: Leader: Replicas: , Isr: ,
Topic: yinzhengjie-kafka001 Partition: Leader: Replicas: , Isr: ,
Topic: yinzhengjie-kafka001 Partition: Leader: Replicas: , Isr: ,
Topic: yinzhengjie-kafka001 Partition: Leader: Replicas: , Isr: ,
Topic: yinzhengjie-kafka001 Partition: Leader: Replicas: , Isr: ,
Topic: yinzhengjie-kafka001 Partition: Leader: Replicas: , Isr: ,
Topic: yinzhengjie-kafka001 Partition: Leader: Replicas: , Isr: ,
Topic: yinzhengjie-kafka001 Partition: Leader: Replicas: , Isr: ,
Topic: yinzhengjie-kafka001 Partition: Leader: Replicas: , Isr: ,
Topic: yinzhengjie-kafka001 Partition: Leader: Replicas: , Isr: ,
[root@node106.yinzhengjie.org.cn ~]#
[root@node106.yinzhengjie.org.cn ~]# kafka-topics.sh --zookeeper node108.yinzhengjie.org.cn:2181/kafka01 --alter --topic yinzhengjie-kafka001 --delete-config max.message.bytes    #Remove the config
WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
Going forward, please use kafka-configs.sh for this functionality
Updated config for topic "yinzhengjie-kafka001".
[root@node106.yinzhengjie.org.cn ~]#
[root@node106.yinzhengjie.org.cn ~]# kafka-topics.sh --zookeeper node108.yinzhengjie.org.cn:2181/kafka01 --describe --topic yinzhengjie-kafka001
Topic:yinzhengjie-kafka001 PartitionCount:10 ReplicationFactor:2 Configs:                  #The config has been removed.
Topic: yinzhengjie-kafka001 Partition: Leader: Replicas: , Isr: ,
Topic: yinzhengjie-kafka001 Partition: Leader: Replicas: , Isr: ,
Topic: yinzhengjie-kafka001 Partition: Leader: Replicas: , Isr: ,
Topic: yinzhengjie-kafka001 Partition: Leader: Replicas: , Isr: ,
Topic: yinzhengjie-kafka001 Partition: Leader: Replicas: , Isr: ,
Topic: yinzhengjie-kafka001 Partition: Leader: Replicas: , Isr: ,
Topic: yinzhengjie-kafka001 Partition: Leader: Replicas: , Isr: ,
Topic: yinzhengjie-kafka001 Partition: Leader: Replicas: , Isr: ,
Topic: yinzhengjie-kafka001 Partition: Leader: Replicas: , Isr: ,
Topic: yinzhengjie-kafka001 Partition: Leader: Replicas: , Isr: ,
[root@node106.yinzhengjie.org.cn ~]#
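The WARNING printed by kafka-topics.sh recommends kafka-configs.sh for topic configs. Below is a sketch of the equivalent calls. With DRY_RUN=1 the function only prints the commands, so you can review them first. The ZooKeeper address (port 2181 and the /kafka01 chroot) is an assumption based on the commands above.

```shell
#!/usr/bin/env bash
# ZooKeeper connection string; assumed to match the /kafka01 chroot used above.
ZK=${ZK:-node108.yinzhengjie.org.cn:2181/kafka01}

# run CMD...: print the command when DRY_RUN=1, otherwise execute it.
run() {
  if [[ ${DRY_RUN:-0} == 1 ]]; then
    printf '%s\n' "$*"
  else
    "$@"
  fi
}

# topic_config TOPIC add|delete|describe [CONFIG]
topic_config() {
  local topic=$1 action=$2 cfg=$3
  case $action in
    add)      run kafka-configs.sh --zookeeper "$ZK" --entity-type topics --entity-name "$topic" --alter --add-config "$cfg" ;;
    delete)   run kafka-configs.sh --zookeeper "$ZK" --entity-type topics --entity-name "$topic" --alter --delete-config "$cfg" ;;
    describe) run kafka-configs.sh --zookeeper "$ZK" --entity-type topics --entity-name "$topic" --describe ;;
    *)        echo "unknown action: $action" >&2; return 1 ;;
  esac
}

# Usage:
#   DRY_RUN=1 topic_config yinzhengjie-kafka001 add max.message.bytes=10000000
#   topic_config yinzhengjie-kafka001 delete max.message.bytes
```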

III. Kafka functional tests

1>. Start a producer

[root@node106.yinzhengjie.org.cn ~]# kafka-console-producer.sh --broker-list node108.yinzhengjie.org.cn: --topic yinzhengjie-kafka2019
尹正杰到此一游!
https://www.cnblogs.com/yinzhengjie/
[root@node106.yinzhengjie.org.cn ~]# kafka-console-producer.sh
Read data from standard input and publish it to Kafka.
Option Description
------ -----------
--batch-size <Integer: size> Number of messages to send in a single
batch if they are not being sent
synchronously. (default: )
--broker-list <String: broker-list> REQUIRED: The broker list string in
the form HOST1:PORT1,HOST2:PORT2.
--compression-codec [String: The compression codec: either 'none',
compression-codec] 'gzip', 'snappy', or 'lz4'.If
specified without value, then it
defaults to 'gzip'
--key-serializer <String: The class name of the message encoder
encoder_class> implementation to use for
serializing keys. (default: kafka.
serializer.DefaultEncoder)
--line-reader <String: reader_class> The class name of the class to use for
reading lines from standard in. By
default each line is read as a
separate message. (default: kafka.
tools.
ConsoleProducer$LineMessageReader)
--max-block-ms <Long: max block on The max time that the producer will
send> block for during a send request
(default: )
--max-memory-bytes <Long: total memory The total memory used by the producer
in bytes> to buffer records waiting to be sent
to the server. (default: )
--max-partition-memory-bytes <Long: The buffer size allocated for a
memory in bytes per partition> partition. When records are received
which are smaller than this size the
producer will attempt to
optimistically group them together
until this size is reached.
(default: )
--message-send-max-retries <Integer> Brokers can fail receiving the message
for multiple reasons, and being
unavailable transiently is just one
of them. This property specifies the
number of retires before the
producer give up and drop this
message. (default: )
--metadata-expiry-ms <Long: metadata The period of time in milliseconds
expiration interval> after which we force a refresh of
metadata even if we haven't seen any
leadership changes. (default: )
--old-producer Use the old producer implementation.
--producer-property <String: A mechanism to pass user-defined
producer_prop> properties in the form key=value to
the producer.
--producer.config <String: config file> Producer config properties file. Note
that [producer-property] takes
precedence over this config.
--property <String: prop> A mechanism to pass user-defined
properties in the form key=value to
the message reader. This allows
custom configuration for a user-
defined message reader.
--queue-enqueuetimeout-ms <Integer: Timeout for event enqueue (default:
queue enqueuetimeout ms> )
--queue-size <Integer: queue_size> If set and the producer is running in
asynchronous mode, this gives the
maximum amount of messages will
queue awaiting sufficient batch
size. (default: )
--request-required-acks <String: The required acks of the producer
request required acks> requests (default: )
--request-timeout-ms <Integer: request The ack timeout of the producer
timeout ms> requests. Value must be non-negative
and non-zero (default: )
--retry-backoff-ms <Integer> Before each retry, the producer
refreshes the metadata of relevant
topics. Since leader election takes
a bit of time, this property
specifies the amount of time that
the producer waits before refreshing
the metadata. (default: )
--socket-buffer-size <Integer: size> The size of the tcp RECV size.
(default: )
--sync If set message send requests to the
brokers are synchronously, one at a
time as they arrive.
--timeout <Integer: timeout_ms> If set and the producer is running in
asynchronous mode, this gives the
maximum amount of time a message
will queue awaiting sufficient batch
size. The value is given in ms.
(default: )
--topic <String: topic> REQUIRED: The topic id to produce
messages to.
--value-serializer <String: The class name of the message encoder
encoder_class> implementation to use for
serializing values. (default: kafka.
serializer.DefaultEncoder)
[root@node106.yinzhengjie.org.cn ~]#

(Running kafka-console-producer.sh with no arguments, as above, prints the producer script's help text.)

2>. Start a consumer

[root@node106.yinzhengjie.org.cn ~]# kafka-console-consumer.sh --bootstrap-server node107.yinzhengjie.org.cn: --topic yinzhengjie-kafka2019
尹正杰到此一游!
https://www.cnblogs.com/yinzhengjie/
[root@node106.yinzhengjie.org.cn ~]# kafka-console-consumer.sh
The console consumer is a tool that reads data from Kafka and outputs it to standard output.
Option Description
------ -----------
--blacklist <String: blacklist> Blacklist of topics to exclude from
consumption.
--bootstrap-server <String: server to REQUIRED (unless old consumer is
connect to> used): The server to connect to.
--consumer-property <String: A mechanism to pass user-defined
consumer_prop> properties in the form key=value to
the consumer.
--consumer.config <String: config file> Consumer config properties file. Note
that [consumer-property] takes
precedence over this config.
--csv-reporter-enabled If set, the CSV metrics reporter will
be enabled
--delete-consumer-offsets If specified, the consumer path in
zookeeper is deleted when starting up
--enable-systest-events Log lifecycle events of the consumer
in addition to logging consumed
messages. (This is specific for
system tests.)
--formatter <String: class> The name of a class to use for
formatting kafka messages for
display. (default: kafka.tools.
DefaultMessageFormatter)
--from-beginning If the consumer does not already have
an established offset to consume
from, start with the earliest
message present in the log rather
than the latest message.
--key-deserializer <String:
deserializer for key>
--max-messages <Integer: num_messages> The maximum number of messages to
consume before exiting. If not set,
consumption is continual.
--metrics-dir <String: metrics If csv-reporter-enable is set, and
directory> this parameter isset, the csv
metrics will be outputed here
--new-consumer Use the new consumer implementation.
This is the default.
--offset <String: consume offset> The offset id to consume from (a non-
negative number), or 'earliest'
which means from beginning, or
'latest' which means from end
(default: latest)
--partition <Integer: partition> The partition to consume from.
--property <String: prop> The properties to initialize the
message formatter.
--skip-message-on-error If there is an error when processing a
message, skip it instead of halt.
--timeout-ms <Integer: timeout_ms> If specified, exit if no message is
available for consumption for the
specified interval.
--topic <String: topic> The topic id to consume on.
--value-deserializer <String:
deserializer for values>
--whitelist <String: whitelist> Whitelist of topics to include for
consumption.
--zookeeper <String: urls> REQUIRED (only when using old
consumer): The connection string for
the zookeeper connection in the form
host:port. Multiple URLS can be
given to allow fail-over.
[root@node106.yinzhengjie.org.cn ~]#
[root@node106.yinzhengjie.org.cn ~]#

(Running kafka-console-consumer.sh with no arguments, as above, prints the consumer script's help text.)
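The manual producer/consumer test above can be scripted as a quick end-to-end check. This bash sketch is not a Kafka tool. It sends one unique line with the console producer and then looks for it in the console consumer's output. It uses only options listed in the help texts above: --broker-list, --bootstrap-server, --from-beginning, --timeout-ms and --max-messages. It is meant for a scratch topic: on a busy topic the marker may not be among the messages read. The broker port in the usage comment is an assumption.

```shell
#!/usr/bin/env bash
# Commands are overridable so the check can be exercised with stand-ins.
PRODUCER=${PRODUCER:-kafka-console-producer.sh}
CONSUMER=${CONSUMER:-kafka-console-consumer.sh}

# smoke_test BROKER TOPIC: produce one unique line, read the topic back and
# check that the line is there. Returns 0 on success, 1 otherwise.
smoke_test() {
  local broker=$1 topic=$2 marker out
  marker="smoke-test-$(date +%s)-$$"
  printf '%s\n' "$marker" | "$PRODUCER" --broker-list "$broker" --topic "$topic" || return 1
  # --from-beginning: read from the earliest offset.
  # --timeout-ms: stop when no message arrives for 10 s.
  # --max-messages: cap how much is read from a non-empty topic.
  out=$("$CONSUMER" --bootstrap-server "$broker" --topic "$topic" \
          --from-beginning --timeout-ms 10000 --max-messages 1000 2>/dev/null)
  if grep -qxF -- "$marker" <<< "$out"; then
    echo "OK: $marker"
  else
    echo "FAIL: $marker not read back from $topic" >&2
    return 1
  fi
}

# Usage (9092 is Kafka's default listener port; adjust to your brokers):
#   smoke_test node108.yinzhengjie.org.cn:9092 yinzhengjie-kafka2019
```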

IV. Other operations

1>. Balancing leadership

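  When a broker shuts down or fails, leadership of every partition it leads moves to replicas on other servers. As a result, after the broker restarts, all of its partitions are only followers and are not used by clients for reads or writes.

  To avoid this imbalance, Kafka has the concept of a preferred replica. If a partition's replica list starts with 108 (followed by 106 and 107), node 108 is preferred as leader over 106 and 107 because it comes first in the list.
  
  You can run kafka-preferred-replica-election.sh to make the Kafka cluster try to move leadership back to the preferred replicas; a sample run is shown below. Running this command by hand over and over gets tedious, so Kafka can do it automatically with "auto.leader.rebalance.enable=true". Normally we don't need to set it, because it already defaults to true.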
[root@node106.yinzhengjie.org.cn ~]# kafka-preferred-replica-election.sh --zookeeper node108.yinzhengjie.org.cn:2181/kafka01
Created preferred replica election path with {"version":,"partitions":[{"topic":"yinzhengjie-kafka2019","partition":},{"topic":"__consumer_offsets","partition":},{"topic":"__consumer_offsets","partition":},{"topic":"__c
onsumer_offsets","partition":27},{"topic":"yinzhengjie-kafka2019","partition":4},{"topic":"yinzhengjie-kafka","partition":15},{"topic":"__consumer_offsets","partition":1},{"topic":"__consumer_offsets","partition":20},{"topic":"__consumer_offsets","partition":7},{"topic":"__consumer_offsets","partition":42},{"topic":"yinzhengjie-kafka","partition":6},{"topic":"__consumer_offsets","partition":49},{"topic":"yinzhengjie-kafka2019","partition":11},{"topic":"test","partition":2},{"topic":"__consumer_offsets","partition":4},{"topic":"__consumer_offsets","partition":33},{"topic":"yinzhengjie-kafka001","partition":4},{"topic":"yinzhengjie-kafka2019","partition":13},{"topic":"__consumer_offsets","partition":14},{"topic":"__consumer_offsets","partition":46},{"topic":"__consumer_offsets","partition":24},{"topic":"__consumer_offsets","partition":28},{"topic":"yinzhengjie-kafka","partition":1},{"topic":"__consumer_offsets","partition":6},{"topic":"yinzhengjie-kafka2019","partition":7},{"topic":"yinzhengjie-kafka","partition":12},{"topic":"yinzhengjie-kafka","partition":16},{"topic":"__consumer_offsets","partition":37},{"topic":"yinzhengjie-kafka2019","partition":3},{"topic":"yinzhengjie-kafka001","partition":3},{"topic":"__consumer_offsets","partition":43},{"topic":"yinzhengjie-kafka001","partition":0},{"topic":"yinzhengjie-kafka","partition":11},{"topic":"test","partition":1},{"topic":"__consumer_offsets","partition":21},{"topic":"yinzhengjie-kafka2019","partition":0},{"topic":"yinzhengjie-kafka","partition":2},{"topic":"__consumer_offsets","partition":15},{"topic":"__consumer_offsets","partition":11},{"topic":"yinzhengjie-kafka","partition":0},{"topic":"yinzhengjie-kafka","partition":5},{"topic":"__consumer_offsets","partition":30},{"topic":"yinzhengjie-kafka001","partition":7},{"topic":"__consumer_offsets","partition":2},{"topic":"__consumer_offsets","partition":47},{"topic":"yinzhengjie-kafka2019","partition":17},{"topic":"__consumer_offsets","partition":25},{"topic":"__consu
mer_offsets","partition":29},{"topic":"kafka001","partition":2},{"topic":"yinzhengjie-kafka","partition":4},{"topic":"yinzhengjie-kafka2019","partition":16},{"topic":"yinzhengjie-kafka2019","partition":2},{"topic":"__consumer_offsets","partition":8},{"topic":"__consumer_offsets","partition":23},{"topic":"yinzhengjie-kafka2019","partition":12},{"topic":"yinzhengjie-kafka001","partition":8},{"topic":"test","partition":0},{"topic":"__consumer_offsets","partition":40},{"topic":"__consumer_offsets","partition":31},{"topic":"yinzhengjie-kafka001","partition":5},{"topic":"yinzhengjie-kafka001","partition":2},{"topic":"yinzhengjie-kafka","partition":8},{"topic":"__consumer_offsets","partition":19},{"topic":"__consumer_offsets","partition":16},{"topic":"kafka001","partition":1},{"topic":"__consumer_offsets","partition":38},{"topic":"yinzhengjie-kafka","partition":9},{"topic":"yinzhengjie-kafka","partition":13},{"topic":"__consumer_offsets","partition":44},{"topic":"__consumer_offsets","partition":10},{"topic":"yinzhengjie-kafka001","partition":1},{"topic":"__consumer_offsets","partition":3},{"topic":"yinzhengjie-kafka","partition":17},{"topic":"__consumer_offsets","partition":35},{"topic":"kafka001","partition":4},{"topic":"yinzhengjie-kafka2019","partition":1},{"topic":"yinzhengjie-kafka2019","partition":10},{"topic":"kafka001","partition":3},{"topic":"yinzhengjie-kafka2019","partition":6},{"topic":"yinzhengjie-kafka","partition":18},{"topic":"__consumer_offsets","partition":26},{"topic":"__consumer_offsets","partition":39},{"topic":"__consumer_offsets","partition":13},{"topic":"yinzhengjie-kafka","partition":10},{"topic":"__consumer_offsets","partition":17},{"topic":"yinzhengjie-kafka","partition":14},{"topic":"__consumer_offsets","partition":22},{"topic":"yinzhengjie-kafka2019","partition":8},{"topic":"yinzhengjie-kafka","partition":3},{"topic":"__consumer_offsets","partition":9},{"topic":"__consumer_offsets","partition":0},{"topic":"__consumer_offsets","partition":41},{"
topic":"yinzhengjie-kafka2019","partition":18},{"topic":"yinzhengjie-kafka001","partition":6},{"topic":"yinzhengjie-kafka2019","partition":9},{"topic":"__consumer_offsets","partition":48},{"topic":"yinzhengjie-kafka2019","partition":14},{"topic":"yinzhengjie-kafka","partition":7},{"topic":"__consumer_offsets","partition":18},{"topic":"kafka001","partition":0},{"topic":"__consumer_offsets","partition":32},{"topic":"yinzhengjie-kafka2019","partition":15},{"topic":"yinzhengjie-kafka","partition":19},{"topic":"yinzhengjie-kafka2019","partition":19},{"topic":"__consumer_offsets","partition":12},{"topic":"yinzhengjie-kafka001","partition":9},{"topic":"__consumer_offsets","partition":45},{"topic":"__consumer_offsets","partition":5}]}Successfully started preferred replica election for partitions Set([yinzhengjie-kafka,6], [__consumer_offsets,32], [__consumer_offsets,16], [__consumer_offsets,49], [__consumer_offsets,44], [yinzhengjie-kafka001,1], [yinzheng
jie-kafka2019,], [test,], [__consumer_offsets,], [yinzhengjie-kafka,], [yinzhengjie-kafka,], [kafka001,], [yinzhengjie-kafka2019,], [__consumer_offsets,], [yinzhengjie-kafka,], [yinzhengjie-kafka2019,], [__consumer_offsets,], [yinzhengjie-kafka,], [__consumer_offsets,], [yinzhengjie-kafka2019,], [__consumer_offsets,], [yinzhengjie-kafka,], [__consumer_offsets,], [yinzhengjie-kafka2019,], [__consumer_offsets,], [yinzhengjie-kafka,], [__consumer_offsets,], [__consumer_offsets,], [__consumer_offsets,], [kafka001,], [__consumer_offsets,], [kafka001,], [__consumer_offsets,], [yinzhengjie-kafka2019,], [yinzhengjie-kafka001,], [__consumer_offsets,], [yinzhengjie-kafka2019,], [__consumer_offsets,], [yinzhengjie-kafka2019,], [yinzhengjie-kafka2019,], [__consumer_offsets,], [yinzhengjie-kafka2019,], [test,], [__consumer_offsets,], [__consumer_offsets,], [yinzhengjie-kafka,], [yinzhengjie-kafka,], [__consumer_offsets,], [yinzhengjie-kafka2019,], [yinzhengjie-kafka001,], [__consumer_offsets,], [__consumer_offsets,], [__consumer_offsets,], [yinzhengjie-kafka,], [__consumer_offsets,], [yinzhengjie-kafka2019,], [yinzhengjie-kafka001,], [__consumer_offsets,], [yinzhengjie-kafka001,], [yinzhengjie-kafka,], [yinzhengjie-kafka,], [yinzhengjie-kafka,], [yinzhengjie-kafka2019,], [test,], [__consumer_offsets,], [yinzhengjie-kafka2019,], [yinzhengjie-kafka,], [yinzhengjie-kafka,], [__consumer_offsets,], [__consumer_offsets,], [yinzhengjie-kafka001,], [__consumer_offsets,], [kafka001,], [__consumer_offsets,], [__consumer_offsets,], [yinzhengjie-kafka001,], [yinzhengjie-kafka2019,], [__consumer_offsets,], [__consumer_offsets,], [__consumer_offsets,], [__consumer_offsets,], [__consumer_offsets,], [yinzhengjie-kafka001,], [__consumer_offsets,], [yinzhengjie-kafka,], [yinzhengjie-kafka001,], [__consumer_offsets,], [yinzhengjie-kafka001,], [__consumer_offsets,], [__consumer_offsets,], [__consumer_offsets,], [__consumer_offsets,], [yinzhengjie-kafka2019,], [__consumer_offsets,], 
[yinzhengjie-kafka,], [yinzhengjie-kafka2019,], [yinzhengjie-kafka,], [yinzhengjie-kafka2019,], [kafka001,], [yinzhengjie-kafka,], [__consumer_offsets,], [__consumer_offsets,], [__consumer_offsets,], [yinzhengjie-kafka2019,], [yinzhengjie-kafka,], [__consumer_offsets,], [yinzhengjie-kafka2019,], [__consumer_offsets,])[root@node106.yinzhengjie.org.cn ~]#

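To see which partitions an election would actually change, compare each partition's current leader with the first broker in its Replicas list, which is the preferred replica described above. Below is a small awk filter over `kafka-topics.sh --describe` output. It is a sketch assuming the row layout shown in section II, and it lists the partitions not currently led by their preferred replica.

```shell
#!/usr/bin/env bash
# not_on_preferred: read "kafka-topics.sh --describe" output on stdin and print
# "topic-partition leader=X preferred=Y" for every partition whose current
# leader is not the first broker in its Replicas list.
not_on_preferred() {
  awk '
    $1 == "Topic:" && $3 == "Partition:" {
      leader = ""; preferred = ""
      for (i = 1; i < NF; i++) {
        if ($i == "Leader:") leader = $(i + 1)
        if ($i == "Replicas:") { split($(i + 1), ids, ","); preferred = ids[1] }
      }
      if (leader != preferred)
        printf "%s-%s leader=%s preferred=%s\n", $2, $4, leader, preferred
    }
  '
}

# Usage (ZooKeeper address as in the commands above):
#   kafka-topics.sh --zookeeper node108.yinzhengjie.org.cn:2181/kafka01 --describe | not_on_preferred
```

Empty output means every partition is already led by its preferred replica, so an election would not move anything.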

2>. Mirroring data between clusters

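  Copying data between Kafka clusters is called "mirroring". Kafka's built-in cross-cluster replication tool is MirrorMaker, which reads data from a source cluster and writes it to a target cluster. Typical use cases are migrating old data and backing up across data centers.

  You can run multiple mirroring processes to increase throughput and fault tolerance.

  Data is read from a topic in the source cluster and written to the topic of the same name in the target cluster. In effect, MirrorMaker is a Kafka consumer chained to a Kafka producer.

  The source and target clusters are completely independent: they can have different partition counts and offsets. A mirror cluster is therefore not a fault-tolerance mechanism for the source cluster, because consumers and offsets differ between the two. For fault tolerance within a cluster, use Kafka's replication. MirrorMaker does keep and use the message key for partitioning, however, so ordering is preserved per key.

  MirrorMaker is highly configurable. It uses one producer and multiple consumers, so every producer and consumer configuration parameter can be used to configure it. MirrorMaker also has parameters of its own, which sometimes depend on one another in fairly complex ways. Let's look at a MirrorMaker example, focusing on the important parameters: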

      [root@node106.yinzhengjie.org.cn ~]# kafka-mirror-maker.sh --consumer.config /home/softwares/kafka_2.11-0.10.2.1/config/consumer.properties --producer.config /home/softwares/kafka_2.11-0.10.2.1/config/producer.properties --new.consumer --num.streams=2 --whitelist ".*"

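  consumer.config:
    Specifies the consumer configuration file. All consumers share this configuration, so you can configure only one source cluster and one group.id. All consumers therefore belong to the same consumer group, which is exactly what we want. The file has two required parameters: bootstrap.servers (the source cluster's broker addresses) and group.id. Beyond these two, you can set any other consumer parameter. Auto-commit (auto.commit.enable; enable.auto.commit for the new consumer) should normally be left at MirrorMaker's default of false. MirrorMaker commits offsets only after messages have safely reached the target cluster, so auto-commit must not be used. The parameter that sometimes does need changing is auto.offset.reset. With its default value, latest, MirrorMaker only mirrors data that arrives at the source cluster after MirrorMaker starts. To mirror existing data as well, set it to earliest.
  producer.config:
    Specifies the producer configuration file. The only required parameter in it is bootstrap.servers (the target cluster's broker addresses).
  new.consumer:
    MirrorMaker can use either the 0.8 (old) consumer or the 0.9 (new) consumer. The 0.9 consumer is recommended because it is more stable.
  num.streams:
    As explained earlier, each stream is one consumer. All consumers share one producer, and MirrorMaker uses these streams to feed that single producer. If you need more throughput, start another MirrorMaker process.
  whitelist:
    A regular expression naming the topics to mirror; every topic it matches is mirrored. In this example we mirror all topics, but in practice it is better to use an expression such as "ad.*" to avoid mirroring test topics. You can also list several topic names separated by "," or "|", for example "data.ad|data.op|data.er". If only a few topics should be excluded, use the --blacklist parameter instead.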
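The whitelist is a regular expression, so an unescaped "." matches any character: "data.ad" also selects a topic named "data_ad". Below is a bash sketch for previewing what a pattern would pick from a list of topic names. It approximates Kafka's Java-regex, whole-name matching with `grep -Ex` (POSIX extended regex), which should behave the same for simple patterns like these. Following the text above, it treats "," like "|". The topic names are hypothetical.

```shell
#!/usr/bin/env bash
# matching_topics PATTERN TOPIC...: print the topics a whitelist would select.
# Approximation: grep -Ex matches the whole name, like a full Java regex match;
# commas are turned into "|" first, as in the comma-separated form above.
matching_topics() {
  local pattern=${1//,/|}
  shift
  printf '%s\n' "$@" | grep -Ex -- "$pattern"
}

# Hypothetical topic list: "data_ad" is matched by "data.ad" as well.
matching_topics 'data.ad|data.op|data.er' data.ad data_ad data.op data.adx test
```

Escaping the dot ("data\.ad") restricts the match to the literal name. Because the whole name must match, "ad.*" only selects topics whose names start with "ad".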

3>. Decommissioning and commissioning brokers (it is advisable to install a monitoring tool such as kafka-manager or Kafka Eagle first)

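  Adding a new node to a Kafka cluster is easy: give it a unique broker id and start the Kafka process. However, the new server is not automatically assigned any partitions. Until existing partitions are moved to it or new topics are created, it does no work.

  The partition reassignment tool cannot generate a decommissioning plan for a retiring node by itself. All that is needed, though, is to move the partitions hosted on the retiring node to other nodes. You can do this with an automatically generated plan or with a hand-written one; the latter is just a little more tedious.
  
  Kafka provides the partition reassignment tool kafka-reassign-partitions.sh for this work. It can generate a reassignment plan automatically or use one you specify by hand. It cannot detect an unbalanced partition distribution, so an administrator still has to run it. The tool runs in three mutually exclusive modes:
    (1)--generate:
        Given some topics and some brokers, the tool generates a reassignment plan that distributes those topics across those brokers. This mode is a convenient way to get a proposed plan, but use it with care: it can cause a large amount of data movement.
    (2)--execute:
        The tool carries out a user-supplied reassignment plan, which may be hand-edited or produced by "--generate" mode.
    (3)--verify:
        The tool checks the status of the reassignment started by the last "--execute". The status can be completed successfully, failed, or in progress.
  For details, see: https://www.cnblogs.com/yinzhengjie/p/9808125.html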
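In --generate mode the tool reads a JSON file listing the topics to move, together with a --broker-list of target brokers. Below is a bash sketch of a helper that writes that file. The broker ids 106,107 in the usage comment are hypothetical, for example when retiring broker 108. The ZooKeeper address is assumed from the earlier commands.

```shell
#!/usr/bin/env bash
# topics_to_move_json TOPIC...: print the JSON expected by
# kafka-reassign-partitions.sh --topics-to-move-json-file.
topics_to_move_json() {
  local first=1 t
  printf '{"topics":['
  for t in "$@"; do
    (( first )) || printf ','
    printf '{"topic":"%s"}' "$t"
    first=0
  done
  printf '],"version":1}\n'
}

# Usage (hypothetical broker ids; review the proposed plan before executing):
#   topics_to_move_json yinzhengjie-kafka > topics-to-move.json
#   kafka-reassign-partitions.sh --zookeeper node108.yinzhengjie.org.cn:2181/kafka01 \
#     --topics-to-move-json-file topics-to-move.json --broker-list "106,107" --generate
#   # Save the "Proposed partition reassignment configuration" JSON as plan.json, then:
#   kafka-reassign-partitions.sh --zookeeper node108.yinzhengjie.org.cn:2181/kafka01 --reassignment-json-file plan.json --execute
#   kafka-reassign-partitions.sh --zookeeper node108.yinzhengjie.org.cn:2181/kafka01 --reassignment-json-file plan.json --verify
```

Keep the "Current partition replica assignment" JSON that --generate also prints. It is the plan to feed back to --execute if you need to roll back.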

4>. Summary of Kafka offset-related commands

  For details, see: https://www.cnblogs.com/yinzhengjie/p/10514206.html

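5>. Setting quotas (a quota is the upper limit on a client's read/write throughput)

  Quotas can be set in the broker configuration, in which case they apply to all clients. By default every client has an unlimited quota. The following settings limit each client to 10 MB/s (per broker) for producing and for consuming:
    quota.producer.default=10485760
    quota.consumer.default=10485760
  You can also set a quota for an individual client (rarely needed; good to know about):
    [root@node106.yinzhengjie.org.cn ~]# kafka-configs.sh --zookeeper node108.yinzhengjie.org.cn:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-name ClientA --entity-type clients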
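Quota values are in bytes per second, so 10 MB/s is 10 × 1024 × 1024 = 10485760. The bash sketch below does that arithmetic. It then prints the per-client kafka-configs.sh command in the same form as above, filled in with the computed values. ClientA is a placeholder client id, and the ZooKeeper address (port 2181) is an assumption.

```shell
#!/usr/bin/env bash
# mib_per_sec_to_bytes N: convert N MB/s (1 MB = 1024 * 1024 bytes) to bytes/s.
mib_per_sec_to_bytes() {
  echo $(( $1 * 1024 * 1024 ))
}

# client_quota_cmd CLIENT_ID PRODUCE_MB CONSUME_MB: print (not run) the
# kafka-configs.sh command that sets a per-client quota, for review first.
client_quota_cmd() {
  local zk=${ZK:-node108.yinzhengjie.org.cn:2181}
  printf "kafka-configs.sh --zookeeper %s --alter --add-config 'producer_byte_rate=%d,consumer_byte_rate=%d' --entity-name %s --entity-type clients\n" \
    "$zk" "$(mib_per_sec_to_bytes "$2")" "$(mib_per_sec_to_bytes "$3")" "$1"
}

# Usage: client_quota_cmd ClientA 10 10
```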

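6>. Important Kafka monitoring metrics (taken from the "broker metrics" section of "Kafka: The Definitive Guide")

  Kafka exposes server and client metrics through JMX, and tools such as jconsole can display them. Here we list only the metrics visible on the server side, together with their JMX MBean names. Once we know which metrics to watch, we can monitor Kafka with Zabbix's Java gateway, or with tools such as Open-Falcon.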
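As a starting point, the bash sketch below lists a few commonly watched broker MBeans and prints a command that polls them with kafka.tools.JmxTool, which ships with Kafka. It assumes JMX was enabled when the broker started, for example by exporting JMX_PORT=9999 before running kafka-server-start.sh. The host and port are placeholders, and the MBean list is only a small subset.

```shell
#!/usr/bin/env bash
# A few broker MBeans that are commonly watched.
MBEANS=(
  'kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions'
  'kafka.controller:type=KafkaController,name=ActiveControllerCount'
  'kafka.controller:type=KafkaController,name=OfflinePartitionsCount'
  'kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec'
  'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec'
  'kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec'
  'kafka.server:type=ReplicaManager,name=PartitionCount'
  'kafka.server:type=ReplicaManager,name=LeaderCount'
)

# jmxtool_cmd HOST PORT: print a JmxTool command that polls the MBeans above
# every 10 seconds from the broker's JMX endpoint.
jmxtool_cmd() {
  local cmd=(kafka-run-class.sh kafka.tools.JmxTool --jmx-url "service:jmx:rmi:///jndi/rmi://$1:$2/jmxrmi")
  local m
  for m in "${MBEANS[@]}"; do cmd+=(--object-name "$m"); done
  cmd+=(--reporting-interval 10000)
  local IFS=' '
  echo "${cmd[*]}"
}

# Usage (placeholder host/port):
#   $(jmxtool_cmd node106.yinzhengjie.org.cn 9999)
```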
