elk 加 Kafka


在搭建kafka集群时,需要提前安装zookeeper集群,当然kafka已经自带zookeeper程序只需要解压并且安装配置就行了
官网: http://kafka.apache.org
yum install -y java-1.8.0
wget http://mirror.rise.ph/apache/kafka/0.8.2.1/kafka_2.11-0.8.2.1.tgz
tar -xf kafka_2.11-0.8.2.1.tgz -C /usr/local/
cd /usr/local/
mv kafka_2.11-0.8.2.1 kafka

2.配置zookeeper集群
vim /usr/local/kafka/config/zookeeper.properties

dataDir=/data/zookeeper
clientPort=2181
tickTime=2000
tickTime : 这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。
initLimit=20
initLimit:LF初始通信时限
syncLimit=10
syncLimit:LF同步通信时限
server.2=192.168.184.177:2888:3888
2888 端口:表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;
3888 端口:表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader ,而这个端口就是用来执行选举时服务器相互通信的端口。
server.3=192.168.184.178:2888:3888
server.4=192.168.184.179:2888:3888
maxClientCnxns=0
maxClientCnxns选项,如果不设置或者设置为0,则每个ip连接zookeeper时的连接数没有限制

mkdir   -p  /data/zookeeper
echo 2 > /data/zookeeper/myid    

 

vim /usr/local/kafka/config/server.properties

broker.id=2
唯一,填数字
prot=9092
这个 broker 监听的端口
host.name=192.168.184.177
唯一,填服务器 IP
log.dir=/data/kafka-logs
该目录可以不用提前创建,在启动时自己会创建
zookeeper.connect=192.168.184.177:2181,192.168.184.178:2181,192.168.184.179:2181
这个就是 zookeeper 的 ip 及端口
num.partitions=16
需要配置较大 分片影响读写速度
log.dirs=/data/kafka-logs
数据目录也要单独配置磁盘较大的地方
log.retention.hours=168
 时间按需求保留过期时间 避免磁盘满

另外两台zookeeper和kafka需要修改的地方
mkdir /data/zookeeper -p
echo "x" >    /data/zookeeper/myid   x为3或4
vim /usr/local/kafka/config/server.properties
broker.id=3/4
host.name=192.168.184.178/179

先启动zookeeper再启动kafka
/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &
netstat -nlpt | grep -E "2181|2888|3888"

nohup /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
使用另外一台logstash发文件给kafka
vim /etc/logstash/conf.d/log1.conf

input {
file {
type => "system-message"
path => "/var/log/messages"
start_position => "beginning"
}
}
output {
kafka {
bootstrap_servers => "192.168.184.177:9092,192.168.184.178:9092,192.168.184.179:9092"
topic_id => "system-messages"    #这个将作为主题的名称,将会自动创建
compression_type => "snappy"    #压缩类型
}
}

/opt/logstash/bin/logstash -f /etc/logstash/conf.d/log1.conf --configtest --verbose
Configuration OK
/opt/logstash/bin/logstash -f /etc/logstash/conf.d/log1.conf

检查Kafka是否写入数据
/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.184.177:2181
summer
system - messages

/usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.184.177:2181 --topic system-messages

Kafka集群部署logstash,三台同时配置
vim /etc/yum.repos.d/logstash.repo

[logstash-2.1]
name=Logstash repository for 2.1.x packages
baseurl=http://packages.elastic.co/logstash/2.1/centos
gpgcheck=0
gpgkey=http://packages.elastic.co/GPG-KEY-elasticsearch
enabled=1

yum install -y logstash

vim /etc/logstash/conf.d/logstash1.conf

input {
kafka {
zk_connect => "192.168.184.177:2181,192.168.184.178:2181,192.168.184.179:2181"    #消费者们
topic_id => "system-messages"
codec => plain
reset_beginning => false
consumer_threads => 5
decorate_events => true
}
}
output {
elasticsearch {
hosts => ["192.168.184.174:9200","192.168.184.175:9200"]
index => "test-system-messages-%{+YYYY-MM}" 
}
}

/opt/logstash/bin/logstash -f /etc/logstash/conf.d/logstash1.conf

查看es管理页面,配置完成。

消息系统主要功能:
1.解耦
允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束
2.冗余
消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。
许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。
3.扩展性
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。
4.灵活性 & 峰值处理能力
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。
使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
5.可恢复性
系统的一部分组件失效时,不会影响到整个系统。
消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
6.顺序保证
在大多使用场景下,数据处理的顺序都很重要。
大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。(Kafka 保证一个 Partition 内的消息的有序性)
7.缓冲
有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
8.异步通信
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。
想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

Kafka基于zookeeper集群
1,生产服务器生产日志。
2,logstash读入,输出到消息系统。
3,收到生产服务器上的日志后生产主题topic
4,logstash在本地集群上面消费然后再输出到es集群


 

上一篇:[terry笔记]对人员列表文件进行数据库操作


下一篇:如何在kibana上查看ES的数据