Airflow v2.0 Distributed Deployment: Elasticsearch Logging Solution
Installation environment:
- docker
- airflow v2.0
- elasticsearch 7+
- filebeat 7+
Development dependencies:
pip install 'apache-airflow-providers-elasticsearch'
Logging approach
graph LR
    AF[(Airflow)] -. writes .-> LOG[[JSON-format log files]]
    LOG -. reads .-> FB[/Filebeat: parse and normalize the log structure/]
    FB -. stores .-> ES[(ElasticSearch)]

In the diagram above, Logstash can be inserted between Filebeat and Elasticsearch for extra processing, depending on your own design.
Airflow configuration
Enable remote logging and also configure the Elasticsearch connection settings so that the webserver can reach Elasticsearch. Remote logs are retrieved by searching on log_id, so the log output must contain a log_id field whose format matches the log_id_template setting.
[logging]
# Enable remote logging
remote_logging = True

# Elasticsearch connection settings used by the webserver
[elasticsearch]
host = your_host:your_port
# log_id template; the id that log searches use
log_id_template = {dag_id}-{task_id}-{execution_date}-{try_number}
# Whether to write logs to stdout. Set this according to your own logging design;
# here Filebeat reads the log files and ships them to Elasticsearch, so it is False
write_stdout = False
# Whether to write logs as JSON
json_format = True

# Elasticsearch TLS settings; configure as needed
[elasticsearch_configs]
use_ssl = False
verify_certs = False
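For reference, the log_id that the webserver searches for is just log_id_template filled in with the task instance's fields. A minimal sketch of what such an id looks like is shown below; the dag/task/date values are made-up examples, and the exact execution_date formatting is decided by Airflow's Elasticsearch task handler.

# Minimal sketch: how a log_id is derived from log_id_template.
# All concrete values below are made-up examples.
log_id_template = "{dag_id}-{task_id}-{execution_date}-{try_number}"

log_id = log_id_template.format(
    dag_id="example_dag",
    task_id="example_task",
    execution_date="2021-01-27T00:00:00+00:00",  # assumption: the handler decides the exact date format
    try_number=1,
)
print(log_id)  # example_dag-example_task-2021-01-27T00:00:00+00:00-1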
Filebeat configuration
Filebeat reads the logs produced by task execution on the worker nodes, normalizes their structure, and ships them to Elasticsearch for storage.
Notes:
- the host field must be a hashable type, or absent
The configuration below only includes the settings that need to be changed.
# Read the log files
filebeat.inputs:
# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# Below are the input specific configurations.
- type: log
  enabled: true
  paths:
    - /app/logs/**/*.log
  exclude_files:
    - '.py.log$'

setup.template:
  # name and pattern are required for the Elasticsearch index setup
  name: "airflow_log_template"
  pattern: "airflow-log*"
# Disable index lifecycle management, otherwise the custom index setting has no effect
setup.ilm.enabled: false

output.elasticsearch:
  # Array of hosts to connect to.
  hosts: '${FILEBEAT_OUTPUT_ELASTICSEARCH_HOSTS:"127.0.0.1:9200"}'
  # Protocol - either `http` (default) or `https`.
  protocol: '${FILEBEAT_OUTPUT_ELASTICSEARCH_PROTOCOL:"http"}'
  # Authentication credentials - either API key or username/password.
  #api_key: "id:api_key"
  username: '${FILEBEAT_OUTPUT_ELASTICSEARCH_USERNAME:""}'
  password: '${FILEBEAT_OUTPUT_ELASTICSEARCH_PASSWORD:""}'
  index: "airflow-log-%{+yyyy.MM}"

# Log processing settings
processors:
  # Disable host metadata output
  # - add_host_metadata:
  #     when.not.contains.tags: forwarded
  # Decode the JSON log lines
  - decode_json_fields:
      fields: ["message"]
      process_array: false
      max_depth: 1
      target: ""
      overwrite_keys: true
      add_error_key: true
  # Remove the host field
  - drop_fields:
      fields: ["host"]
      ignore_missing: true
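To make the processor chain above concrete, a rough Python equivalent of decode_json_fields followed by drop_fields is sketched below. The sample event and its field names are illustrative assumptions about what a JSON-formatted Airflow log line might contain, not the exact output format.

import json

# Rough equivalent of the decode_json_fields + drop_fields processors above.
# The sample event and its field names are illustrative assumptions.
event = {
    "message": '{"message": "Task started", "log_id": "example_dag-example_task-2021-01-27-1", "offset": 1}',
    "host": {"name": "worker-1"},  # dict-valued host field that drop_fields removes
}

decoded = json.loads(event["message"])   # fields: ["message"]
event.update(decoded)                    # target: "" with overwrite_keys: true
event.pop("host", None)                  # drop_fields: host, ignore_missing: true
print(event)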
Once the configuration changes are in place, run
filebeat -e
to test the configuration (Filebeat runs in the foreground and logs to stderr, so configuration errors show up right away).
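Once Filebeat is shipping logs, you can confirm that documents are arriving in the index and carry the log_id field the webserver searches on. A minimal sketch using the elasticsearch Python client follows; the host, index pattern, and log_id value are assumptions for this particular setup.

from elasticsearch import Elasticsearch

# Sketch: check that Airflow task logs arrived in Elasticsearch.
# Host, index pattern and the example log_id are assumptions for this setup.
es = Elasticsearch(["http://127.0.0.1:9200"])

resp = es.search(
    index="airflow-log-*",
    body={"query": {"match": {"log_id": "example_dag-example_task-2021-01-27T00:00:00+00:00-1"}}},
    size=5,
)
for hit in resp["hits"]["hits"]:
    print(hit["_source"].get("message"))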
Reference
Error raised when the Airflow webserver fetches logs:
[2021-01-27 15:12:57,006] {app.py:1891} ERROR - Exception on /get_logs_with_metadata [GET]
Traceback (most recent call last):
File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/flask/app.py", line 2447, in wsgi_app
response = self.full_dispatch_request()
File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/flask/app.py", line 1952, in full_dispatch_request
rv = self.handle_user_exception(e)
File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/flask/app.py", line 1821, in handle_user_exception
reraise(exc_type, exc_value, tb)
File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/flask/_compat.py", line 39, in reraise
raise value
File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/flask/app.py", line 1950, in full_dispatch_request
rv = self.dispatch_request()
File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/flask/app.py", line 1936, in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/airflow/www/auth.py", line 34, in decorated
return func(*args, **kwargs)
File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/airflow/www/decorators.py", line 60, in wrapper
return f(*args, **kwargs)
File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/airflow/utils/session.py", line 65, in wrapper
return func(*args, session=session, **kwargs)
File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/airflow/www/views.py", line 1054, in get_logs_with_metadata
logs, metadata = task_log_reader.read_log_chunks(ti, try_number, metadata)
File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/airflow/utils/log/log_reader.py", line 58, in read_log_chunks
logs, metadatas = self.log_handler.read(ti, try_number, metadata=metadata)
File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/airflow/utils/log/file_task_handler.py", line 217, in read
log, metadata = self._read(task_instance, try_number_element, metadata)
File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/airflow/providers/elasticsearch/log/es_task_handler.py", line 163, in _read
logs_by_host = self._group_logs_by_host(logs)
File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/airflow/providers/elasticsearch/log/es_task_handler.py", line 132, in _group_logs_by_host
grouped_logs[key].append(log)
TypeError: unhashable type: 'AttrDict'
The code corresponding to the error above:
@staticmethod
def _group_logs_by_host(logs):
    grouped_logs = defaultdict(list)
    for log in logs:
        key = getattr(log, 'host', 'default_host')  # the host value is used as the grouping key
        grouped_logs[key].append(log)
    # return items sorted by timestamp.
    result = sorted(grouped_logs.items(), key=lambda kv: getattr(kv[1][0], 'message', '_'))
    return result
As this code shows, the host field of each log record is used as a dictionary key, so the host field in the logs must be a hashable type that can serve as a dict key; it must not be a mutable type such as a list or dict. Removing the host field from the logs, or emitting it as a hashable value, resolves the problem.
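The sketch below reproduces the failure in isolation: a dict-like host value cannot be used as a defaultdict key, while a plain string works, which is exactly why dropping or flattening the host field fixes the webserver error.

from collections import defaultdict

grouped_logs = defaultdict(list)

# A string host value is hashable and works as a dict key.
grouped_logs["worker-1"].append("log line from worker-1")

# A dict-valued host field (as Filebeat can emit) is not hashable;
# using it as a key raises the same kind of error as the traceback above.
try:
    grouped_logs[{"name": "worker-1"}].append("another log line")
except TypeError as exc:
    print(exc)  # unhashable type: 'dict'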