1、mysql数据库格式
2、es的安装,可以看我以前的步骤
3、下载Datax
wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
4、由于Datax默认没有elasticsearchwriter,所以需要自己打jar包(注意:需要 jdk1.8; maven 3.x)
1、git clone https://github.com/alibaba/DataX.git
2、修改根目录下的 pom.xml,只保留需要编译的子模块
<modules>
<module>common</module>
<module>core</module>
<module>transformer</module>
<!-- reader -->
<module>mysqlreader</module>
<!-- writer -->
<module>elasticsearchwriter</module>
<!-- common support module -->
<module>plugin-rdbms-util</module>
<module>plugin-unstructured-storage-util</module>
<module>hbase20xsqlreader</module>
<module>hbase20xsqlwriter</module>
</modules>
3、mvn clean install -Dmaven.test.skip=true
4、将打包生成的 elasticsearchwriter/target/datax/plugin/writer/elasticsearchwriter 目录复制到 datax 的 plugin/writer 目录下
5、编写job.json
{
  "job": {
    "setting": {
      "speed": { "channel": 1 },
      "errorLimit": { "percentage": 0 }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "yang156122",
            "connection": [
              {
                "querySql": ["select * from user_t"],
                "jdbcUrl": ["jdbc:mysql://localhost:3306/test"]
              }
            ]
          }
        },
        "writer": {
          "name": "elasticsearchwriter",
          "parameter": {
            "endpoint": "http://hadoop100:9200",
            "accessId": "root",
            "accessKey": "root",
            "index": "mysql2es",
            "type": "id",
            "cleanup": false,
            "discovery": false,
            "column": [
              { "name": "id", "type": "integer" },
              { "name": "user_name", "type": "keyword" },
              { "name": "phone_number", "type": "keyword" },
              { "name": "password", "type": "keyword" },
              { "name": "age", "type": "keyword" },
              { "name": "sex", "type": "keyword" }
            ]
          }
        }
      }
    ]
  }
}
(说明:"index" 为 es 的索引名(es index),"type" 为 es 的类型,相当于表(es table);JSON 不支持 # 注释,因此说明写在这里,不要写进 job.json。)
6、创建es映射
@Test public void mysql2Es() throws IOException { //1:settings HashMap<String, Object> settings_map = new HashMap<String, Object>(2); settings_map.put("number_of_shards", 3); settings_map.put("number_of_replicas", 2); //2:mappings(映射、schema) XContentBuilder builder = XContentFactory.jsonBuilder() .startObject() .field("dynamic", "true") //设置type中的属性 .startObject("properties") //.startObject("pin") // .startObject("properties") .startObject("id") .field("type", "integer") .endObject() .startObject("user_name") .field("type", "string") .field("index", "not_analyzed") .endObject() .startObject("phone_number") .field("type", "string") .field("index", "not_analyzed") .endObject() .startObject("password") .field("type", "string") .field("index", "not_analyzed") .endObject() .startObject("age") .field("type", "string") .field("index", "not_analyzed") .endObject() .startObject("sex") .field("type", "string") .field("index", "not_analyzed") .endObject() // .endObject() // .endObject() .endObject() .endObject(); CreateIndexRequestBuilder prepareCreate = client.admin().indices().prepareCreate("mysql2es"); //管理索引(user_info)然后关联type(user) prepareCreate.setSettings(settings_map).addMapping("id", builder).get(); } }
7、运行
python ../bin/datax.py job.json
8、实验结果