ELK Log Service: Shipping Logs via Kafka (bbotte.com)

For log transport, rsyslog or logstash alone is usually enough; redis or kafka mainly serve as a buffer queue so that logs are not lost if logstash goes down. The following covers using kafka.


Before using kafka, rsyslog needs module support, so rsyslog must be upgraded with omkafka enabled; we enable omhiredis in the same build. Download https://github.com/rsyslog/rsyslog/archive/master.zip

Compiling and installing rsyslog runs into various problems, for example:

checking for LIBLOGGING_STDLOG... no
configure: error: Package requirements (liblogging-stdlog >= 1.0.3) were not met:
No package 'liblogging-stdlog' found
Consider adjusting the PKG_CONFIG_PATH environment variable if you
installed software in a non-standard prefix.
Alternatively, you may set the environment variables LIBLOGGING_STDLOG_CFLAGS
and LIBLOGGING_STDLOG_LIBS to avoid the need to call pkg-config
checking for HIREDIS... configure: error: Package requirements (hiredis >= 0.10.1) were not met:
No package 'hiredis' found
Consider adjusting the PKG_CONFIG_PATH environment variable if you
installed software in a non-standard prefix.
Alternatively, you may set the environment variables HIREDIS_CFLAGS
and HIREDIS_LIBS to avoid the need to call pkg-config.
See the pkg-config man page for more details.


Both errors mean pkg-config cannot find the corresponding .pc files, i.e. the development packages are missing or installed in a non-standard prefix. The rsyslog install/upgrade steps below resolve them:

# yum install autoconf liblogging-devel
# rpm -ivh http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm 
# yum -y install hiredis hiredis-devel libgt-devel libksi-devel librelp-devel liblognorm1-devel libmongo-client-devel adiscon-librdkafka-devel libnet-devel
# yum -y install pkgconfig libuuid* libgcrypt-devel zlib-devel byacc mysql-devel
# wget http://libestr.adiscon.com/files/download/libestr-0.1.9.tar.gz
# tar xvf libestr-0.1.9.tar.gz
# cd libestr-0.1.9
# ./configure && make && make install
# wget https://github.com/downloads/json-c/json-c/json-c-0.10.tar.gz
# tar xvf json-c-0.10.tar.gz
# cd json-c-0.10
# ./configure && make && make install
# cp json_object_iterator.h /usr/local/include/json/
# wget https://pypi.python.org/packages/source/d/docutils/docutils-0.11.tar.gz --no-check-certificate
# tar xvf docutils-0.11.tar.gz && cd docutils-0.11
# python setup.py install
# ln -s /usr/bin/rst2man.py /usr/bin/rst2man
# wget https://github.com/redis/hiredis/archive/master.zip
# unzip master.zip && cd hiredis-master
# make
Generating hiredis.pc for pkgconfig...    # make generates the hiredis.pc file
# cat /usr/lib64/pkgconfig/hiredis.pc 
prefix=/usr/local
exec_prefix=${prefix}
libdir=/usr/local/lib
includedir=/usr/local/include/hiredis
Name: hiredis
Description: Minimalistic C client library for Redis.
Version: 0.13.3
Libs: -L${libdir} -lhiredis
Cflags: -I${includedir} -D_FILE_OFFSET_BITS=64
# cp hiredis.pc /usr/lib64/pkgconfig/
# cd ../rsyslog-master
# ./configure --enable-omkafka --enable-omhiredis PKG_CONFIG_PATH=/usr/lib64/pkgconfig --libdir=/usr/lib64
Or, a more detailed configure invocation:
./configure --build=x86_64-redhat-linux-gnu --host=x86_64-redhat-linux-gnu --target=x86_64-redhat-linux-gnu \
--program-prefix= --prefix=/usr --exec-prefix= --bindir=/bin --sbindir=/sbin --sysconfdir=/etc \
--datadir=/usr/share --includedir=/usr/include --libdir=/lib64 --libexecdir=/libexec --localstatedir=/var \
--sharedstatedir=/var/lib --mandir=/usr/share/man --infodir=/usr/share/info --disable-static \
--disable-testbench --enable-uuid --enable-elasticsearch --enable-ommongodb --enable-omkafka \
--enable-usertools --enable-gnutls --enable-imfile --enable-impstats --enable-imptcp --enable-libdbi \
--enable-mail --enable-mysql --enable-omprog --enable-omudpspoof --enable-omuxsock --enable-pgsql \
--enable-pmlastmsg --enable-relp --enable-snmp --enable-unlimited-select --enable-mmjsonparse \
--enable-mmnormalize --enable-mmanon --enable-mmutf8fix --enable-mail --enable-mmfields \
--enable-mmpstrucdata --enable-mmsequence --enable-pmaixforwardedfrom --enable-pmciscoios \
--enable-guardtime --enable-omhiredis --enable-omhttpfs
The configure summary should show hiredis and omkafka enabled:
---{ database support }---
MySql support enabled: no
libdbi support enabled: no
PostgreSQL support enabled: no
mongodb support enabled: no
hiredis support enabled: yes
---{ output plugins }---
Mail support enabled: no
omkafka module will be compiled: yes
omprog module will be compiled: no
omstdout module will be compiled: no
omjournal module will be compiled: no
omhdfs module will be compiled: no
omelasticsearch module will be compiled: no
# make && make install

# The omkafka rsyslog module is now installed:
# ls /usr/lib64/rsyslog/omkafka.so
# ls /usr/lib64/rsyslog


ln -s /usr/lib64/rsyslog/omhiredis.so /lib64/rsyslog/omhiredis.so
ln -s /usr/lib64/rsyslog/omkafka.so /lib64/rsyslog/omkafka.so
service rsyslog restart
# rsyslogd -v
rsyslogd 8.15.0, compiled with:
PLATFORM: x86_64-redhat-linux-gnu
PLATFORM (lsb_release -d): 
FEATURE_REGEXP: Yes
GSSAPI Kerberos 5 support: No
FEATURE_DEBUG (debug build, slow code): No
32bit Atomic operations supported: Yes
64bit Atomic operations supported: Yes
memory allocator: system default
Runtime Instrumentation (slow code): No
uuid support: Yes
Number of Bits in RainerScript integers: 64
See http://www.rsyslog.com for more information.

On the sending (log source) server:

# egrep -v "^$|^#" /etc/rsyslog.conf 
module(load="imuxsock")  # provides support for local system logging (e.g. via logger command)
module(load="imklog")    # provides kernel logging support (previously done by rklogd)
module(load="imudp")     # needs to be done just once
input(type="imudp" port="1514")
module(load="imfile")
module(load="omhiredis") # lets you send to Redis
module(load="omkafka")   # lets you send to Kafka
module(load="imtcp")     # needs to be done just once
input(type="imtcp" port="514")
input(type="imtcp" port="514")
$ActionFileDefaultTemplate RSYSLOG_TraditionalFileFormat
$IncludeConfig /etc/rsyslog.d/*.conf
*.info;mail.none;authpriv.none;cron.none                /var/log/messages
authpriv.*                                              /var/log/secure
mail.*                                                  /var/log/maillog
cron.*                                                  /var/log/cron
*.emerg                                                 :omusrmsg:*
uucp,news.crit                                          /var/log/spooler
local7.*                                                /var/log/boot.log
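To exercise the imudp input above without touching real application logs, a short Python sketch can hand-roll an RFC 3164 syslog line and send it to port 1514 (the port from the config; the target address is an assumption for your rsyslog host):

```python
import socket

def syslog_line(facility, severity, tag, msg):
    """Build a minimal RFC 3164 message; PRI = facility * 8 + severity."""
    pri = facility * 8 + severity
    return "<%d>%s: %s" % (pri, tag, msg)

line = syslog_line(16, 6, "examplelogs", "hello from python")  # local0.info -> PRI <134>
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# sock.sendto(line.encode(), ("127.0.0.1", 1514))  # uncomment on the rsyslog host (assumed address)
print(line)
```

The message should then show up in /var/log/messages via the `*.info` rule above.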
# egrep -v "^$|^#" /etc/rsyslog.d/kafka.conf 
input(type="imfile"
  file="/tmp/example.log"
  tag="examplelogs"
  facility="local8"
)
template(name="json_lines" type="list" option.json="on") {
  constant(value="{")
  constant(value="\"timestamp\":\"")
  property(name="timereported" dateFormat="rfc3339")
  constant(value="\",\"message\":\"")
  property(name="msg")
  constant(value="\",\"host\":\"")
  property(name="hostname")
  constant(value="\",\"severity\":\"")
  property(name="syslogseverity-text")
  constant(value="\",\"facility\":\"")
  property(name="syslogfacility-text")
  constant(value="\",\"syslog-tag\":\"")
  property(name="syslogtag")
  constant(value="\"}")
}
main_queue(
  queue.workerthreads="1"      # threads to work on the queue
  queue.dequeueBatchSize="100" # max number of messages to process at once
  queue.size="10000"           # max queue size
)
action(
  broker=["192.168.71.37:9092"]
  type="omkafka"
  topic="kafka1"
  template="json_lines"
)
if $syslogfacility-text == "local8" then stop   # discard these messages after the kafka action
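The json_lines template concatenates constants and properties into one JSON object per message. A rough Python equivalent (with placeholder values standing in for the rsyslog properties) shows the payload shape omkafka publishes:

```python
import json

# placeholder values standing in for the rsyslog properties used by the template
props = {
    "timereported": "2016-02-26T17:23:00.101552+08:00",
    "msg": "example app log line",
    "hostname": "localhost",
    "syslogseverity-text": "debug",
    "syslogfacility-text": "local8",
    "syslogtag": "examplelogs",
}

# same field names and order as the template above
payload = json.dumps({
    "timestamp": props["timereported"],
    "message": props["msg"],
    "host": props["hostname"],
    "severity": props["syslogseverity-text"],
    "facility": props["syslogfacility-text"],
    "syslog-tag": props["syslogtag"],
}, separators=(",", ":"))
print(payload)
```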


Kafka setup:

Kafka quickstart documentation: https://kafka.apache.org/documentation.html#quickstart
# tar -xzf kafka_2.10-0.9.0.0.tgz
# cd /usr/local/kafka_2.10-0.9.0.0/
# ./bin/zookeeper-server-start.sh config/zookeeper.properties &
# ./bin/kafka-server-start.sh config/server.properties &
# ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic kafka1 &
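With the broker up, the topic can also be smoke-tested without rsyslog. This sketch assumes the third-party kafka-python package (`pip install kafka-python`); any Kafka client works, and the broker address is the one used elsewhere in this article:

```python
import json

def encode(record):
    """Serialize a dict the same way the rsyslog template does: one compact JSON line."""
    return json.dumps(record, separators=(",", ":")).encode("utf-8")

def send_test_message(broker="192.168.71.37:9092", topic="kafka1"):
    # kafka-python is an assumption; imported lazily so encode() stays usable without it
    from kafka import KafkaProducer
    producer = KafkaProducer(bootstrap_servers=[broker], value_serializer=encode)
    producer.send(topic, {"message": "smoke test", "host": "localhost"})
    producer.flush()
```

A message sent this way should appear in the console consumer started above.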

On the receiving (log collection) server:

# cd /usr/local/kafka_2.10-0.9.0.0/
# ./bin/zookeeper-server-start.sh config/zookeeper.properties &
# ./bin/kafka-server-start.sh config/server.properties &


Write a log entry on the sending side:

# vim /tmp/example.log
2016-02-26 16:30:29,043 (OrderInvestmentServiceImpl.java:229) INFO [DubboServerHandler-10.117.30.32:20886-thread-182] 201602261630

Verify delivery from two angles: consume the topic directly, and read it through logstash.

# ./bin/kafka-console-consumer.sh --zookeeper 192.168.71.37:2181 --from-beginning --topic kafka1
{"timestamp":"2016-02-26T17:23:00.101552+08:00","message":"2016-02-26 16:30:29,043 (OrderInvestmentServiceImpl.java:229) INFO [DubboServerHandler-10.117.30.32:20886-thread-182] 201602261630","host":"localhost","severity":"debug","facility":"invld","syslog-tag":"examplelogs"}
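Note the facility comes back as "invld": local8 is not a standard syslog facility (only local0 through local7 exist), so rsyslog emits an invalid-facility marker while still delivering the message. Downstream code can parse each consumed line straight back into fields; a sketch with a shortened placeholder message:

```python
import json

# one consumed line as produced by the json_lines template (message shortened here)
raw = ('{"timestamp":"2016-02-26T17:23:00.101552+08:00",'
       '"message":"example app log line","host":"localhost",'
       '"severity":"debug","facility":"invld","syslog-tag":"examplelogs"}')

record = json.loads(raw)
print(record["timestamp"], record["severity"], record["syslog-tag"])
```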
# /opt/logstash/bin/logstash -f kafka.conf -v
{
     "timestamp" => "2016-02-26T17:23:00.101552+08:00",
       "message" => "2016-02-26 16:30:29,043 (OrderInvestmentServiceImpl.java:229) INFO [DubboServerHandler-10.117.30.32:20886-thread-182] 201602261630",
          "host" => "localhost",
      "severity" => "debug",
      "facility" => "invld",
    "syslog-tag" => "examplelogs",
      "@version" => "1",
    "@timestamp" => "2016-02-26T09:23:00.430Z"
}
# vim kafka.conf 
input {
  kafka {
    zk_connect => "192.168.71.37:2181"
    topic_id => "kafka1"
  }
}
output {
    stdout {
        codec=>"rubydebug"
    }
}
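The same verification can be scripted without logstash. A minimal consumer sketch with the kafka-python package (an assumption; note most plain clients take the broker address, while the logstash 2.x kafka input above takes the zookeeper address):

```python
import json

def parse(value_bytes):
    """Decode one Kafka message value back into the template's fields."""
    return json.loads(value_bytes.decode("utf-8"))

def tail_topic(broker="192.168.71.37:9092", topic="kafka1"):
    # kafka-python is an assumption; imported lazily so parse() stays usable without it
    from kafka import KafkaConsumer
    consumer = KafkaConsumer(topic, bootstrap_servers=[broker],
                             auto_offset_reset="earliest")
    for msg in consumer:
        print(parse(msg.value))
```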









This article is reposted from the bbotte 51CTO blog. Original link: http://blog.51cto.com/bbotte/1747143; please contact the original author before republishing.