Alibaba Cloud Elasticsearch: Importing MySQL Data over the Public Network with Logstash

Step By Step

1. Create the Elasticsearch and Logstash instances

2. Configure SNAT for the Logstash instance

By default, a Logstash instance can only access resources inside its VPC. To read from a data source on the public network, a NAT gateway has to be configured. In this example the data is pulled from a MySQL database over the public network, so SNAT is required.

3. Add the SNAT IP address to the MySQL network whitelist

4. Obtain the database connection information
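Before wiring up the pipeline, it is worth confirming that the connection information and whitelist actually work. Below is a minimal sketch using the pymysql client, run from a machine whose IP is allowed by the MySQL whitelist; the host, database name, and credentials are placeholders.

# mysql_connectivity_check.py -- verify the MySQL connection information (sketch, placeholders only).
# Install the client first: pip install pymysql
import pymysql

conn = pymysql.connect(
    host="<******>.mysql.rds.aliyuncs.com",  # public endpoint of the MySQL instance
    port=3306,
    user="<user>",
    password="<password>",
    database="<database>",
    charset="utf8mb4",
)
try:
    with conn.cursor() as cur:
        cur.execute("SELECT VERSION()")
        print("connected, MySQL version:", cur.fetchone()[0])
finally:
    conn.close()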

5. Create the table and insert test data

DROP TABLE IF EXISTS `doctor_advisory_price_1`;
CREATE TABLE `doctor_advisory_price_1` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `doctor_id` bigint(20) NOT NULL COMMENT '医生ID',
  `advisory_price` int(10) NOT NULL COMMENT '咨询价格:分',
  `gmt_create` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `gmt_modify` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=304 DEFAULT CHARSET=utf8 COMMENT='医生咨询自定义价格表';

-- ----------------------------
-- Records of doctor_advisory_price_1
-- ----------------------------
INSERT INTO `doctor_advisory_price_1` VALUES (1, 123456, 2000, '2018-09-05 16:34:09', '2018-09-05 16:37:44');
INSERT INTO `doctor_advisory_price_1` VALUES (2, 1823784, 100, '2018-09-11 11:25:34', '2019-07-02 15:44:24');
INSERT INTO `doctor_advisory_price_1` VALUES (3, 1000247, 0, '2018-09-11 11:41:31', '2018-12-18 17:44:54');
INSERT INTO `doctor_advisory_price_1` VALUES (4, 44612299, 100, '2018-09-11 13:55:33', '2019-12-26 15:40:27');
INSERT INTO `doctor_advisory_price_1` VALUES (5, 44612298, 300, '2018-09-11 14:33:48', '2019-01-18 14:32:31');
INSERT INTO `doctor_advisory_price_1` VALUES (6, 61823709, 10000, '2018-09-11 16:28:57', '2018-09-11 16:28:57');
INSERT INTO `doctor_advisory_price_1` VALUES (7, 1899974, 10000, '2018-09-11 16:30:03', '2018-09-11 16:30:03');
INSERT INTO `doctor_advisory_price_1` VALUES (8, 61823711, 10000, '2018-09-11 17:16:07', '2018-09-11 17:16:07');
INSERT INTO `doctor_advisory_price_1` VALUES (9, 1610524, 0, '2018-09-11 17:31:50', '2019-08-22 14:16:45');
INSERT INTO `doctor_advisory_price_1` VALUES (10, 61823712, 2500, '2018-09-11 17:32:51', '2018-09-12 11:29:54');
INSERT INTO `doctor_advisory_price_1` VALUES (11, 61913713, 10000, '2018-09-12 11:17:38', '2018-09-12 11:17:38');
INSERT INTO `doctor_advisory_price_1` VALUES (12, 1610440, 100, '2018-09-12 14:47:32', '2018-09-12 14:51:36');
INSERT INTO `doctor_advisory_price_1` VALUES (13, 1902393, 500, '2018-09-13 11:46:29', '2018-09-13 11:46:29');
INSERT INTO `doctor_advisory_price_1` VALUES (14, 61913713, 10000, '2018-09-12 11:17:38', '2018-09-12 11:17:38');
INSERT INTO `doctor_advisory_price_1` VALUES (15, 1610440, 100, '2018-09-12 14:47:32', '2018-09-12 14:51:36');
INSERT INTO `doctor_advisory_price_1` VALUES (16, 1902393, 500, '2018-09-13 11:46:29', '2018-09-13 11:46:29');
INSERT INTO `doctor_advisory_price_1` VALUES (17, 61913713, 10000, '2018-09-12 11:17:38', '2018-09-12 11:17:38');
INSERT INTO `doctor_advisory_price_1` VALUES (18, 1610440, 100, '2018-09-12 14:47:32', '2018-09-12 14:51:36');
INSERT INTO `doctor_advisory_price_1` VALUES (19, 1902393, 500, '2018-09-13 11:46:29', '2018-09-13 11:46:29');
INSERT INTO `doctor_advisory_price_1` VALUES (20, 61913713, 10000, '2018-09-12 11:17:38', '2018-09-12 11:17:38');
INSERT INTO `doctor_advisory_price_1` VALUES (21, 1610440, 100, '2018-09-12 14:47:32', '2018-09-12 14:51:36');
INSERT INTO `doctor_advisory_price_1` VALUES (22, 1902393, 500, '2018-09-13 11:46:29', '2018-09-13 11:46:29');
INSERT INTO `doctor_advisory_price_1` VALUES (23, 32321043, 10000, '2018-09-13 18:20:19', '2018-09-13 18:20:19');
INSERT INTO `doctor_advisory_price_1` VALUES (24, 62023722, 0, '2018-09-14 10:28:00', '2018-09-14 10:28:00');
INSERT INTO `doctor_advisory_price_1` VALUES (25, 49522775, 0, '2018-09-14 11:00:23', '2019-05-28 11:47:12');
INSERT INTO `doctor_advisory_price_1` VALUES (26, 50622828, 100, '2018-09-14 14:08:55', '2019-12-27 14:41:14');
INSERT INTO `doctor_advisory_price_1` VALUES (27, 31210890, 100, '2018-09-14 14:27:48', '2019-01-15 14:24:37');
INSERT INTO `doctor_advisory_price_1` VALUES (28, 45822396, 200, '2018-09-14 14:59:14', '2019-01-18 10:25:16');
INSERT INTO `doctor_advisory_price_1` VALUES (29, 47322576, 100, '2018-09-15 10:01:26', '2018-09-15 10:01:26');
INSERT INTO `doctor_advisory_price_1` VALUES (30, 50632833, 0, '2018-09-15 10:09:24', '2018-10-10 16:58:38');

6. Upload the JDBC driver to the Logstash instance (mysql-connector-java-8.0.18.jar)

7. Enable automatic index creation on the Elasticsearch cluster
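On Alibaba Cloud Elasticsearch this switch is normally changed in the console (YML configuration, action.auto_create_index), which triggers a cluster restart. The same setting can also be changed dynamically through the standard cluster settings API; the sketch below uses the Python requests library, with the endpoint and password as placeholders.

# enable_auto_create_index.py -- allow automatic index creation (sketch, placeholders only).
import requests

resp = requests.put(
    "http://<es-endpoint>.elasticsearch.aliyuncs.com:9200/_cluster/settings",
    auth=("elastic", "<password>"),
    json={"persistent": {"action.auto_create_index": "true"}},
    timeout=10,
)
print(resp.status_code, resp.json())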

8. Configure the pipeline

# The input plug-in listens on a port of the node where the Logstash process runs; use a port in the 8000-9000 range.
input {
  jdbc {
    jdbc_driver_library => "/ssd/1/share/<Logstash instance ID>/logstash/current/config/custom/mysql-connector-java-8.0.18.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://<******>.mysql.rds.aliyuncs.com:3306/******?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowLoadLocalInfile=false&autoDeserialize=false"
    jdbc_user => "******"
    jdbc_password => "<password>"
    schedule => "* * * * *"
    statement => "SELECT * from doctor_advisory_price_1"
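    # Note: the statement above reloads the whole table on every run (documents are
    # de-duplicated in the output by document_id). For incremental sync, the query can
    # filter on the tracking column with the built-in :sql_last_value parameter, for example (sketch):
    # statement => "SELECT * FROM doctor_advisory_price_1 WHERE gmt_modify > :sql_last_value"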
    use_column_value => true
    tracking_column_type => "timestamp"
    tracking_column => "gmt_modify"
    last_run_metadata_path => "/ssd/1/<logstash id>/logstash/data/doctor_advisory_price-20201210.txt"
    clean_run => false
  }
}
filter {

}
output {
  elasticsearch {
    hosts => "http://******.elasticsearch.aliyuncs.com:9200"
    user => "elastic"
    password => "<password>"
    index => "doctor_test_01"
    document_id => "%{doctor_id}"
  }
  # A file_extend output can be added here so that results can be viewed directly once the pipeline is deployed, for verification and debugging.
  # Do not modify the system-specified path; to turn the debug output off, comment out or delete the file_extend section. See the reference below for details.
  # file_extend {
  #   path => "/ssd/1/<logstash id>/logstash/logs/debug/mysql_to_es3"
  # }
}

9. View the data in Kibana
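Besides browsing in Kibana, the import can be verified by querying the index directly. A minimal sketch with the Python requests library is shown below; the endpoint and password are placeholders.

# count_documents.py -- confirm documents have been written to the target index (sketch, placeholders only).
import requests

resp = requests.get(
    "http://<es-endpoint>.elasticsearch.aliyuncs.com:9200/doctor_test_01/_count",
    auth=("elastic", "<password>"),
    timeout=10,
)
print(resp.json())  # expected to report a "count" matching the rows in doctor_advisory_price_1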

References

Configure extension files
Logstash - Sync MySQL data to Elasticsearch
