# fluent bit 是开源的日志处理器和转发器,是 Fluentd 和 CNCF 的子项目,其完全基于 Fluentd 架构和通用设计的最佳理念进行设计和构建 ...
# 其设计目标是成为瑞士军刀式的通用采集、处理、输出工具...
# 允许从不同来源收集任何数据,如指标和日志,用过滤器丰富它们并发送到多个目的地,是Kubernetes等容器化环境的首选
# 其设计上注重性能:高吞吐量、低CPU和内存使用率,用C语言编写,具有可插拔架构,支持70多种输入、过滤器和输出扩展 ...
# 其数据缓存在内存 (堆) 和文件系统 (可选的次要缓冲机制),采用异步方式进行输入/输出,并且是可插拔架构 (拥有超80个内置插件),支持promethues指标监控
# 支持使用简单的SQL查询进行数据的选择和转换,并使用查询结果创建新的数据流、聚合窗口、数据分析/预测、时间序列预测等 ...
# 其突出特点为: 轻量级、高性能、可扩展、可收集系统信息、事件驱动 (使用异步操作来收发数据)
# 支持 Linux、MacOS、Windows、BSD
# ---------------------------------------------- Theory
event-or-record
# 由 Fluent Bit 检索的日志或指标的每条传入的数据都被视为事件或记录
# 兵器在其内部,一个 Event 事件总是有两个组件(以数组形式): [TIMESTAMP, MESSAGE]
filtering
# 在需要对事件内容进行修改、丰富、删除的过程称为过滤,有很多用例:
# 1.将特定信息附加到事件中,如:IP地址或元数据
# 2.选择事件内容的特定部分
# 3.丢弃匹配特定默认的事件
tag
# 每个事件都会被分配一个标签,它是个内部的字符串,由路由器在后面的处理阶段中使用它来决定必须经过哪个路由器或输出阶段
# 大部分标签都是在配置中手动分配的,若未指定,则将分配生成该事件的输入插件实例的名称
timestamp
# 时间戳表示创建事件的时间,每个事件都包含一个关联的时间戳: SECONDS.NANOSECONDS
# 时间戳始终存在,它由input插件设置或通过数据解析过程发现
match
# Fluent Bit 允许将收集和处理的事件传送到一或多个目的地,这是通过路由阶段完成的
# 匹配 (match) 代表着一个简单的规则,用于选择它标记匹配定义规则的事件
structured-messages
# 源事件可以有也可以没有结构,一个结构在事件消息中定义了一组键和值 (结构有助于实现更快的数据修改操作)...
# 未结构化消息: "Project Fluent Bit created on 1398289291"
# 结构化的消息: {"project": "Fluent Bit", "created": 1398289291}
数据管道 data pipeline
Input
# fluent bit 提供若干输入插件来收集来自不同数据源的信息,例如日志文件、操作系统的度量信息等
# 当其加载插件时将会创建一个内部实例,每个实例都有其自己的独立配置键,它们统称为属性
# 并且每个插件都有其自身所属文档: https://docs.fluentbit.io/manual/pipeline/inputs
parser
# 解析器用于将非结构化消息转为结构化消息 (具有结构是非常重要的) ,理想情况下通常希望在输入插件收集后立即为传入数据设置一个结构
# 解析器是可配置的,并且由每个输入插件独立、可选的处理: https://docs.fluentbit.io/manual/pipeline/parsers
# Example:
# 192.168.2.20 - - [28/Jul/2006:10:27:10 -0300] "GET /cgi-bin/try/ HTTP/1.0" 200 3395
# Out:
# {
# "host": "192.168.2.20",
# "user": "-",
# "method": "GET",
# "path": "/cgi-bin/try/",
# "code": "200",
# "size": "3395",
# "referer": "",
# "agent": ""
# }
filter
# 过滤是非常重要的功能,它允许在将数据传送到某个目的地之前对其进行更改,它是通过插件实现的
# 过滤器可用于匹配、排除或丰富数据,其中包含一些特定的元数据 (其常见用例是 kubernetes 部署,每个POD日志都需要获取关联的正确元数据)
# 与输入插件类似,过滤器在一个实例上下文中运行,并拥有自己独立的配置,其独立配置键统称为属性
# https://docs.fluentbit.io/manual/pipeline/filters
buffer
# 其提供了基于内存的模式、基于文件系统的模式,并且它们可以是不冲突的 ...
# 缓冲数据不是原始的文本,而是 Fluent bit 的内部二进制表示
# 其在文件系统中提供了缓冲机制作为备份,避免在系统故障时丢失数据 ...
router
# 路由是一项核心的功能,它允许通过过滤器将数据路由到一或多个目的地,路由器依赖于标签和匹配规则的概念
# 它有两个重要的概念:1、标签 2、匹配
# 当数据由输入插件生成时,它带有一个Tag (大多数时候Tag是手动配置的),它是一个人类可读的标记,用于识别数据
# 为了定义应将数据路由到何处,所以后面必须在输出配置中指定匹配规则
# 下例将CPU指标传送到Elasticsearch,并将内存指标传送到标准输出:
#
# [INPUT]
# Name cpu
# Tag my_cpu
# [INPUT]
# Name mem
# Tag my_mem
# [OUTPUT]
# Name es
# Match my_cpu
# [OUTPUT]
# Name stdout
# Match my_mem
# 路由具有足够的灵活性,以支持通配符 (下例为两个数据源定义了一个公共的目标)
# [INPUT]
# Name cpu
# Tag my_cpu
# [INPUT]
# Name mem
# Tag my_mem
# [OUTPUT]
# Name stdout
# Match my_*
Output
# 其用于定义数据的目的地,常用的如远程服务、本地文件系统、或其他的标准接口,它也是作为插件实现的
# 加载输出插件时会创建一个内部实例,每个实例都有其独立的配置 (属性)
# 每个输出插件都有自己的文档部分: https://docs.fluentbit.io/manual/pipeline/outputs
安装部署
# Kubernetes 部署 (Helm)
# https://docs.fluentbit.io/manual/installation/kubernetes#installation
# Docker 部署
# https://docs.fluentbit.io/manual/installation/docker
# Windows 部署
# https://docs.fluentbit.io/manual/installation/windows
# ------------------------------------------------------------------- Linux
cat > /etc/yum.repos.d/fluentbit.repo <<'EOF'
[td-agent-bit]
name = TD Agent Bit
baseurl = https://packages.fluentbit.io/centos/7/$basearch/
gpgcheck=1
gpgkey=https://packages.fluentbit.io/fluentbit.key
enabled=1
EOF
yum install td-agent-bit
systemctl enable td-agent-bit --now
# 其默认收集CPU使用率并将其发送到标准输出,可在 /var/log/messages 中查看
configuring
# ref: https://docs.fluentbit.io/manual/administration/configuring-fluent-bit/configuration-file
# 主配置文件路径: /tmp/main.conf
# 当其使用配置文件来定义行为时的格式如下(采用严格的缩进模式,包括注释的位置):
@SET my_input=cpu # 支持在全局级别设置变量并在后续配置中引用
@SET my_output=stdout
[SERVICE]
# This is a commented line
Flush 5
Daemon off
log_level debug
[INPUT]
# This is a commented line
Name cpu
Tag my_cpu
# more comments
[FILTER]
Name stdout
Match *
[OUTPUT]
Name ${MY_OUTPUT} # 除 @SET 指令外还支持引用环境变量: export MY_OUTPUT=stdout
Match my*cpu
@INCLUDE inputs.conf # 支持引用外部的配置文件: @INCLUDE File
@INCLUDE outputs.conf
# sections内的字段属性不能为空,主配置文件支持四种类型的sections:
# 1.Service 服务部分定义了全局属性,如日志级别、HTTP端口等...
# 2.Input 用于限定输入源,其中 Name 用于设置插件名称、Tag 用于设置标签
# 3.Filter 用于限定过滤器,其中 Name 用于设置插件名称、以及 Match (区分大小写并支持*号)、Match_Regex (支持正则)
# 4.Output 用于指定在标签匹配后要发往的目的地,其中 Name 用于设置插件名称、以及 Match、、Match_Regex
# ------------------------------------------------------ Example
[SERVICE]
Flush 5
Daemon off
Log_Level debug
[INPUT]
Name cpu
Tag my_cpu
[OUTPUT]
Name stdout
Match my*cpu
# ------------------------------------------------------ 上游服务器
# 其用于在输出时跨不同节点以实现负载均衡,默认的模式为轮询
# 要定义上游,需创建一个包含上游和一或多个节点的特定配置文件 (每个 upstream 的定义都必须存在于自己的配置文件中,不允许在一个文件中定义多个上游) ...
# 下例定义了名为 forward-balancing 的 upstream 旨在供 forward 输出插件使用,它注册了三个Nodes:
[UPSTREAM]
name forward-balancing
[NODE]
name node-1
host 127.0.0.1
port 43000
[NODE]
name node-2
host 127.0.0.1
port 44000
[NODE]
name node-3
host 127.0.0.1
port 45000
tls on
tls.verify off
shared_key secret
# ------------------------------------------------------ 结构化数据的访问、提取
# fluent bit在内部使用结构化的数据格式,它可以由无限数量的键值组成,值可以是数字、字符串、数组或映射之类的任何值
# 对结构化的数据内容的访问功能对某些核心功能或插件很重要,这种功能被称为: record accessor
# 下例展示如何访问结构化数据:
# {
# "log": "some message", # 通过 $log 可提取 "some message"
# "stream": "stdout", #
# "labels": { # 通过 $labels['color'] 可提取 "blue"
# "color": "blue", #
# "unset": null, # 通过 $labels['unset'] 可提取 null
# "project": { #
# "env": "production" # 通过 $labels['project']['env'] 可提取 "production"
# } # 通过 $labels['undefined'] 将返回空
# }
# }
#
# --------------------------- Usage Example
[SERVICE]
flush 1
log_level info
parsers_file parsers.conf
[INPUT]
name tail
path test.log
parser json
[FILTER]
name grep
match *
regex $labels['color'] ^blue$ # 在filter段下的grep插件对记录的过滤阶段,该 record accessor 仅匹配标签具有蓝色的位置
[OUTPUT]
name stdout
match *
format json_lines
# The file content to process in test.log is the following:
$ bin/fluent-bit -c fluent-bit.conf
Fluent Bit v1.x.x
* Copyright (C) 2019-2020 The Fluent Bit Authors
* Copyright (C) 2015-2018 Treasure Data
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io
[2020/09/11 16:11:07] [ info] [engine] started (pid=1094177)
[2020/09/11 16:11:07] [ info] [storage] version=1.0.5, initializing...
[2020/09/11 16:11:07] [ info] [storage] in-memory
[2020/09/11 16:11:07] [ info] [storage] normal synchronization mode, checksum disabled, max_chunks_up=128
[2020/09/11 16:11:07] [ info] [sp] stream processor started
[2020/09/11 16:11:07] [ info] inotify_fs_add(): inode=55716713 watch_fd=1 name=test.log
{"date":1599862267.483684,"log":"message 1","labels":{"color":"blue"}}
{"date":1599862267.483692,"log":"message 4","labels":{"color":"blue"}}
buffering-and-storage
# ref: https://docs.fluentbit.io/manual/administration/buffering-and-storage
# Fluent Bit的最终目标是收集、解析、过滤日志并将其发送到一个中心位置
# 在此工作流程中有许多阶段,其中一个关键部分是进行缓冲的能力:一种将处理过的数据放置到临时位置直到准备好发送的机制 ...
# 默认情况下当 Fluent Bit 处理数据时使用内存作为存储记录的主要和临时位置,但在某些情况下,理想情况是在文件系统中具有持久缓冲机制以提供聚合和数据安全的功能 ...
# 当输入插件(源)发出记录时引擎将记录组合在一个Chunk中。块大小通常为2MB。通过配置可以让引擎决定把这个Chunk放在哪里,默认是所有的chunk都只在内存中创建 (引擎生成的块被放置在内存中,但这是可配置的)
# 但如果由于网络缓慢或远程服务无响应而无法足够快地传送记录,则 Fluent Bit 内存使用量将增加,因为它会积累更多的数据
# 高内存使用率的风险是有机会被内核 OOM Killer 杀死
# 解决方法是限制输入插件可使用的内存量,该配置属性被称为 mem_buf_limit
# 若插件排队超过 mem_buf_limit 则它将无法摄取更多 (插件被暂停),直到它的数据可以正确交付或冲洗
# 这有助于控制服务的内存使用,但代价是如果文件在插件被暂停时轮转,则可能会丢失该数据 ...
# 为了保证数据的完整性,可使用文件系统缓冲 (内存和文件系统缓冲机制并不是相排斥的) 当为输入插件启用文件系统缓冲时,将获得两方面的优势:性能、数据安全
# 当启用文件系统缓冲后,引擎在创建块时会将内容存在内存中,但也会在磁盘上映射副本 (通过mmap)
# 如果输入插件已启用 mem_buf_limit 并且 storage.type 为 filesystem 时:
# 当达到 mem_buf_limit 阈值时输入插件不会被暂停,所有新数据都将转到文件系统中,这允许控制内存的使用,也保证了服务不会丢失数据
# ------------------------------------------------------ Example
# 下例定义了可选的缓冲机制,其数据根目录是 /var/log/flb-storage/,它将使用正常同步模式且没有校验和,处理积压数据时使用最多 16MB 的内存 ...
[SERVICE]
flush 1
log_Level info
storage.path /var/log/flb-storage/ # 在文件系统中设置一个可选位置来存储数据流和数据块,若未设置则输入插件只能使用内存缓冲
storage.sync normal # 用于将数据存储到文件系统的同步模式,值可以是: normal、full
storage.checksum off # 从文件系统写入和读取数据时启用数据完整性检查 (存储层使用 CRC32 算法)
storage.backlog.mem_limit 16M # 若设置了storage.path 则将查找未交付且仍在存储层中的数据块 (积压数据) 此选项配置处理这些记录时要使用的内存最大值
storage.metrics off # 若在主配置的[SERVICE]内启用了http_server选项,则此选项会注册一个新端点,可在其中使用存储层的内部指标
[INPUT] # 任何输入插件的属性设置中都可以配置它们的存储首选项
name cpu #
storage.type filesystem # 指定要使用的缓冲机制,可以是: memory 或 filesystem
[INPUT]
name mem
storage.type memory # 默认值为 memory
[OUTPUT]
name stackdriver
match *
storage.total_limit_size 5M # 若由于网络问题导致脱机,它将继续缓冲CPU样本,但仅保留最多5M的最新数据 ...
backpressure (反压/背压)
# 在某些环境中通常会看到被摄取的日志或数据比将其发送到某些目的地的速度要快
# 常见情况是从大日志文件中读取并将日志分发到网络上的后端,这需要花费一些时间来进行响应,此时会产生背压,从而导致大量的内存消耗
# 为了避免背压,Fluent Bit 在引擎中实现了一种机制来限制输入插件可以摄取的数据量,这是通过配置参数Mem_Buf_Limit 完成的
# 如果插件由于配置而受到限制并且处于背压情况,则在内存中的数据块可以刷新之前将输入插件无法摄取更多数据
# 根据使用的输入插件类型,可能会导致丢弃传入数据(如TCP输入插件),不过还是能够依靠辅助文件系统缓冲来确保安全 ...
# ------------------------------------------------------ Example
# Mem_Buf_Limit 选项默认禁用 (它可应用于所有输入插件) 下面使用以下场景来解释它的行为:
# 1. Mem_Buf_Limit 设置为 1MB(一兆字节)
# 2. 输入插件尝试追加 700KB
# 3. 引擎将数据路由到输出插件
# 4. 输出插件后端(HTTP 服务器)已关闭
# 5. 引擎调度程序将在 10 秒后再次尝试刷新
# 6. 输入插件尝试追加 500KB
# 此时引擎还将允许将这 500KB 的数据附加到引擎中 (目前共有1.2MB)。这些选项在达到限制前以许可模式工作,但超出限制时采取以下措施:
# 1. 阻止输入插件的本地缓冲区(无法附加更多数据)
# 2. 通知输入插件调用暂停回调
# 引擎会自我保护,因此不会附加更多来自相关输入插件的数据,不过插件可自定义保持它们的状态并决定在暂停状态下做什么 ...
# 几秒钟后,如果调度程序能够刷新最初的 700KB 数据或在重试后放弃,则释放该数量的内存并在内部发生以下操作:
# 1. 数据缓冲区释放 (700KB) 后,内部计数器更新
# 2. 计数器现在设置为 500KB
# 3. 由于 500KB < 1MB ,此时会检查输入插件状态
# 4. 如果插件被暂停了,它将调用回调
# 5. 输入插件此时可以再继续添加更多的数据
# 注: 每个插件都是独立的,并不是所有插件都实现了暂停和恢复回调。如前所述,这些回调只是插件的通知机制 ...
# 实现并保持良好状态的插件是 Tail Input 插件。当暂停回调被触发时它会停止收集器并停止追加数据
scheduling and retries
# Fluent Bit有一个引擎,可以帮助协调输入插件的数据摄取,并调用调度程序来决定何时通过一个或多个输出插件刷新数据
# 调度程序以固定几秒的间隔时间刷新新数据,并在询问时调度重试
# 一旦输出插件被调用来刷新数据,在处理数据后,它可以告知引擎三种可能的返回状态:
# 1.OK # 意味着能够成功处理和刷新数据
# 2.Retry # 意味着发生了不可恢复的错误,并且引擎不会再次尝试刷新该数据 ...
# 3.Error # 引擎将要求调度程序重试刷新该数据,调度程序决定多少秒后执行
# ------------------------------------------------------ Example
# 调度程序提供了简单的配置选项 "Retry_Limit"
# 可在每个输出部分独立设置,此选项允许禁用重试或施加限制以尝试N次,然后在达到该限制后丢弃数据 ...
# 下例配置两个输出,其中HTTP插件具有无限重试次数,而 Elasticsearch 插件具有5次的限制
[OUTPUT]
Name http
Host 192.168.5.6
Port 8080
Retry_Limit False # 不进行限制 ...
[OUTPUT]
Name es
Host 192.168.5.20
Port 9200
Logstash_Format On
Retry_Limit 5 #
networking
# Fluent Bit 实现了统一的网络接口,该接口暴露给插件等组件使用,它抽象了通用I/O的所有复杂性,并且是完全可配置的 ...
# 常见用例是组件或插件需连接到服务以发送和接收数据,期间会有许多因素会使事情变得困难,例如服务无响应、网络延迟、任何类型的连接错误等
# 该统一的网络接口旨在抽象和简化网络I/O处理、最小化风险和优化性能 ...
# ------------------------------------------------------ Example
# 下例通过TCP输出连接发送5条随机消息
[SERVICE]
flush 1
log_level info
[INPUT]
name random
samples 5
[OUTPUT]
name tcp
match *
host 127.0.0.1
port 9090
format json_lines
# Networking Setup
net.connect_timeout 5 # 设置等待TCP连接建立的最长时间(秒),这包括TLS握手时间
net.source_address 127.0.0.1 # 指定用于连接和数据流量的网络地址
net.keepalive on # 启用或禁用连接保活支持,默认开启
net.keepalive_idle_timeout 10 # 空闲保活连接的最长时间(秒),默认30
# 在另一个终端启动nc并使其侦听TCP/9090的消息:
# $ nc -l 9090
# {"date":1587769732.572266,"rand_value":9704012962543047466}
# {"date":1587769733.572354,"rand_value":7609018546050096989}
# {"date":1587769734.572388,"rand_value":17035865539257638950}
# {"date":1587769735.572419,"rand_value":17086151440182975160}
# {"date":1587769736.572277,"rand_value":527581343064950185}
# 若配置中未启用 net.keepalive 选项,Fluent Bit 将关闭TCP连接,netcat也将退出 ...
# 上例中当5条记录到达后连接将保持空闲状态,10秒后将由于 net.keepalive_idle_timeout 的时间限制而关闭
validating data and structure
# 在正常生产环境中的配置中定义了许多输入、过滤器、输出,因此必须根据预期结果对配置进行持续验证
# 因此 Fluent Bit 提供了名为 Expect 的特定过滤器,可用于验证记录中预期的键和值,并在发现异常时采取措施
# 官方鼓励用户将数据验证集成到 CI 系统中 ...
# 下面示例中的数据源是带有JSON内容的普通文件,然后是两个过滤器:grep用于排除某些记录、record_modifier用于增删特定键的内容
# 理想情况下用户希望在每个步骤之间添加数据验证检查点,以便知道数据结构是否正确,这可以通过使用 expect 过滤器来实现 ...
# Expect 过滤器设置旨在验证某些标准的规则,例如:
# 记录内是否含键A ?
# 记录内不包含键A ?
# 记录内的键 A 的值是否等于 NULL ?
# 记录内的键 A 的值是否与 NULL 不同 ?
# 记录内的键 A 的值是否等于 B ?
# 每个 Expect (期望) 过滤器配置都可以公开特定规则来验证记录内容,它支持以下配置属性:
key_exists # 检查记录中是否存在具有给定名称的键
key_not_exists # 检查记录中是否不存在键
key_val_is_null # 检查键的值是否为 NULL
key_val_is_not_null # 检查键的值是否为非空
key_val_eq # 检查键的值是否等于配置中的给定值
action # 当规则不匹配时采取的行动。可用的选项是: warn、exit
# warn: 当发现上述规则不匹配时向日志记录层发送警告消息;
# exit: 使 Fluent Bit 中止并返回 255 状态码
# ------------------------------------------------------ Example
# 示例文件 ...
cat > data.log <<'EOF'
{ "color" : "blue" , "label" : { "name" : null } }
{ "color" : "red" , "label" : { "name" : "abc" } , "meta" : "data " }
{ "color" : "green" , "label" : { "name" : "abc" } , "meta" :空}
EOF
# 以下配置文件将验证名为color和label的key是否存在 ...
[SERVICE]
flush 1
log_level info
parsers_file parsers.conf
[INPUT]
name tail
path ./data.log
parser json
exit_on_eof on
[FILTER]
name expect # First 'expect' filter to validate that our data was structured properly
match *
key_exists color # validating
key_exists $label['name'] # ...
action exit
[OUTPUT]
name stdout
match *
# ------------------------------------------------------
# 扩展管道,添加一个grep过滤器来匹配名为name的键的值为abc,然后由 expect 过滤器来重新验证:
[SERVICE]
flush 1
log_level info
parsers_file parsers.conf
[INPUT]
name tail
path ./data.log
parser json
exit_on_eof on
[FILTER]
name expect # First 'expect' filter to validate that our data was structured properly
match *
key_exists color
key_exists label
action exit
[FILTER] # Match records that only contains map 'label' with key 'name' = 'abc'
name grep
match *
regex $label['name'] ^abc$
[FILTER] # Check that every record contains 'label' with a non-null value
name expect
match *
key_val_eq $label['name'] abc
action exit
[FILTER] # Append a new key to the record using an environment variable
name record_modifier
match *
record hostname ${HOSTNAME}
[FILTER] # Check that every record contains 'hostname' key
name expect
match *
key_exists hostname
action exit
[OUTPUT]
name stdout
match *