Collecting logs with Filebeat to Logstash, then Kafka, then Logstash, and finally Elasticsearch

Workflow for a large-scale deployment

filebeat --> logstash --> kafka --> logstash --> es

Environment:
Two Logstash nodes are required.

172.31.2.101 es1 + kibana
172.31.2.102 es2
172.31.2.103 es3

172.31.2.105 logstash2
172.31.2.107 web1 + filebeat + logstash1
172.31.2.41 zookeeper + kafka
172.31.2.42 zookeeper + kafka
172.31.2.43 zookeeper + kafka

Start ZooKeeper first

[root@mq1 ~]# /usr/local/zookeeper/bin/zkServer.sh restart
[root@mq2 ~]# /usr/local/zookeeper/bin/zkServer.sh restart
[root@mq3 ~]# /usr/local/zookeeper/bin/zkServer.sh restart
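
Optionally, confirm the ensemble has elected a leader before starting Kafka; one node should report "Mode: leader" and the other two "Mode: follower":

[root@mq1 ~]# /usr/local/zookeeper/bin/zkServer.sh status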

Start Kafka

[root@mq1 ~]# /apps/kafka/bin/kafka-server-start.sh -daemon /apps/kafka/config/server.properties

[root@mq2 ~]# /apps/kafka/bin/kafka-server-start.sh -daemon /apps/kafka/config/server.properties

[root@mq3 ~]# /apps/kafka/bin/kafka-server-start.sh -daemon /apps/kafka/config/server.properties
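
To confirm the brokers are up, list the topics against any broker (note: Kafka releases older than 2.2 take --zookeeper instead of --bootstrap-server). The two topics used below can be pre-created here, or left to auto-creation on first write:

[root@mq1 ~]# /apps/kafka/bin/kafka-topics.sh --list --bootstrap-server 172.31.2.41:9092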

Install JDK 8

[root@es-web1]# apt install openjdk-8-jdk -y

Upload the Logstash deb package and install it

[root@es-web1 src]# dpkg -i logstash-7.12.1-amd64.deb

Upload the Filebeat deb package and install it with dpkg

[root@es-web1 src]# dpkg -i filebeat-7.12.1-amd64.deb

Configure Filebeat

[root@es-web1]# vim /etc/filebeat/filebeat.yml

- type: log
  enabled: true
  paths:
    - /apps/nginx/logs/error.log
  fields:
    # custom fields; logstash1 uses [fields][app] to route events to Kafka topics
    app: nginx-errorlog
    group: n223

- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
  fields:
    app: nginx-accesslog
    group: n125

output.logstash:
  # both endpoints point at logstash1, which listens on two ports;
  # loadbalance spreads events across them
  hosts: ["172.31.2.107:5044","172.31.2.107:5045"]
  enabled: true
  worker: 1
  compression_level: 3
  loadbalance: true
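
Before the restart, Filebeat can validate the configuration itself; once logstash1 (configured below) is listening, the output connection can be tested the same way:

[root@es-web1]# filebeat test config
[root@es-web1]# filebeat test output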

Restart Filebeat

[root@es-web1]# systemctl restart filebeat

Configure logstash1

[root@es-web1]# vim /etc/logstash/conf.d/beats.conf

input {
  beats {
    port => 5044
    host => "172.31.2.107"
    codec => "json"
  }
  
  beats {
    port => 5045
    host => "172.31.2.107"
    codec => "json"
  }
}

output {
  if [fields][app] == "nginx-errorlog" {
    kafka {
      bootstrap_servers => "172.31.2.41:9092,172.31.2.42:9092,172.31.2.43:9092"
      topic_id => "nginx-errorlog-kafka"
      codec => "json"
    }
  }

  if [fields][app] == "nginx-accesslog" {
    kafka {
      bootstrap_servers => "172.31.2.41:9092,172.31.2.42:9092,172.31.2.43:9092"
      topic_id => "nginx-accesslog-kafka"
      codec => "json"
    }
  }
}

Syntax check

[root@es-web1]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/beats.conf -t

Restart Logstash

[root@es-web1]# systemctl restart logstash
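
A quick check that both beats inputs came up (assuming iproute2's ss, the default on Ubuntu):

[root@es-web1]# ss -tnlp | grep -E '5044|5045'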

Append test data to the monitored log files

[root@es-web1 ~]# echo "error 2222" >> /apps/nginx/logs/error.log
[root@es-web1 ~]# echo "error 1111" >> /apps/nginx/logs/error.log

[root@es-web1 ~]# echo "web111" >> /var/log/nginx/access.log
[root@es-web1 ~]# echo "web112" >> /var/log/nginx/access.log
[root@es-web1 ~]# echo "web222" >> /var/log/nginx/access.log

Check the result with a Kafka GUI tool
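
Without a GUI client, the console consumer that ships with Kafka can confirm the events reached the topics:

[root@mq1 ~]# /apps/kafka/bin/kafka-console-consumer.sh --bootstrap-server 172.31.2.41:9092 --topic nginx-accesslog-kafka --from-beginning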


Configure logstash2

[root@logstash2 ~]# cat /etc/logstash/conf.d/mubeats.conf

input {
  kafka {
    bootstrap_servers => "172.31.2.41:9092,172.31.2.42:9092,172.31.2.43:9092"
    # consume both topics; the json codec restores the fields set by filebeat
    topics => ["nginx-errorlog-kafka","nginx-accesslog-kafka"]
    codec => "json"
  }
}

output {
  if [fields][app] == "nginx-errorlog" {
    elasticsearch {
      hosts => ["172.31.2.101:9200","172.31.2.102:9200","172.31.2.103:9200"]
      index => "logstash-kafka-nginx-errorlog-%{+YYYY.MM.dd}"
    }
  }

  if [fields][app] == "nginx-accesslog" {
    elasticsearch {
      hosts => ["172.31.2.101:9200","172.31.2.102:9200","172.31.2.103:9200"]
      index => "logstash-kafka-nginx-accesslog-%{+YYYY.MM.dd}"
    }
  }
}
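
As with logstash1, run a syntax check before restarting:

[root@logstash2 ~]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/mubeats.conf -t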

Restart Logstash

[root@logstash2 ~]# systemctl restart logstash

Add the index patterns in Kibana
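
Before creating the index patterns, confirm the daily indices exist in Elasticsearch (any node will answer):

[root@logstash2 ~]# curl -s 'http://172.31.2.101:9200/_cat/indices?v' | grep nginx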
