SLS数据加工——动态解析与分发日志实战

背景

阿里云日志服务提供可托管、可扩展、高可用的数据加工服务。数据加工服务可用于数据的规整、富化、流转、脱敏和过滤。本文为读者带来了数据加工动态解析与分发的最佳实践。


场景

现有多个不同的APP,所有APP的程序运行日志都输入到同一个中心Logstore中。每个APP的日志都是以分隔符分隔的文本日志,但是日志字段schema各不相同。日志样例如下:

APP_1的日志样例
content: schema_app1|113.17.4.39|www.zsc.mock.com|PUT|1082|404|https|28.3|Mozilla/5.0
APP_2的日志样例
content: schema_app2|183.93.165.82|db-01|MySQL|5.5|0|cn-shanghai|1072|user-2
APP_3的日志样例
content: schema_app3|root|container4|image3|www.jd.mock.com|221.176.106.202|200|01/Apr/2021:06:27:56

上述APP的日志格式为:"schema_id|字段值1|字段值2|字段值3..."

  • schema_id为该日志的字段schema的ID
  • "字段值X"是日志的各个字段值,每个字段值的字段名由schema_id对应schema定义。


所有schema的定义存储在OSS的一个文件中,并与schema_id一一映射。Schema定义文件的内容格式如下:

{
  "schema_app1": {
    "fields": ["client_ip", "host", "http_method", "resquest_length", "status_code", "request_time", "user_agent"],
    "logstore": "logstore_app1"
  },
  "schema_app2": {
    "fields": ["client_ip", "db_name", "db_type", "db_version", "fail", "region", "check_rows", "user_name"],
    "logstore": "logstore_app2"
  },
  "schema_app3": {
    "fields": ["user", "container_name", "image_name", "referer", "container_ip", "status_code", "datetime"],
    "logstore": "logstore_app3"
  },
}

其中schema_app1等是schema_id。每个schema的定义包含两个字段,fields和logstore,fields定义了该schema对应的字段名列表,logstore定义了该schema的日志要分发的目标logstore名。


需求

  1. 对中心Logstore中不同Schema的日志进行动态解析(Schema在动态变化),将分隔符分隔的各个字段值映射到对应的字段名上,形成结构化的日志。
  2. 不同Schema的日志分发到不同的Logstore中。


例子:

  • 中心Logstore的原始日志
content: schema_app1|113.17.4.39|www.zsc.mock.com|PUT|1082|404|https|28.3|Mozilla/5.0
content: schema_app2|183.93.165.82|db-01|MySQL|5.5|0|cn-shanghai|1072|user-2


  • 加工后的日志
输出到logstore_app1:
{"client_ip": "113.17.4.39", "host": "www.zsc.mock.com", "http_method": "PUT", "resquest_length": 1082, "status_code": 404, "request_time": 28.3, "user_aent": "Mozilla/5.0"}
输出到logstore_app2:
{"client_ip": "183.93.165.82", "db_name": "db-01", "db_type": "MySQL", "db_version": "5.5", "fail": 0, "region": "cn-shanghai", "check_rows": 1072, "user_name": "user-2"}


数据加工语法

数据加工的创建流程参考创建数据加工任务

# 1.原始日志切分出schema_id和日志内容raw_content
e_set("split_array", str_partition(v("content"), "|"))
e_set("schema_id", lst_get(v("split_array"), 0))
e_set("raw_content", lst_get(v("split_array"), 2))
# 2.根据schema_id从OSS读取对应的schema内容
e_set(
    "schema",
    dct_get(
        res_oss_file(
            endpoint="http://oss-cn-hangzhou.aliyuncs.com",
            ak_id=res_local("AK_ID"),
            ak_key=res_local("AK_KEY"),
            bucket="ali-licheng-demo",
            file="schema_lib/schema.json",
            change_detect_interval=20,
        ),
        v("schema_id"),
    ),
)
# 3.从schema中读取字段名列表fields和分发的目标Logstore
e_set("fields", dct_get(v("schema"), "fields"))
e_set("target_logstore", dct_get(v("schema"), "logstore"))
# 丢弃多余字段
e_keep_fields("raw_content", "fields", "target_logstore", F_TIME, F_META)
# 4.解析分隔符日志,并映射到fields中的字段上
e_psv("raw_content", json_parse(v("fields")))
# 丢弃多余字段
e_drop_fields("fields", "raw_content")
# 5.根据schema中定义的logstore名,动态分发
e_output(project="licheng-simulator-test", logstore=v("target_logstore"))


上述加工语法的加工总体流程如下:

1.将原始日志切分出schema_id和日志内容raw_content,即:

原始日志:
content: schema_app1|113.17.4.39|www.zsc.mock.com|PUT|1082|404|https|28.3|Mozilla/5.0
切分为:
schema_id: schema_app1
raw_content: 113.17.4.39|www.zsc.mock.com|PUT|1082|404|https|28.3|Mozilla/5.0


2.根据schema_id从OSS读取对应的schema内容

3. 从schema中读取字段名列表fields和分发的目标Logstore

  • 每个schema中都定义了该schema日志的字段名列表以及分发的目标Logstore名

4.解析分隔符日志,并映射到fields中的字段上

5.根据schema中定义的分发logstore名,实现日志的动态分发。


加工后的结果示例

SLS数据加工——动态解析与分发日志实战


后续维护

后续维护过程中,如果APP日志的Schema发生变化,或者有新的APP日志进来,只需在OSS中的schema库文件中修改和增加对应APP的schema定义即可,无需对加工任务做任何修改。

上一篇:SLS数据实验室与数据模拟器


下一篇:使用Terraform玩转SLS日志审计自动化部署