Let's start with an overview of the architecture. For enterprise log collection, the recommended stack is: Beats + Kafka + Logstash + ES cluster + Kibana (Grafana).
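Sketched as a text diagram, the data flows like this:

Filebeat / Metricbeat --> Kafka cluster --> Logstash (one or more consumers) --> Elasticsearch cluster --> Kibana / Grafana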
Beats: the native Logstash agent is too heavy to run as a log shipper on every host, so Beats is used for collection instead; the most common shippers are Filebeat and Metricbeat. The collected data is written to Kafka.
Kafka: if Logstash receives logs directly, it becomes a single-point bottleneck, so a Kafka cluster sits in between, which also makes the architecture easy to scale out.
Logstash: consumes the data from Kafka and writes it into the ES cluster; since consumption goes through a Kafka consumer group, Logstash can be scaled out to multiple instances (a quick producer/consumer check follows this list).
Elasticsearch: the search database where the data is stored. Run it as a cluster, and back it up!
Kibana or Grafana: Kibana is convenient for browsing logs, while Grafana is clearer for performance and analytics dashboards; both can be used side by side.
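Before wiring up Filebeat and Logstash, the Kafka leg of the pipeline can be smoke-tested on its own. Below is a minimal sketch using the kafka-python client; the broker addresses are the same "xxx" placeholders used in the configuration later in this post, and the topic (sjgkafka) and group (sjggroup) match what Filebeat and Logstash are configured with.

# pip install kafka-python
# Smoke test for the Kafka leg: publish one fake nginx access-log event
# (the shape Filebeat would send) and read it back with the same
# topic/consumer group that Logstash will use.
import json

from kafka import KafkaConsumer, KafkaProducer

BROKERS = ["xxx:9092", "xxx:9092"]  # placeholder hosts, as in the configs below
TOPIC = "sjgkafka"

producer = KafkaProducer(
    bootstrap_servers=BROKERS,
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
# Filebeat wraps each log line in a JSON document with a "message" field.
producer.send(TOPIC, {
    "message": '1.2.3.4 - - [01/Jan/2024:00:00:00 +0800] '
               '"GET /index.html HTTP/1.1" 200 612 "-" "curl/7.61.1"',
})
producer.flush()

consumer = KafkaConsumer(
    TOPIC,
    bootstrap_servers=BROKERS,
    group_id="sjggroup",            # the group Logstash will also join
    auto_offset_reset="earliest",
    consumer_timeout_ms=10000,      # give up after 10 s of silence
    value_deserializer=lambda b: json.loads(b.decode("utf-8")),
)
for record in consumer:
    print(record.value["message"])
    break

If the log line comes back, the brokers, topic, and consumer group are all working; scaling Logstash later is just a matter of starting more consumers in the same group.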
Key notes from the deployment process
ES cluster configuration (/etc/elasticsearch/elasticsearch.yml):

cluster.name: sjg
node.name: node1
node.master: true
node.data: true
path.data: /var/lib/elasticsearch
path.logs: /var/log/elasticsearch
network.host: 0.0.0.0
http.port: 9200
discovery.seed_hosts: ["xxx", "xxx"]
cluster.initial_master_nodes: ["xxx", "xxx"]
xpack.security.enabled: true
xpack.monitoring.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.keystore.path: /etc/elasticsearch/elastic-certificates.p12
xpack.security.transport.ssl.truststore.path: /etc/elasticsearch/elastic-certificates.p12

Kibana configuration for connecting to the ES cluster (kibana.yml):

server.port: 5601
server.host: "0.0.0.0"
elasticsearch.hosts: ["http://xxx:9200", "http://xxx:9200"]
elasticsearch.username: "elastic"
elasticsearch.password: "xxx"
logging.dest: /tmp/kibana.log

Logstash configuration (logstash.conf):

input {
  kafka {
    bootstrap_servers => "xxx:9092,xxx:9092"
    topics => ["sjgkafka"]
    group_id => "sjggroup"
    codec => "json"
  }
}
filter {
  grok {
    match => { "message" => '%{IP:remote_addr} - (%{WORD:remote_user}|-) \[%{HTTPDATE:time_local}\] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER}" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS} %{QS:http_user_agent}' }
    remove_field => ["message"]
  }
  date {
    match => ["time_local", "dd/MMM/yyyy:HH:mm:ss Z"]
    target => "@timestamp"
  }
}
output {
  elasticsearch {
    hosts => ["http://xxx:9200", "http://xxx:9200"]
    user => "elastic"
    password => "xxx"
    index => "sjgnginx-%{+YYYY.MM.dd}"
  }
}

Filebeat configuration (filebeat.yml):

filebeat.inputs:
- type: log
  tail_files: true
  backoff: "1s"
  paths:
  - /var/log/nginx/access.log

processors:
- drop_fields:
    fields: ["agent", "ecs", "log", "input"]

output.kafka:
  hosts: ["xxx:9092", "xxx:9092"]
  topic: sjgkafka

ZooKeeper cluster configuration (zoo.cfg):

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data
clientPort=2181
autopurge.snapRetainCount=3
autopurge.purgeInterval=1
server.1=xxx:2888:3888
server.2=xxx:2888:3888
server.3=xxx:2888:3888

Kafka configuration (server.properties):

broker.id=0
listeners=PLAINTEXT://xxx:9092
# retention of 1 hour; adjust to your actual needs
log.retention.hours=1
zookeeper.connect=xxx:2181,xxx:2181,xxx:2181

ELK7 log-collection architecture hands-on course: https://edu.51cto.com/course/25074.html
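As a final end-of-pipeline check, it is worth confirming that parsed nginx events are actually landing in the daily sjgnginx-* indices. A minimal sketch, assuming the official elasticsearch Python client (7.x) and the same placeholder hosts and elastic credentials as in the configs above:

# pip install "elasticsearch>=7,<8"
# End-of-pipeline check: count the indexed nginx events and print the
# newest one. Hosts and password are the "xxx" placeholders from the
# configs above -- substitute real values.
from elasticsearch import Elasticsearch

es = Elasticsearch(
    ["http://xxx:9200", "http://xxx:9200"],  # placeholder ES nodes
    http_auth=("elastic", "xxx"),            # placeholder credentials
)

# Daily indices follow the "sjgnginx-%{+YYYY.MM.dd}" pattern from logstash.conf.
total = es.count(index="sjgnginx-*")["count"]
print("documents indexed so far:", total)

latest = es.search(
    index="sjgnginx-*",
    body={"size": 1, "sort": [{"@timestamp": {"order": "desc"}}]},
)
for hit in latest["hits"]["hits"]:
    src = hit["_source"]
    print(src.get("remote_addr"), src.get("request"), src.get("status"))

If the count grows as new requests hit nginx and fields such as remote_addr, request, and status are populated, the grok and date filters are doing their job.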