ELK集群之kafka(7)

原理待补充:

kafka依赖于zookeeper集群。

都是基于java 由于源码安装jdk 未声明bin下java 在各自server配置文件中声明

JAVA_HOME=/usr/local/jdk1.8.0_241

 

引入kafka缓存日志之ZK搭建

ELK集群之kafka(7)
之前架构
Filebeat(多台)  -> Logstash(正则) -> Elasticsearch(入库) -> Kibana展现

架构优化,流行的架构
Filebeat(多台)  ->  Kafka(或Redis) ->     Logstash(正则多台) -> Elasticsearch(入库) -> Kibana展现

Kafka服务器的搭建
  Kafka依赖于Zookeeper
  依赖于Java环境

Kafka依赖于Zookeeper
官方网站:https://zookeeper.apache.org/
下载ZK的二进制包
解压到对应目录完成安装/usr/local/zookeeper
JAVA_HOME="/usr/local/jdk1.8.0_241"

Java环境安装
  yum -y install java-1.8.0-openjdk java-1.8.0-openjdk-devel
  java -version

数据目录准备
  mkdir -pv /usr/local/zookeeper/data

zk配置
  cd /usr/local/zookeeper/conf/
  cp zoo_sample.cfg zoo.cfg

更改配置/usr/local/zookeeper/conf/zoo.cfg
dataDir=/usr/local/zookeeper/data
autopurge.snapRetainCount=3
autopurge.purgeInterval=1

zk使用systemctl管理/usr/lib/systemd/system/zookeeper.service
[Unit]
Description=zookeeper
After=network.target
[Service]
Type=forking
ExecStart=/usr/local/zookeeper/bin/zkServer.sh start
User=root
[Install]
WantedBy=multi-user.target

启动zk
  systemctl enable zookeeper
  systemctl restart zookeeper

启动zk集群查看状态
./zkServer.sh start
./zkServer.sh status
zk单实例
ELK集群之kafka(7)
        引入kafka缓存日志之ZK集群搭建
集群配置/usr/local/zookeeper/conf/zoo.cfg
server.1=192.168.238.90:2888:3888
server.2=192.168.238.92:2888:3888
server.3=192.168.238.94:2888:3888

更改zk集群的id
  /usr/local/zookeeper/data/myid
  分别为1 2 3 

zk使用systemctl管理/usr/lib/systemd/system/zookeeper.service
[Unit]
Description=zookeeper
After=network.target
[Service]
Type=forking
ExecStart=/usr/local/zookeeper/bin/zkServer.sh start
User=root
[Install]
WantedBy=multi-user.target

启动zk
  systemctl enable zookeeper
  systemctl restart zookeeper

启动zk集群查看状态
./zkServer.sh start
./zkServer.sh status

验证zk集群,创建一个节点,验证
./zkCli.sh
create /sjg
create /sjg/sjg
zk集群部署

 

引入kafka缓存日志之Kafka集群搭建

ELK集群之kafka(7)
Kafka下载地址
Kafka官网:http://kafka.apache.org/
下载Kafka的二进制包
解压到对应目录完成安装

Kafka下载
  http://kafka.apache.org/downloads

修改Kafka配置server.properties
 =0
listeners=PLAINTEXT://xxx:9092
log.retention.hours=1
zookeeper.connect=192.168.238.90:2181,192.168.238.92:2181,192.168.238.94:2181

Jvm内存修改/usr/local/kafka_2.12-2.5.0/bin/kafka-server-start.sh
KAFKA_HEAP_OPTS

Kafka使用systemctl管理/usr/lib/systemd/system/kafka.service
[Unit]
Description=kafka
After=network.target
[Service]
Type=simple
ExecStart=/usr/local/kafka_2.12-2.5.0/bin/kafka-server-start.sh /usr/local/kafka_2.12-2.5.0/config/server.properties
User=root
[Install]
WantedBy=multi-user.target

创建topic
/usr/local/kafka_2.12-2.5.0/bin/kafka-topics.sh --create --zookeeper 192.168.238.90:2181 --replication-factor 2 --partitions 1 --topic sjg
/usr/local/kafka_2.12-2.5.0/bin/kafka-topics.sh --describe --zookeeper 192.168.238.90:2181 --topic sjg

        Filebeat和Logstash间引入Kafka集群
架构演进
filebeat -> logstsash -> es
filebeat -> kafka(集群) -> logstash(多台) -> es

Logstash读取Kafka
input {
  kafka {
    bootstrap_servers => "192.168.238.90:9092,192.168.238.92:9092"
    topics => ["sjg"]
    group_id => "sjggroup"
    codec => "json"
  }
}

Filebeat日志发送到Kafka
filebeat.inputs:
- type: log
  tail_files: true
  backoff: "1s"
  paths:
      - /var/log/nginx/access.json.log
processors:
- drop_fields:
    fields: ["agent","ecs","log","input"]
output:
  kafka:
    hosts: ["192.168.238.90:9092", "192.168.238.92:9092"]
    topic: sjg

Kafka查看队列信息
查看Group: ./kafka-consumer-groups.sh --bootstrap-server 172.17.166.217:9092 --list
查看队列:./kafka-consumer-groups.sh --bootstrap-server 172.17.166.217:9092 --group test2 --describe
kafka集群搭建测试

Filebeat和Logstash间引入Kafka集群多日志分析

ELK集群之kafka(7)
Filebeat配置
filebeat.inputs:
- type: log
  tail_files: true
  backoff: "1s"
  paths:
      - /var/log/nginx/access.log
  fields:
    type: access
  fields_under_root: true

- type: log
  tail_files: true
  backoff: "1s"
  paths:
      - /var/log/secure
  fields:
    type: system
  fields_under_root: true
processors:
- drop_fields:
    fields: ["agent","ecs","log","input"]
output:
  kafka:
    hosts: ["192.168.238.90:9092", "192.168.238.92:9092"]
    topic: sjg

Logstash配置
input {
  kafka {
    bootstrap_servers => "192.168.238.90:9092,192.168.238.92:9092"
    topics => ["sjg"]
    group_id => "sjggroup"
    codec => "json"
  }
}
filter {
  if [type] == "access" {
    grok {
      match => {
        "message" => %{IP:remote_addr} - (%{WORD:remote_user}|-) \[%{HTTPDATE:time_local}\] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER}" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS} %{QS:http_user_agent}
      }
      remove_field => ["message"]
    }
    date {
      match => ["time_local", "dd/MMM/yyyy:HH:mm:ss Z"]
      target => "@timestamp"
    }
  }
  else if [type] == "system" {
  }
}
output {
  if [type] == "access" {
    elasticsearch {
      hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"]
      user => "elastic"
      password => "sjgpwd"
      index => "sjgaccess-%{+YYYY.MM.dd}"
    }
  }
  else if [type] == "system" {
    elasticsearch {
      hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"]
      user => "elastic"
      password => "sjgpwd"
      index => "sjgsystem-%{+YYYY.MM.dd}"
    }
  }
}
filebeat+kafka+logstash
ELK集群之kafka(7)
建两个索引来说明用户管理
  可设置某个索引只读

观察elastic角色
  super角色

创建角色
  给予某个索引的读权限read-sjgaccess,给某个索引只读权限read
  创建用户sjg, 给kibana_user、read-sjgaccess 角色

删除索引测试
  删除索引
  给all权限就能删除索引
kibana用户管理

 

ELK集群之kafka(7)

上一篇:42 递归


下一篇:[Flutter-30] 子控件之间的相对布局01