最近项目需求需要将mysql 3000W数据同步到ES中
本人利用ES官方文档中提到的logstash完成此次数据同步
input {
stdin {}
jdbc {
#mysql的jdbc连接工具jar包
jdbc_driver_library => "D:\JavaMaven\maven_jar\mysql\mysql-connector-java\8.0.25\mysql-connector-java-8.0.25.jar"
#jdbc驱动类全类名
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
#jdbc连接url
jdbc_connection_string => "jdbc:mysql://localhost:3306/test1?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT%2B8"
#数据库用户名
jdbc_user => "root"
#数据库密码
jdbc_password => "123456"
#数据同步时间(都是*则每一分钟同步一次)
schedule => "* * * * *"
#jdbc分页查询开启
jdbc_paging_enabled => "true"
#查询每页显示数据条数
jdbc_page_size => "50000"
#sql文件路径(就是需要同步的数据)
statement_filepath => "D:/Javaenvir/elasticsearch/logstash-7.8.0/config/jdbc.sql"
#上次更新位置标记文件路径
last_run_metadata_path => "D:/Javaenvir/elasticsearch/logstash-7.8.0/config/test-config/mms_member_company.txt"
#每次启动是否清除上一项配置文件中数据
clean_run => "false"
#开启所有字段名转成小写
lowercase_column_names => "true"
#解决中文乱码问题
codec => plain { charset => "UTF-8"}
#是否记录上次运行的记录
record_last_run => "true"
#是否使用其他字段判断进行数据库同步
use_column_value => "true"
#数据库中的增量指标字段名
tracking_column => "id"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
elasticsearch {
index => "comp"
#es服务器
hosts => "localhost:9200"
document_id => "%{id}"
}
stdout {
codec => json_lines
}
}
附上logstash jdbc input接口函数
clean_run
- 值类型是布尔值
- 默认值为 false
是否应该保留之前的运行状态
columns_charset
- 值类型是哈希
- 默认值为 {}
特定列的字符编码。此选项将覆盖:charset指定列的选项。
例子:
输入{
jdbc { ...
columns_charset => { "column0" => "ISO-8859-1" } ... } }
这只会转换将 ISO-8859-1 作为原始编码的 column0。
connection_retry_attempts
- 值类型是数字
- 默认值为 1
尝试连接数据库的最大次数
connection_retry_attempts_wait_time
- 值类型是数字
- 默认值为 0.5
连接尝试之间休眠的秒数
jdbc_connection_string
- 这是必需的设置。
- 值类型是字符串
- 此设置没有默认值。
JDBC 连接字符串
jdbc_default_timezone
- 值类型是字符串
- 此设置没有默认值。
时区转换。Logstash(和 Elasticsearch)期望时间戳以 UTC 术语表示。如果您的数据库记录了相对于另一个时区的时间戳,如果您愿意,则为数据库时区,然后将此设置设置为数据库正在使用的时区。但是,由于 SQL 不允许在时间戳字段中包含时区数据,因此我们无法逐条记录地计算出这一点。这个插件会自动将你的 SQL 时间戳字段转换为 Logstash 时间戳,以 ISO8601 格式的相对 UTC 时间。
使用此设置将手动分配指定的时区偏移量,而不是使用本地机器的时区设置。例如,您必须使用规范时区America/Denver。
plugin_timezone
- 值可以是以下任何一个:utc,local
- 默认值为 “utc”
如果您希望此插件将时间戳偏移到 UTC 以外的时区,则可以将此设置设置为local,插件将使用操作系统时区进行偏移调整。
注意:当指定plugin_timezoneand/or 时jdbc_default_timezone,偏移量调整在两个地方进行,如果sql_last_value是时间戳并且它在语句中用作参数,那么偏移量调整是从插件时区到数据时区进行的,并且在处理记录时,时间戳偏移量从数据库时区调整到插件时区。如果您的数据库时区是 UTC,那么您不需要设置这些设置中的任何一个。
jdbc_driver_class
- 这是必需的设置。
- 值类型是字符串
- 此设置没有默认值。
要加载的 JDBC 驱动程序类,例如“org.apache.derby.jdbc.ClientDriver”
Java::如果尽管提供了相关的 jar,但驱动程序似乎没有正确加载,则可能需要在驱动程序类之前添加通过jdbc_driver_library设置或放置在 Logstash Java 类路径中。众所周知,Oracle JDBC 驱动程序 (ojdbc6.jar)jdbc_driver_class就是"Java::oracle.jdbc.driver.OracleDriver"这种情况,其中正确的是 ,其他 JDBC 驱动程序也可能是这种情况。
jdbc_driver_library
- 值类型是字符串
- 此设置没有默认值。
JDBC 驱动程序库到第三方驱动程序库的路径。如果需要多个库,您可以传递它们,以逗号分隔。
如果未提供,插件将在 Logstash Java 类路径中查找驱动程序类。此外,如果库似乎没有通过此设置正确加载,则将相关 jar 放在 Logstash Java 类路径中而不是通过此设置可能会有所帮助。还请确保 Logstash 进程可以读取该路径(例如logstash,作为服务运行时的用户)。
jdbc_fetch_size
- 值类型是数字
- 此设置没有默认值。
JDBC 获取大小。如果未提供,将使用各自驱动程序的默认值
jdbc_page_size
- 值类型是数字
- 默认值为 100000
JDBC 页面大小
jdbc_paging_enabled
- 值类型是布尔值
- 默认值为 false
JDBC 启用分页
这会导致一个 sql 语句被分解成多个查询。每个查询将使用限制和偏移量来共同检索完整的结果集。限制大小用 设置jdbc_page_size。
请注意,查询之间不能保证排序。
jdbc_password
- 值类型是密码
- 此设置没有默认值。
JDBC密码
jdbc_password_filepath
- 值类型是路径
- 此设置没有默认值。
JDBC 密码文件名
jdbc_pool_timeout
值类型是数字
默认值为 5
连接池配置。在引发 PoolTimeoutError 之前等待获取连接的秒数(默认为 5)
jdbc_user
- 这是必需的设置。
- 值类型是字符串
- 此设置没有默认值。
JDBC用户
jdbc_validate_connection
值类型是布尔值
默认值为 false
连接池配置。使用前验证连接。
jdbc_validation_timeout
- 值类型是数字
- 默认值为 3600
连接池配置。验证连接的频率(以秒为单位)
last_run_metadata_path
- 值类型是字符串
- 默认值为 “$HOME/.logstash_jdbc_last_run”
上次运行时间的文件路径
lowercase_column_names
- 值类型是布尔值
- 默认值为 true
是否强制标识符字段小写
parameters
- 值类型是哈希
- 默认值为 {}
查询参数的哈希值,例如 { “target_id” => “321” }
prepared_statement_bind_values
- 值类型是数组
- 默认值为 []
准备好的语句的绑定值数组。:sql_last_value是保留的预定义字符串
prepared_statement_name
- 值类型是字符串
- 默认值为 “”
为准备好的语句指定的名称。它在您的配置和数据库中必须是唯一的
record_last_run
- 值类型是布尔值
- 默认值为 true
是否保存状态 last_run_metadata_path
schedule
- 值类型是字符串
- 此设置没有默认值。
定时运行语句的时间表,以Cron格式例如:“* * * * *”(每分钟执行一次查询,每分钟执行一次)
默认情况下没有时间表。如果没有给出时间表,那么语句只运行一次。
sequel_opts
- 值类型是哈希
- 默认值为 {}
通用/特定于供应商的 Sequel 配置选项。
可选连接池配置示例 max_connections - 连接池的最大连接数
sql_log_level
- 值可以是以下任何一个:fatal, error, warn, info,debug
- 默认值为 “info”
记录 SQL 查询的日志级别,接受的值是常见的致命、错误、警告、信息和调试。默认值为信息。
statement
- 值类型是字符串
- 此设置没有默认值。
如果未定义,即使未使用编解码器,Logstash 也会抱怨。执行语句
要使用参数,请使用命名参数语法。例如:
"SELECT * FROM MYTABLE WHERE id = :target_id"
这里,":target_id" 是一个命名参数。您可以使用该parameters设置配置命名参数。
statement_filepath
- 值类型是路径
- 此设置没有默认值。
包含要执行的语句的文件路径
tracking_column
- 值类型是字符串
- 此设置没有默认值。
如果use_column_value设置为要跟踪其值的列true
tracking_column_type
- 值可以是以下任何一个:numeric,timestamp
- 默认值为 “numeric”
跟踪列的类型。目前只有“数字”和“时间戳”
use_column_value
- 值类型是布尔值
- 默认值为 false
设置为 时true,将定义的 tracking_column值用作:sql_last_value。设置为 时false,:sql_last_value反映上次执行查询的时间。
use_prepared_statements
- 值类型是布尔值
- 默认值为 false
设置为 时true,启用准备语句使用
add_field
- 值类型是哈希
- 默认值为 {}
向事件添加字段
codec
值类型是编解码器
默认值为 “plain”
用于输入数据的编解码器。输入编解码器是一种在数据进入输入之前对其进行解码的便捷方法,无需在 Logstash 管道中使用单独的过滤器。
enable_metric
- 值类型是布尔值
- 默认值为 true
默认情况下,为此特定插件实例禁用或启用指标日志记录,我们会记录所有可能的指标,但您可以禁用特定插件的指标收集。
id
- 值类型是字符串
- 此设置没有默认值。
添加一个独特ID的插件配置。如果未指定 ID,Logstash 将生成一个。强烈建议在您的配置中设置此 ID。这在您有两个或多个相同类型的插件时特别有用,例如,如果您有 2 个 jdbc 输入。在这种情况下添加命名 ID 将有助于在使用监控 API 时监控 Logstash。
输入{
jdbc {
id => "my_plugin_id" } }
id字段中的变量替换仅支持环境变量,不支持使用来自秘密存储的值。
tags
- 值类型是数组
- 此设置没有默认值。
向您的活动添加任意数量的任意标签。
这有助于以后处理。
type
- 值类型是字符串
- 此设置没有默认值。
type向此输入处理的所有事件添加一个字段。
类型主要用于过滤器激活。
该类型存储为事件本身的一部分,因此您也可以使用该类型在 Kibana 中搜索它。
如果尝试为已有类型的事件设置类型(例如,当您将事件从托运人发送到索引器时),则新输入将不会覆盖现有类型。即使发送到另一个 Logstash 服务器,在托运人处设置的类型也会在其生命周期内与该事件保持一致。