KafkaMirror Maker架设笔记

有幸使用Kafka Mirror Maker来同步两个地方的数据,以下的是个人笔记记录(未完)。

开启JMX

使用JMX监控Kafka的敏感数据需要开启JMX端口,可以通过设置环境变量来实现。我自己的做法是在kafka-server-start.sh脚本文件中添加如下配置

...
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx6G -Xms6G"
    # 增加JMX_PORT配置
    export JMX_PORT="9999"
...

消费配置

# 消费目标集群
bootstrap.servers=<kafka1host_from>:<port>,<kafka2host_from>:<port>,<kafka3host_from>:<port>
# 消费组的ID
group.id=mm-test-consumer-group
# 选取镜像数据的起始?即镜像MirrorMaker启动后的数据,参数latest,还是镜像之前的数据,参数earliest
auto.offset.reset=earliest
# 消费者心跳数据,默认3000,由于是远程镜像,此处设为30秒
heartbeat.interval.ms=30000
# 消费连接超时值,默认10000,由于远程镜像,此处设为100秒
session.timeout.ms=100000
# 更改分区策略,默认是range,虽然有一定优势但会导致不公平现象,特别是镜像大量的主题和分区的时候
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
# 单个poll()执行的最大record数,默认是500
max.poll.records=20000
# 读数据时tcp接收缓冲区大小,默认是65536(64KiB)
receive.buffer.bytes=4194304
# 设置每个分区总的大小,默认是1048576
max.partition.fetch.bytes=10485760

生产者配置

# 生产者目的集群
bootstrap.servers=<kafka1host_to>:<port>,<kafka2host_to>:<port>,<kafka3host_to>:<port>
# 开启压缩
compression.type=snappy

执行命令

nohup bin/kafka-mirror-maker --consumer.config config/consumer.properties --num.streams 2 --producer.config config/producer.properties --whitelist '*.test|*.temp' >/var/log/mirrormaker/mirrormaker.log 2>&1 & 

说明:

  1. --num.streams: 一个流就是一个消费者,所有消费者公用一个生产者。
  2. --whitelist: 表明需要同步的白名单,可以使用'|'来连接多个topic,还可以使用java风格的正则表达式,与此对应的就是黑名单。
  3. 对于日志记录文件建议放到/var/log/文件夹中,并使用logrotate进行周期管理,达到日志可查,又避免日志文件填满整个磁盘。
  4. Mirror Maker一般部署与目的集群,即一个可靠的生产者(向目的集群输入数据)更为重要,一个无法连接到集群的消费者要比一个无法连接到集群的生产者要安全得多。

保持数据不丢失的配置

对于consumer

enable.auto.commit=false

对于producer

max.in.flight.requests.per.connection=1
retries=Integer.MAX_VALUE

max.block.ms=Long.MAX_VALUE

对于MirrorMaker

增加参数

--abort.on.send.failure

对于MirrorMaker的效果

  • Mirror maker will only send one request to a broker at any given point.
  • If any exception is caught in mirror maker thread, mirror maker will try to commit the acked offsets then exit immediately.
  • For RetriableException in producer, producer will retry indefinitely. If retry did not work, eventually the entire mirror maker will block on producer buffer full.
  • For None-retriable exception, if --abort.on.send.fail is specified, stops the mirror maker. Otherwise producer callback will record the message that was not successfully sent but let the mirror maker move on. In this case, that message will be lost in target cluster.

As the last point stated if there is any error occurred your mirror maker process will be killed. So users are recommend to use a watchdog process like supervisord to restart the killed mirrormaker process.

各问题汇总

Java版本切换

当安装有多个java版本时,有时调试时需要在多个java版本间进行切换,可以使用命令update-alternatives来实现,具体为

# update-alternatives --config java
...
# update-alternatives --config javac

然后根据列出的java版本选择你所需要的。

kafka不能消费数据

进行kafka mirror maker操作后,目的集群不能消费数据(除了kafka自身的kafka-console-consumer

上一篇:Bit-Z×BitUniverse网格交易大赛


下一篇:“贷券” 信贷系统