ELK日志收集之logstash部署 采集nginx/tomcat方案Filebeat + Logstash + ES + Kibana

一、logstash简介

Logstash是一个日志收集和转发的工具,可以将不同源的日志统一收集、过滤、转发。Logstash官方指导文档,官方软件包 下载地址

Filebeat与Logstash比较:


Filebeat

Logstash

资源消耗

轻量级

功能强大

处理能力

基本的解析和过滤

更复杂的数据转换和处理

适用场景

简单的日志收集和传输任务

更复杂的数据处理和转换任务

常见的日志采集处理解决方案

Filebeat + ES + Kibana
Filebeat + Logstash + ES + Kibana
Filebeat + Kafka/Redis/File/Console + 应用程序(处理/存储/展示)
Filebeat + Logstash+Kafka/Redis/File/Console + 应用程序(处理/存储/展示)

ELK日志收集之logstash部署 采集nginx/tomcat方案Filebeat + Logstash + ES + Kibana_logstash

二.Logstash部署

采集各种样式、大小和来源的数据,输出到多种存储

ELK日志收集之logstash部署 采集nginx/tomcat方案Filebeat + Logstash + ES + Kibana_tomcat_02

实时解析和转换数据   除了input和output,比filebeat多个filter模块

Logstash 能够动态地采集、转换和解析数据,不受格式或复杂度的影响。
利用 Grok 从非结构化数据中派生出结构
从 IP 地址破译出地理坐标
将 PII 数据匿名化,完全排除敏感字段简化整体处理,
不受数据源、格式或架构的影响

ELK日志收集之logstash部署 采集nginx/tomcat方案Filebeat + Logstash + ES + Kibana_nginx_03

1.RPM包方式部署Logstash

#下载rpm软件包
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.17.5-x86_64.rpm
#安装logstash
rpm -ivh logstash-7.17.5-x86_64.rpm
#创建软连接 全局生效
ln -svf /usr/share/logstash/bin/logstash /usr/local/sbin
#版本验证
logstash -V
#帮助信息
logstash -h

安装完成提示 OpenJDK 64-Bit Server VM warning警告可忽略

Using bundled JDK: /usr/share/logstash/jdk
Using provided startup.options file: /etc/logstash/startup.options
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/pleaserun-0.0.32/lib/pleaserun/platform/base.rb:112: warning: constant ::Fixnum is deprecated
Successfully created system startup script for Logstash

测试   基于命令行方式启动logstash实例

#测试 以标准输入和rubydebug格式输出 基于命令行方式启动logstash实例
logstash -e "input { stdin { type => stdin } } output { stdout { codec => rubydebug } }"
#提示 输入测试数据 例如输入alibaby回车
The stdin plugin is now waiting for input:
alibaby
{
       "message" => "alibaby",
    "@timestamp" => 2024-11-09T08:02:32.966Z,
          "host" => "elk01",
      "@version" => "1",
          "type" => "stdin"
}
#测试 以标准输入和json格式输出 基于命令行方式启动logstash实例
logstash -e "input { stdin { type => stdin } } output { stdout { codec => json } }"
#提示 输入测试数据 例如输入alibaby回车
alibaby
{"message":"alibaby","@timestamp":"2024-11-09T08:08:25.077Z","host":"elk01","type":"stdin","@version":"1"}

pipeline是用于处理数据流的一个核心组件,它定义了数据从输入到输出的整个处理过程,默认"pipeline.id"=>"main"

[INFO ] 2024-11-09 16:07:47.808 [[main]-pipeline-manager] javapipeline - Pipeline started {"pipeline.id"=>"main"}

2.二进制包部署Logstash

#下载二进制软件包
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.17.5-linux-x86_64.tar.gz
#解压
tar xf logstash-7.17.5-linux-x86_64.tar.gz -C /es/softwares/
#创建软连接 全局生效
ln -svf /es/softwares/logstash-7.17.5/bin/logstash /usr/local/sbin
#版本验证
logstash -V
#帮助信息
logstash -h

测试 基于命令行方式启动logstash实例

#测试 以标准输入和标准输出 基于命令行方式启动logstash实例 默认rubydebug格式输出
logstash -e "input { stdin { type => stdin } } output { stdout {} }"
alibaby
{
       "message" => "alibaby",
    "@timestamp" => 2024-11-09T08:24:30.978Z,
      "@version" => "1",
          "type" => "stdin",
          "host" => "elk02"
}

三.pipeline基本概念和功能


Logstash的pipeline是数据处理的流水线,它包含三个主要阶段:输入(Input)、过滤(Filter)和输出(Output)。每个阶段都有其特定的功能和配置方式:

输入阶段:负责从各种数据源读取数据。例如,可以使用file插件从文件中读取日志,或者使用http插件从HTTP接口获取数据。
过滤阶段:对输入的数据进行解析和转换。例如,可以使用grok插件解析日志中的字段,或者使用mutate插件进行字段的添加、删除或修改。
输出阶段:将处理后的数据发送到目标存储系统。例如,可以将数据发送到`Elasticsearch、Redis或其他数据库中。

在Logstash的配置文件中,可以使用pipeline关键字来定义一个管道,并为其命名。例如:

pipeline {
  id => "my_pipeline"
}

在管道中,可以配置输入、过滤器和输出:

配置输入:使用input关键字指定输入插件和参数。例如,从文件读取数据:

input {
  file {
    path => "/path/to/file.log"
    codec => "json"
  }
}

配置过滤:使用filter关键字指定过滤器插件和参数。例如,解析日志字段:

filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
}

配置输出:使用output关键字指定输出插件和参数。例如,将数据发送到Elasticsearch:

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "logs"
  }
}

四.Logstash的配置文件conf举例

0.标准输入 标准输出

#配置文件编写
cat >01-stdin-to-stdout.conf<<'EOF'
input { 
  stdin { type => stdin } 
} 

output { 
  stdout {} 
}
EOF
#启动
logstash -f 01-stdin-to-stdout.conf

2.beats类型输入 输出到ES

#配置文件编写
cat >02-beats-to-es.conf<<'EOF'
input { 
  #输入类型beats
  beats {
    #监听端口
    port => 5044
  }
} 

output { 
  #将数据写入ES集群
  elasticsearch {
    #指定ES主机地址
    hosts => ["http://192.168.77.176:9200"]
    #指定索引名称
    index => "alibaby-logstash-%{+YYYY.MM.dd}"
  }
}
EOF
#启动
logstash -f 02-beats-to-es.conf
#启动完成后可以看到监听的端口
[INFO ] 2024-11-09 18:44:54.369 [[main]<beats] Server - Starting server on port: 5044
#查看监听的端口
netstat -tlunp | grep 5044

ELK日志收集之logstash部署 采集nginx/tomcat方案Filebeat + Logstash + ES + Kibana_filebeat_04

3.filebeat采集nginx和tomcat日志,输出到logstash

#filebeat配置文件
cat >01-nginx-to-logstash.yaml<<'EOF'
filebeat.inputs:
#nginx日志文件采集
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log*
    - /var/log/nginx/error.log*
#多行选项配置
  multiline.type: pattern
  multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
  multiline.negate: true
  multiline.match: after
#tomcat日志文件采集
- type: log
  enabled: true
  paths:
    - /app/tools/tomcat/logs/localhost_access_log.*.txt
    - /app/tools/tomcat/logs/catalina*
#多行选项配置
  multiline.type: pattern
  multiline.pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'
  multiline.negate: true
  multiline.match: after
  
# 将数据输出到logstash中
output.logstash:
  # 指定logstash的主机和端口
  hosts: ["192.168.77.176:5044"]
EOF
#启动Filebeat时,确保指定正确的配置文件
filebeat -e -c 01-nginx-to-logstash.yaml

4.elasticsearch查看数据

ELK日志收集之logstash部署 采集nginx/tomcat方案Filebeat + Logstash + ES + Kibana_filebeat_05

5.kabana创建索引模式

ELK日志收集之logstash部署 采集nginx/tomcat方案Filebeat + Logstash + ES + Kibana_nginx_06

discover发现数据

ELK日志收集之logstash部署 采集nginx/tomcat方案Filebeat + Logstash + ES + Kibana_logstash_07


上一篇:汇编学习笔记基础指令注解


下一篇:QT版发送邮件程序