Elasticsearch[实战四]

目录

kibana mapping设置

店铺的Mapping

如何将MySQL数据库中的数据同步至Elasticsearch中?

Pipeline 的配置举例

实践同步 


kibana mapping设置

PUT twitter1
{
  "mappings": {
    "properties": {
      "location": {
        "type": "geo_point"
      },
      "user": {
        "type": "keyword"
      }
    }
  }
}

店铺的Mapping

{
  "properties": {
    "id": {
      "type": "keyword"
    },
    "storeName": {
      "type": "text",
      "fields" : {
        "keyword" : {
          "type" : "keyword",
          "ignore_above" : 256
        }
      }
    },
    "storeIntroduction": {
      "type": "text"
    },
    "storeBrand": {
      "type": "text"
    },
    "openDate": {
      "type": "date"
    },
    "storePhoto": {
      "type": "keyword"
    },
    "storeTags": {
      "type": "text"
    }
  }
}

/**最开始的是id字段类型是keyword,说明通过关键字进行搜索。

storeName字段定义为text类型是支持全文索引的,同时通过fields定义keyword类型说明也可以做关键字查询。

StoreIntroduction字段定义为text类型。

storeBrand字段定义为text类型。

openDate字段定义为date类型。

storePhoto字段定义为keyword类型。

StoreTags字段定义为text类型。*/

如何将MySQL数据库中的数据同步至Elasticsearch中?

logstash在程序中的位置

Elasticsearch[实战四]

如下所示:Logstash左边是数据源,例如报表、结构化数据、统计数据等等,通过Logstash以后 会将这些数据进行分析、归档、监控、告警。这里我们可以将其理解为ETL的工具,它可以面向多个数据源,将数据拉入Logstash进行处理之后,最后将其输出成你想要个格式。

Elasticsearch[实战四]

如下所示:从左往右看,最左边就是我们所说的数据源,在本例中的数据源就是MySQL数据库,中间灰色的部分就是Logstatsh其包括三个部分:Input、Filter和Output。这三个部分都包含在Pipeline的管道中。这个三个部分会对输入的数据源进行处理然后生成最终的Elasticsearch索引文件。

 Elasticsearch[实战四]

  • Pipeline:包括input-filter-output的3阶段的处理流程,它是通过队列进行管理的。

  • Logstash Event:它是数据信息在Logstash中流转的形式,数据源的数据通过input进入到Logstash以后就会被转化为Logstash Event,这个Event会经过filter过滤最终event会通过output输出到目标数据源,也就是Elasticsearch中。

左边的Raw Data作为原始的数据源文件通过Input进入到Logstash中,此时会使用codec-decode对数据源进行转换,转化为Logstash Event以后经过Filter进行过滤的操作,然后还是以Logstash Event的形式通过Output中的codec-encode模块将其转换成最终的Data,并且输出。 Elasticsearch[实战四]

Pipeline 的配置举例

mysql.conf 

input {
	jdbc {
	   jdbc_driver_library => ""
	   jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
       jdbc_connection_string => "jdbc:mysql://192.168.56.10:3306/elasticsearch"
       jdbc_user => "root"
       jdbc_password => "root"
	   
	   statement => "SELECT * FROM t_goods_store"
	   use_column_value => true
	   tracking_column_type => "numeric"
	   tracking_column => "id"
	   last_run_metadata_path => "syncpoint_table"      
    }    
}

filter {

	ruby{
			code => "event.set('openDate', event.get('open_date').time.localtime.strftime('%Y-%m-%d'))"		
	}

	ruby{
			code => "event.set('storeName',event.get('store_name'));
						event.set('storeIntroduction',event.get('store_introduction'));
						event.set('storeBrand',event.get('store_brand'));
						event.set('storePhoto',event.get('store_photo'));
						event.set('storeTags',event.get('store_tags'));"
	}
	
	mutate {
		remove_field => ["tags","@version","@timestamp","open_date","store_name","store_introduction","store_brand","store_photo","store_tags"]
	}
}

output {
	elasticsearch{
		hosts => ["192.168.56.10:9200"]
		index => "store_index"
		document_id => "%{id}"
	}
	stdout {
		codec => json_lines
	}
}

在input的部分需要获取数据源的数据信息,在店铺搜索案例中,需要通过MySQL数据库获取店铺的信息。因此在Input节点中需要对MySQL进行操作,而连接数据库是该操作中最主要的动作,而这一操作主要都在jdbc节点下完成。

其主要是完成对jdbc相关配置,包括如下:

  • jdbc_driver_library:定义连接MySQL数据库驱动的jar包。

  • jdbc_driver_class:定义MySQL数据库的驱动类。

  • jdbc_connection_string:数据库连接串。

  • jdbc_user:访问数据库用户名。

  • jdbc_password :访问数据库密码。

  • Statement:定义需要获取数据的SQL语句。

  • use_column_value:使用递增列的值

  • tracking_column_type:递增字段的类型,numeric表示数值类型,

  • tracking_column:递增字段的名称

  • last_run_metadata_path :同步点文件,这个文件记录了上次的同步点,重启时会读取这个文件,这个文件可以手动修改

Input将数据载入到Logstash之后就轮到Filter过滤器上场了,filter节点中通过ruby定义的过滤插件对数据处理进行定义。如图2 所示,

  • Ruby的第一段代码将open_date字段中保存的日期类型数据进行了格式化。

  • Ruby的第二段代码将MySQL中字段的命名转换为驼峰命名的方式。

  • Mutate节点将会保留过滤器中的字段命名方式,而移除MySQL中字段的命名方式。

完成input和filter的配置,最后来看Output的配置。由于输出的数据目标是Elasticsearch的索引文件,因此需要加入Elasticsearch的节点:

  • Hosts:对Elasticsearch的IP和端口进行定义。

  • Index:对要保存的Elasticsearch索引进行定义。

  • Document_id:由于索引中一个document id就是一条记录,因此这里使用了自增的id变量作为document id的值。

  • Stdout:标准输出的格式以JSON格式为主。

实践同步 

下载logstach, 并在logstash-7.9.3/logstash-core/lib/jars目录下放入mysql-connector-java-8.0.22.jar即可使用jdbc链接数据库,而且在input中也不必指定jdbc_driver_library。
之后执行命令

bin/logstash -f config/mysql.conf

 之后就是改造es中的分页查询变为从ES中查询。

Elasticsearch[实战四]

​​​​​​​​​​​​​​https://codechina.csdn.net/JunMa_First/es.git  代码地址
 

学自:

Elasticsearch[实战四]

上一篇:利用logstash 迁移mysql千万级数据到 es


下一篇:Logstash详解