一周一个中间件-kafka

前言

Apache Kafka 最早是由 LinkedIn 开源出来的分布式消息系统,现在是 Apache 旗下的一个子项目,并且已经成为开源领域应用最广泛的消息系统之一。尤其是做日志中间件。

  • Kafka是一个分布式系统,易于向外扩展,扩展和容灾。
  • 它同时为发布和订阅提供高吞吐量。
  • 它支持多订阅者,当失败时能自动平衡消费者。
  • 消息的持久化。
  • 分区下顺序消息。

背景

我们公司迁移cloud,并且做了一个ELK日志系统。但是初始版本是以logstash中间件来检索并解析日志文件发送到es搜索引擎,然后再有Kibana中间件来展示es日志数据。但是因为初始版本中logstash需要安装在每个生产服务器中,他需要解析每个服务器的日志文件,而logstash会占用很多生产资源。所以需要改版这样的ELK日志系统,将应用服务的日志推送给kafka消息系统,然后由logstash去消费kafka的消息,发送给你es搜索引擎。

知识准备

  • Topic

Topic是用于存储消息的逻辑概念,可以看出是一个消息集合。每个Topic可以有多个生产者向其中推送消息,也可以有任意多个消费者消费其中的消息。

  • Partition分区

每个Topic都有多个分区,每个分区都有多个副本(Replica),这样可以实现消息的冗余备份。每个Rartition分区所存储的消息都是不同的,类似数据的水平切分的思想,可以提高并发读写的能力,这就是前面提到了的kafka易于向外扩展。可以通过提供分区数量,可以实现水平扩展。

  • Replica副本

同一个分区的不同副本保存相同的数据,副本之间是一主多从,Leader和Follower角色。Leader副本负责读写请求,Folower只与Leader副本消息同步。Leader副本出现故障后,则从Follower副本中选举Leader副本对外提供服务。可以通过提高副本数量,可以提高容灾能力。

  • 顺序保存

Kafka可以保证一个Partition内的消息有序性,并不能保证多个Partition之间数据的有序性。

参考文档

Kafka Manager下载 提取码:r4p1
Kafka Manager部署和使用
kafka manager页面参数说明

kafka搭建

修改server.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
#host.name=47.105.153.163
#advertised.host.name=dl.zhang3
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://0.0.0.0:9092
#listeners=PLAINTEXT://47.105.153.163:9092
advertised.listeners=PLAINTEXT://47.105.153.163:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/root/kafka_2.11-2.3.0/kafkalog
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=47.104.87.10:2181,47.105.153.163:2183,47.92.146.75:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
delete.topic.enable=true

kafka Manager

安装kafka Manager: 下载kafka Manager weget https://github.com/yahoo/kafka-manager/archive/1.3.3.18.tar.gz.安装并解压后,安装sbt。yum install sbt,然后执行命令 ./sbt clean dist,这就是编译Kafak Manager.将编译好的kafka-manager.zip文件.一般是在/usr/local/kafka-manager-1.3.3.18/target/universal/文件夹中。将其copy到你的指定文件件内,解压zip文件,unzip kafka-manager-1.3.3.18.zip.
修改配置:配置zookeeper地址,vim application.conf,将原先的配置kafka-manager.zkhosts="localhost:2181",其中localhost改成自己ip.
启动kafka-manager.进入bin文件内,nohup ./kafka-manager &

kafka基础

作为消息中间件kafka,我们可以向kafka的Topic发送消息,剩余kafka自行处理。如果kafka集群有几个节点(Broker)就会有几个分片,分区在逻辑上对应的一个Log,这个Log对应磁盘上的一个文件夹,这个又由多个Segment组成,每个Segment对应一个日志文件和索引文件。这些日志文件采用保留策略和日志压缩。保留策略有日志消息保留时间,还有Topic存储数据大小。还有日志压缩,kafka后台有一个线程,定期将相同的key消息进行合并,最保留最新的value的值。

上一篇:RocketMQ 源码分析 —— Message 存储


下一篇:组网技术 | 链路技术Smart Link