日志系统升级_ELK+KafKa

日志系统升级_ELK+KafKa的相关配置

安装zookeeper教程

(26条消息) windows环境下安装zookeeper教程详解(单机版)_风轩雨墨的博客-CSDN博客

windows环境下安装配置Kafka集群

(26条消息) windows环境下安装配置Kafka集群_qq1170993239的博客-CSDN博客

Logback+kafka+ELK搭建微服务日志

(27条消息) Logback+kafka+ELK搭建微服务日志_a294634473的博客-CSDN博客

(27条消息) springboot+logback+kafka+logstash将分布式日志汇集到ElasticSearch中_风轻衣的博客-CSDN博客

SpringBoot+kafka+ELK分布式日志收集 - 聂晨 - 博客园 (cnblogs.com)

logback连接kafka报错

(27条消息) kafka报错:Connection with localhost/127.0.0.1 disconnected java.net.ConnectException: Connection refus_lyxuefeng的博客-CSDN博客

系统中logback-spring.xml的配置,关于kafka

配置

<!-- 为Kafka输出的JSON格式的Appender -->
    <appender name="kafka" class="com.github.danielwegener.logback.kafka.KafkaAppender">
        <encoder>
            <pattern>
                <pattern>
                    {
                    "thread": "%thread",
                    "logLevel": "%level",
                    "message": "%message",
                    "class": "%logger{40}",
                    "serviceName": "${springAppName:-}",
                    "trace": "%X{X-B3-TraceId:-}",
                    "span": "%X{X-B3-SpanId:-}",
                    "exportable": "%X{X-Span-Export:-}",
                    "pid": "${PID:-}"

                    }
                </pattern>
            </pattern>
        </encoder>
        <topic>kafka-log</topic>
        <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy"/>
        <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/>
        <!-- Optional parameter to use a fixed partition -->
        <!-- <partition>0</partition> -->
        <!-- Optional parameter to include log timestamps into the kafka message -->
        <!-- <appendTimestamp>true</appendTimestamp> -->
        <!-- each <producerConfig> translates to regular kafka-client config (format: key=value) -->
        <!-- producer configs are documented here: https://kafka.apache.org/documentation.html#newproducerconfigs -->
        <!-- bootstrap.servers is the only mandatory producerConfig -->
        <producerConfig>bootstrap.servers=10.11.83.80:9093</producerConfig>

        <!-- 如果kafka不可用则输出到控制台 -->
        <appender-ref ref="STDOUT"/>

    </appender>

    <!--异步写入kafka,尽量不占用主程序的资源-->
    <appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
        <neverBlock>true</neverBlock>
        <includeCallerData>true</includeCallerData>
        <discardingThreshold>0</discardingThreshold>
        <queueSize>2048</queueSize>
        <appender-ref ref="kafka" />
    </appender>


    <!-- 日志输出级别 -->
    <root level="INFO">
        <appender-ref ref="STDOUT" />
        <appender-ref ref="logstash" />
        <appender-ref ref="syslog"/>
        <appender-ref ref="ASYNC"/>
    </root>

说明

日志系统升级_ELK+KafKa

kafka集群中的配置

日志系统升级_ELK+KafKa

日志系统升级_ELK+KafKa

kafka1:

##############主要配置部分 start ##############
# broker 编号,集群内必须唯一
broker.id=1
# host 地址
host.name=127.0.0.1
# 端口
port=9092
# 允许外部端口连接                                            
listeners=PLAINTEXT://:9092  
# 外部代理地址                                                
advertised.listeners=PLAINTEXT://10.11.83.80:9092
# 消息日志存放地址
log.dirs=C:/install/Kafka/kafka_1/log
# ZooKeeper 地址,多个用,分隔
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
#zookeeper.connect=localhost:2181
##############主要配置部分 end ##############

kafka2:

##############主要配置部分 start ##############
# broker 编号,集群内必须唯一
broker.id=2
# host 地址
host.name=127.0.0.1
# 端口
port=9093
# 允许外部端口连接                                            
listeners=PLAINTEXT://:9093
# 外部代理地址                                                
advertised.listeners=PLAINTEXT://10.11.83.80:9093
# 消息日志存放地址
log.dirs=C:/install/Kafka/kafka_2/log
# ZooKeeper 地址,多个用,分隔
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
#zookeeper.connect=localhost:2181
##############主要配置部分 end ##############

kafka3:

##############主要配置部分 start ##############
# broker 编号,集群内必须唯一
broker.id=3
# host 地址
host.name=127.0.0.1
# 端口
port=9094
# 允许外部端口连接                                            
listeners=PLAINTEXT://:9094
# 外部代理地址                                                
advertised.listeners=PLAINTEXT://10.11.83.80:9094
# 消息日志存放地址
log.dirs=C:/install/Kafka/kafka_3/log
# ZooKeeper 地址,多个用,分隔
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
#zookeeper.connect=localhost:2181
##############主要配置部分 end ##############

logstash.conf的配置

配置

input { 
# stdin { }

 kafka {
        #Value type is string
        #There is no default value for this setting.
        #A topic regex pattern to subscribe to.The topics configuration will be ignored when using this configuration.
        topics => ["kafka-log"]
        #Value type isstring
        #Default value is "localhost:9092"
        #A list of URLs of Kafka instances to use for establishing the initial connection to the cluster. This list should be in the form of host1:port1,host2:port2 These urls are just used for the initial connection to discover the full cluster membership (which may change dynamically) so this list need not contain the full set of servers (you may want more than one, though, in case a server is down).
        bootstrap_servers => "10.11.83.80:9093"
        #Value type is codec
        #Default value is "plain"
        #The codec used for input data. Input codecs are a convenient method for decoding your data before it enters the input, without needing a separate filter in your Logstash pipeline.
        codec => "json"
    }

}

filter {
	#过滤info 中所有以 "logInfo切面日志" 开头的日志
	if([logLevel]=~"INFO"){
		if ("logInfo切面日志" in [message]) {
			
		}else{
			 drop {}
		}
	}
	
	
  grok {
   
    match => [
	  "message","%{LOGLEVEL:logLevel}%{NOTSPACE:tag}[T ]%{NOTSPACE:method}[\n]%{NOTSPACE:api}[\n]%{NOTSPACE:params}",
      "message","%{NOTSPACE:tag}[T ]%{NOTSPACE:author}[T ]%{NOTSPACE:msg}"
      ]
	
  }
  
}

output {

     elasticsearch { 
	hosts => ["10.11.53.54:9200"]
	index => "logstash-test-%{+YYYY.MM.dd}"
	

	action => "index"
	template=>"C:/install/elk/logstash-7.10.2-windows-x86_64/logstash-7.10.2/config/logstash-test-.json"
	template_name=>"logstash-test-"
	manage_template => true
	template_overwrite => true

     }
stdout { codec => rubydebug }

} 

说明:

日志系统升级_ELK+KafKa

其他参考:

基于Kafka+ELK搭建海量日志平台 - 苍青浪 - 博客园 (cnblogs.com)

说明:

日志系统升级_ELK+KafKa

上一篇:2021年大数据ELK(十六):Elasticsearch SQL(职位查询案例)


下一篇:Linux操作篇之ELK