Ingest pipelines: Elastic Stack Practical Handbook (Elastic Stack 实战手册)

· This article is part of the book Elastic Stack 实战手册 (Elastic Stack Practical Handbook); download the full book for more content.

Author: 李增胜 (Li Zengsheng)

Elastic offers three ways to process data: Logstash, Beats processors and ingest pipelines. This article focuses on ingest pipelines. The table below compares Logstash with ingest pipelines to help you choose the right tool for your scenario:

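Tool | Deployment | Data buffering | Data processing | Data sources
Logstash | Deployed separately, which adds complexity | Buffers data with a queue mechanism; multiple queues supported | Supports a large number of processors, far more than ingest pipelines | Supports external data sources such as MySQL, Kafka and Beats
Ingest pipeline | No separate deployment; easy to scale | No buffering | Supports more than 30 processors | Can be combined with Beats or Logstash to handle data sources in specific scenarios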

Summary:

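  • If an ingest pipeline can handle your processing, there is no need for Logstash. Conversely, if you need to read from external data sources, choose Logstash.
  • If you need to buffer data, Logstash is the better choice.
  • If processed data must be sent somewhere other than Elasticsearch, use Logstash.
  • If you want the simplest configuration, an Elasticsearch ingest pipeline is enough.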

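Ingest pipelines are therefore not a replacement for Logstash. Choose the technology that fits your data processing requirements and architecture. It is not an either-or choice: you can use both, applying each to different data.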

Managing pipelines with Kibana Dev Tools

Ingest Pipeline

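An ingest pipeline preprocesses documents before they are indexed. Pipelines are executed by Elasticsearch ingest nodes; for better performance, you can deploy dedicated ingest nodes.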

Advantages:

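  • Processing is handled by ingest nodes, so responsibilities are clearly separated
  • A wide range of processors, and easy to extend
  • Lightweight, and covers most common Logstash use cases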
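As noted above, pipelines run on nodes that have the ingest role. One way to check which nodes have that role is the cat nodes API: in the node.role column, the letter i marks the ingest role. A minimal sketch (the column selection is just one choice):

# List each node's name and roles; a role string containing "i" means the node can run ingest pipelines
GET _cat/nodes?v&h=name,node.role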

An ingest pipeline is a chain of processors that are applied to each document in order. The processing flow looks like this:

[Figure: ingest pipeline processing flow]
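A minimal sketch of this flow, using a hypothetical pipeline demo_pipeline and a hypothetical index demo_index: the processors run in the order they are listed, and a document indexed with the pipeline URL parameter passes through them before it is stored.

# Hypothetical pipeline: trim first, then lowercase
PUT _ingest/pipeline/demo_pipeline
{
  "description": "hypothetical example: trim, then lowercase",
  "processors": [
    { "trim": { "field": "message" } },
    { "lowercase": { "field": "message" } }
  ]
}

# Index a document into the hypothetical demo_index through the pipeline
PUT demo_index/_doc/1?pipeline=demo_pipeline
{
  "message": "  Hello World  "
}

# The stored _source.message should be "hello world"
GET demo_index/_doc/1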

Ingest pipelines can also be created in Kibana; an example is given later in this article.

Commonly used processors:

For the full list of processors, see https://www.elastic.co/guide/en/elasticsearch/reference/master/processors.html

Trim

Removes leading and trailing whitespace. If the field is an array of strings, every string in the array is trimmed.

Split

Splits a string into an array using the given separator (a regular expression). Works only on string fields.

Rename

Renames a field.

Foreach

Applies the same processor to every element of an array.

Lowercase / Uppercase

Converts a field to lowercase or uppercase.

Script

Processes the document with a script (Painless by default).

Gsub

Replaces the parts of a string that match a regular expression.

Append

Appends one or more values to an array field.

Set

Sets the value of a field.

Remove

Removes fields.
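Gsub, Append and Script have no example elsewhere in this article. A minimal sketch that exercises all three in one pipeline; the pipeline name and the date, tags and message_length fields are hypothetical. gsub treats pattern as a regular expression, and the script is written in Painless, the default script language.

# Hypothetical pipeline and fields: date, tags, message_length
PUT _ingest/pipeline/gsub_append_script_pipeline
{
  "description": "hypothetical example: gsub, append and script",
  "processors": [
    {
      "gsub": {
        "field": "date",
        "pattern": "/",
        "replacement": "-"
      }
    },
    {
      "append": {
        "field": "tags",
        "value": ["processed"]
      }
    },
    {
      "script": {
        "source": "ctx.message_length = ctx.message.length()"
      }
    }
  ]
}

# Test
POST _ingest/pipeline/gsub_append_script_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "date": "2021/04/28",
        "tags": ["fruit"],
        "message": "apple"
      }
    }
  ]
}

# Expected in _source: "date" : "2021-04-28", "tags" : ["fruit", "processed"], "message_length" : 5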

Trim

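Removes leading and trailing whitespace from strings. Here foreach applies trim to each element of the message array.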

PUT _ingest/pipeline/trim_pipeline
{
  "processors": [
    {
      "foreach": {
        "field": "message",
        "processor": {
          "trim": {
            "field": "_ingest._value"
          }
        }
      }
    }
  ]
}

POST _ingest/pipeline/trim_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": [
          "car222 ",
          " auto2222 "
        ]
      }
    }
  ]
}

# Response:
{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "message" : [
            "car222",
            "auto2222"
          ]
        },
        "_ingest" : {
          "_value" : null,
          "timestamp" : "2021-04-28T13:19:13.542743Z"
        }
      }
    }
  ]
}
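Because trim already handles arrays of strings, as noted in the processor list, the foreach wrapper above is optional. A minimal sketch producing the same result without foreach; the pipeline name trim_array_pipeline is hypothetical.

# Hypothetical pipeline: trim applied directly to the array, without foreach
PUT _ingest/pipeline/trim_array_pipeline
{
  "processors": [
    {
      "trim": {
        "field": "message"
      }
    }
  ]
}

# Test
POST _ingest/pipeline/trim_array_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": [
          "car222 ",
          " auto2222 "
        ]
      }
    }
  ]
}

# Expected: "message" : [ "car222", "auto2222" ]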

Split / Foreach

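Splits a string into an array using the given separator; works only on string fields. Because message is an array, split is wrapped in foreach so that each element is split separately.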

PUT _ingest/pipeline/split_pipeline
{
  "processors": [
    {
      "foreach": {
        "field": "message",
        "processor": {
          "split": {
            "field": "_ingest._value",
            "separator": " "
          }
        }
      }
    }
  ]
}

# Test
POST _ingest/pipeline/split_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": [
          "car222 aaa",
          " auto2222 aaaa bbb"
        ]
      }
    }
  ]
}
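# Response: each element of message has been split on spaces into an array of strings; the leading space of the second element produces an empty string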
{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "message" : [
            [
              "car222",
              "aaa"
            ],
            [
              "",
              "auto2222",
              "aaaa",
              "bbb"
            ]
          ]
        },
        "_ingest" : {
          "_value" : null,
          "timestamp" : "2021-04-28T13:28:20.762312Z"
        }
      }
    }
  ]
}
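The empty string in the second array comes from the leading space in " auto2222 aaaa bbb". A minimal sketch that trims every element before splitting, relying on processors running in the order they are listed; the pipeline name trim_split_pipeline is hypothetical.

# Hypothetical pipeline: trim every element first, then split each element
PUT _ingest/pipeline/trim_split_pipeline
{
  "processors": [
    {
      "trim": {
        "field": "message"
      }
    },
    {
      "foreach": {
        "field": "message",
        "processor": {
          "split": {
            "field": "_ingest._value",
            "separator": " "
          }
        }
      }
    }
  ]
}

# Test with the same documents as above
POST _ingest/pipeline/trim_split_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": [
          "car222 aaa",
          " auto2222 aaaa bbb"
        ]
      }
    }
  ]
}

# Expected: "message" : [ ["car222", "aaa"], ["auto2222", "aaaa", "bbb"] ]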

Rename

Renames a field. Rename is often used together with _reindex: the pipeline is applied while documents are copied into a new index.

POST goods_info_comment_message/_bulk
{"index":{"_id":1}}
{"message":"美 国苹果 "}
{"index":{"_id":2}}
{"message":"山东 苹果 "}


# Define rename_pipeline
PUT _ingest/pipeline/rename_pipeline
{
  "processors": [
    {
      "rename": {
        "field": "message",
        "target_field": "message_new"
      }
    }
  ]
}

# Reindex into a new index, applying rename_pipeline
POST _reindex
{
  "source": {
    "index": "goods_info_comment_message"
  },
  "dest": {
    "index": "goods_info_comment_message_new",
    "pipeline": "rename_pipeline"
  }
}

# Check the mapping of the new index
GET goods_info_comment_message_new/_mapping

# Response: the field is now message_new
{
  "goods_info_comment_message_new" : {
    "mappings" : {
      "properties" : {
        "message_new" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        }
      }
    }
  }
}
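Before reindexing real data, the pipeline can be previewed with _simulate, as in the other sections. A minimal sketch using one of the documents indexed above:

# Preview rename_pipeline on a sample document
POST _ingest/pipeline/rename_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "山东 苹果 "
      }
    }
  ]
}

# Expected: _source contains "message_new" : "山东 苹果 " and no message field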

Lowercase / Uppercase

Converts a string to lowercase or uppercase.

PUT _ingest/pipeline/lowercase_pipeline
{
  "description": "lowercase processor",
  "processors": [
    {
      "lowercase": {
        "field": "message"
      }
    }
  ]
}

# Test with some uppercase characters
POST _ingest/pipeline/lowercase_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": [
          "CAr222 aaa",
          " auto2222 aaaa Bbb"
        ]
      }
    }
  ]
}

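# Result: every string in the array is lowercased (lowercase accepts arrays of strings, so no foreach is needed)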
{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "message" : [
            "car222 aaa",
            " auto2222 aaaa bbb"
          ]
        },
        "_ingest" : {
          "timestamp" : "2021-04-28T15:12:10.041308Z"
        }
      }
    }
  ]
}
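The section title also covers Uppercase, which works the same way. A minimal sketch with the same test documents; the pipeline name uppercase_pipeline is hypothetical.

# Hypothetical pipeline using the uppercase processor
PUT _ingest/pipeline/uppercase_pipeline
{
  "description": "uppercase processor",
  "processors": [
    {
      "uppercase": {
        "field": "message"
      }
    }
  ]
}

# Test
POST _ingest/pipeline/uppercase_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": [
          "CAr222 aaa",
          " auto2222 aaaa Bbb"
        ]
      }
    }
  ]
}

# Expected: "message" : [ "CAR222 AAA", " AUTO2222 AAAA BBB" ] (whitespace is left untouched)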

Remove

Removes an existing field.

# Define remove_pipeline
PUT _ingest/pipeline/remove_pipeline
{
  "description": "remove processor",
  "processors": [
    {
      "remove": {
        "field": "message"
      }
    }
  ]
}

# Test
POST _ingest/pipeline/remove_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": [
          "CAr222 aaa",
          " auto2222 aaaa Bbb"
        ]
      }
    }
  ]
}
# Response: the message field has been removed
{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : { },
        "_ingest" : {
          "timestamp" : "2021-04-28T15:15:27.811516Z"
        }
      }
    }
  ]
}
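By default, remove fails if the field does not exist. A minimal sketch, assuming hypothetical fields tmp_a and tmp_b: field also accepts an array of names, and ignore_missing: true skips any field that is absent instead of failing.

# Hypothetical pipeline and fields (tmp_a, tmp_b)
PUT _ingest/pipeline/remove_multi_pipeline
{
  "processors": [
    {
      "remove": {
        "field": ["tmp_a", "tmp_b"],
        "ignore_missing": true
      }
    }
  ]
}

# Test: tmp_b is absent from the document
POST _ingest/pipeline/remove_multi_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "keep me",
        "tmp_a": 1
      }
    }
  ]
}

# Expected _source: { "message" : "keep me" }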

Set

Sets the value of a field. An existing value is overwritten, and the field is created if it does not exist.

PUT _ingest/pipeline/set_pipeline
{
  "description": "set processor",
  "processors": [
    {
      "set": {
        "field": "message",
        "value": "this is a new message"
      }
    }
  ]
}


POST _ingest/pipeline/set_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "this"
      }
    }
  ]
}

# Response
{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "message" : "this is a new message"
        },
        "_ingest" : {
          "timestamp" : "2021-04-28T15:21:28.928512Z"
        }
      }
    }
  ]
}
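The value of set can also be a template snippet such as {{_ingest.timestamp}}, and the override option (default true) controls whether an existing non-null value is replaced. A minimal sketch; the pipeline name and the ingested_at field are hypothetical.

# Hypothetical pipeline; ingested_at is a hypothetical field name
PUT _ingest/pipeline/set_template_pipeline
{
  "processors": [
    {
      "set": {
        "field": "ingested_at",
        "value": "{{_ingest.timestamp}}"
      }
    },
    {
      "set": {
        "field": "message",
        "value": "default message",
        "override": false
      }
    }
  ]
}

# Test
POST _ingest/pipeline/set_template_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "this"
      }
    }
  ]
}

# Expected: ingested_at holds the ingest timestamp; message stays "this" because override is false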

Managing pipelines in the Kibana UI

This section shows how to create a pipeline through the Kibana UI. Open the Kibana home page:

[Screenshot: Kibana home page]

Select Ingest Node Pipelines; the list of existing pipelines is shown on the right.

[Screenshot: list of existing pipelines]

Choose to create a new pipeline:

[Screenshots: creating a new pipeline]

Add a lowercase processor:

[Screenshot: adding a lowercase processor]

Click Add documents to test the pipeline:

[Screenshot: Add documents]

Add a test document:

[
  {
    "_index": "index_lowercase",
    "_id": "1",
    "_source": {
      "message": "This is a Test"
    }
  }
]

[Screenshot: test result]

The test succeeds: the string has been converted to lowercase.

[Screenshot: test output]
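A pipeline created in the UI is stored like any other, so it can also be inspected and tested from Dev Tools. A minimal sketch, assuming the pipeline was saved under the hypothetical name lowercase_ui_pipeline with its lowercase processor set on the message field:

# Hypothetical pipeline name; use the name you entered in the UI
GET _ingest/pipeline/lowercase_ui_pipeline

# Run the same test document through the pipeline
POST _ingest/pipeline/lowercase_ui_pipeline/_simulate
{
  "docs": [
    {
      "_index": "index_lowercase",
      "_id": "1",
      "_source": {
        "message": "This is a Test"
      }
    }
  ]
}

# Expected: _source.message becomes "this is a test"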
