1, Version compatibility:
https://www.elastic.co/guide/en/elasticsearch/hadoop/current/requirements.html
2, Maven integration:
https://www.elastic.co/guide/en/elasticsearch/hadoop/current/install.html
3, pom file configuration
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>xiaoniubigdata</artifactId>
        <groupId>com.wenbronk</groupId>
        <version>1.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>hadoop03-es</artifactId>

    <properties>
        <hadoop.version>2.7.</hadoop.version>
        <hive.version>1.2.</hive.version>
        <elasticsearch.version>6.3.</elasticsearch.version>
    </properties>

    <dependencies>
        <!--<dependency>-->
            <!--<groupId>org.elasticsearch</groupId>-->
            <!--<artifactId>elasticsearch-hadoop</artifactId>-->
            <!--<version>6.3.</version>-->
        <!--</dependency>-->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-hadoop-mr</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
        <!--<dependency>-->
            <!--<groupId>org.elasticsearch</groupId>-->
            <!--<artifactId>elasticsearch-hadoop-hive</artifactId>-->
            <!--<version>${elasticsearch.version}</version>-->
        <!--</dependency>-->

        <!-- hadoop -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-annotations</artifactId>
            <version>${hadoop.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-archives</artifactId>
            <version>${hadoop.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.hive.hcatalog</groupId>
            <artifactId>hive-hcatalog-core</artifactId>
            <version>${hive.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-serde</artifactId>
            <version>${hive.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>24.1-jre</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-deploy-plugin</artifactId>
                <version>2.8.</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <!--<minimizeJar>true</minimizeJar>-->
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <relocations>
                                <relocation>
                                    <pattern>com.google.common</pattern>
                                    <shadedPattern>shadowing.com.google.common</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>io.netty</pattern>
                                    <shadedPattern>shadowing.io.netty</shadedPattern>
                                </relocation>
                            </relocations>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
4, Reading data from ES:
mainjob:
package com.wenbronk.mr.es.read;

import com.wenbronk.mr.es.rw.ESRWJob;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.elasticsearch.hadoop.mr.EsInputFormat;

public class ESReadJob extends Configured implements Tool {

    @Override
    public int run(String[] strings) throws Exception {
        GenericOptionsParser parser = new GenericOptionsParser(strings);
        if (parser.getCommandLine() == null) {
            throw new RuntimeException("args can not be null");
        }

        Job job = Job.getInstance(getConf());
        job.setJobName("es-hadoop-read");
        job.setJarByClass(ESRWJob.class);

        job.setInputFormatClass(EsInputFormat.class);

        job.setMapperClass(ESReadMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(MapWritable.class);

        job.setReducerClass(ESReaderReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        Path outPath = new Path("/Users/bronkwen/work/IdeaProjects/xiaoniubigdata/hadoop03-es/target/out");
        FileSystem fileSystem = FileSystem.get(getConf());
        if (fileSystem.exists(outPath)) {
            fileSystem.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);

        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);

        conf.set("es.nodes", "10.124.147.22:9200");
//        conf.set("es.resource", "macsearch_fileds/mac");
        conf.set("es.resource.read", "macsearch_fileds/mac");
        conf.set("es.resource.write", "sink/group");
        conf.set("es.query", "?q=me*");
        // read the documents as JSON: the map input value is then delivered as Text
//        conf.set("es.output.json", "true");

        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);

        int result = ToolRunner.run(conf, new ESReadJob(), args);
        System.exit(result);
    }
}
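The es.query value above is a simple URI query (?q=me*). ES-Hadoop's es.query setting also accepts a full query DSL string (or a path to an external resource), which is more practical once the filter gets complicated. A hedged sketch of what the corresponding line in main() could look like, with a made-up field name:

        // es.query also accepts raw query DSL as a JSON string (field name is only an example)
        conf.set("es.query", "{ \"query\": { \"match\": { \"mac\": \"me\" } } }");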
mapper
package com.wenbronk.mr.es.read;

import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ESReadMapper extends Mapper<Text, MapWritable, Text, MapWritable> {
    private Text text;
    private NullWritable nullWritable;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        this.text = new Text();
        this.nullWritable = NullWritable.get();
    }

    // key: docId, value: document source
    @Override
    protected void map(Text key, MapWritable value, Context context) throws IOException, InterruptedException {
        context.write(key, value);
    }
}
reducer
package com.wenbronk.mr.es.read;

import com.alibaba.fastjson.JSONObject;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class ESReaderReducer extends Reducer<Text, MapWritable, NullWritable, Text> {
    private Text text;
    private NullWritable nullWritable;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        this.text = new Text();
        this.nullWritable = NullWritable.get();
    }

    @Override
    protected void reduce(Text key, Iterable<MapWritable> values, Context context) throws IOException, InterruptedException {
        for (MapWritable value : values) {
            JSONObject jsonObject = new JSONObject();
            value.entrySet().forEach(entry ->
                    jsonObject.put(String.valueOf(entry.getKey()), entry.getValue()));
            text.set(jsonObject.toString());
            context.write(nullWritable, text);
        }
    }
}
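Note that the reducer puts the Writable values into the JSONObject as-is, so fastjson serializes the Writable objects themselves, which may not produce the plain values you expect. If you want ordinary strings and numbers in the output JSON, a small conversion step can be added to the class above; the toPlain helper below is illustrative only and not part of the original code:

    // Illustrative addition: unwrap common Writable wrappers before handing
    // the values to fastjson, so the JSON holds plain values.
    private Object toPlain(org.apache.hadoop.io.Writable w) {
        if (w instanceof org.apache.hadoop.io.NullWritable) return null;
        if (w instanceof org.apache.hadoop.io.IntWritable) return ((org.apache.hadoop.io.IntWritable) w).get();
        if (w instanceof org.apache.hadoop.io.LongWritable) return ((org.apache.hadoop.io.LongWritable) w).get();
        if (w instanceof org.apache.hadoop.io.DoubleWritable) return ((org.apache.hadoop.io.DoubleWritable) w).get();
        return w.toString(); // Text and anything else: use its string form
    }

    // and inside reduce():
    //     jsonObject.put(String.valueOf(entry.getKey()), toPlain(entry.getValue()));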
If you need a different JSON format, see:
git@gitlab.com:wenbronk/xiaoniubigdata.git
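As the commented-out conf.set("es.output.json", "true") line in the driver above hints, ES-Hadoop can also hand the documents to the mapper already serialized as JSON: with that setting the input value type becomes Text instead of MapWritable. A minimal mapper sketch under that assumption (not part of the original code):

package com.wenbronk.mr.es.read;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// Sketch only: requires conf.set("es.output.json", "true") in the driver,
// which makes EsInputFormat deliver each document source as a JSON Text value.
public class ESReadJsonMapper extends Mapper<Text, Text, NullWritable, Text> {
    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        // key is the document id, value is the raw _source as JSON
        context.write(NullWritable.get(), value);
    }
}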
5, Writing to ES
Writing in JSON format:
mainjob
package com.wenbronk.mr.es.writeJson;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.elasticsearch.hadoop.mr.EsOutputFormat;

public class ESWriteJobWriteJson extends Configured implements Tool {

    @Override
    public int run(String[] strings) throws Exception {
        GenericOptionsParser parser = new GenericOptionsParser(strings);
        if (parser.getCommandLine() == null) {
            throw new RuntimeException("args can not be null");
        }

        Job job = Job.getInstance(getConf());
        job.setJobName("es-hadoop-write");
        job.setJarByClass(ESWriteJobWriteJson.class);

        job.setMapperClass(ESWriterMapperJson.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(ESWriterReducerJson.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        job.setOutputFormatClass(EsOutputFormat.class);

        Path inPath = new Path("/Users/bronkwen/work/IdeaProjects/xiaoniubigdata/hadoop03-es/target/out/part-r-00000");
        FileInputFormat.setInputPaths(job, inPath);

        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);

        conf.set("es.nodes", "10.124.147.22:9200");
//        conf.set("es.resource", "macsearch_fileds/mac");
        conf.set("es.resource.read", "macsearch_fileds/mac");
        conf.set("es.resource.write", "sink/group");
        conf.set("es.query", "?q=me*");
        // the records being written are already JSON (Text), so hand them to ES as-is
        conf.set("es.input.json", "true");

        int result = ToolRunner.run(conf, new ESWriteJobWriteJson(), args);
        System.exit(result);
    }
}
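If the documents should keep a specific id instead of letting Elasticsearch generate one, ES-Hadoop's es.mapping.id setting can point at a field inside the JSON document. A hedged sketch of the extra line in main(); the field name "id" is only an example:

        // Assumption for illustration: each JSON document contains an "id" field.
        // es.mapping.id tells ES-Hadoop to use that field as the Elasticsearch _id.
        conf.set("es.mapping.id", "id");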
mapper
package com.wenbronk.mr.es.writeJson;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ESWriterMapperJson extends Mapper<LongWritable, Text, NullWritable, Text> {
    private NullWritable nullWritable;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        this.nullWritable = NullWritable.get();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        context.write(nullWritable, value);
    }
}
reducer
package com.wenbronk.mr.es.writeJson;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class ESWriterReducerJson extends Reducer<NullWritable, Text, NullWritable, Text> {
    private NullWritable nullWritable;
//    private BytesWritable bytesWritable;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        this.nullWritable = NullWritable.get();
//        this.bytesWritable = new BytesWritable();
    }

    @Override
    protected void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // each value is already a JSON document; pass it straight to EsOutputFormat
        for (Text value : values) {
            context.write(nullWritable, value);
        }
    }
}
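Because es.input.json is enabled, whatever ends up in the value Text must be a complete JSON document. Here the documents already are JSON (they come from the read job's output), but if you start from plain text you can assemble them with fastjson before writing. A minimal sketch under that assumption; the input layout and field names are made up for illustration:

package com.wenbronk.mr.es.writeJson;

import com.alibaba.fastjson.JSONObject;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// Sketch only: turns a comma-separated line ("mac,group") into a JSON document
// suitable for es.input.json mode. Input layout and field names are assumptions.
public class CsvToJsonMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
    private final Text out = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] parts = value.toString().split(",");
        if (parts.length < 2) {
            return; // skip malformed lines
        }
        JSONObject doc = new JSONObject();
        doc.put("mac", parts[0].trim());
        doc.put("group", parts[1].trim());
        out.set(doc.toJSONString());
        context.write(NullWritable.get(), out);
    }
}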
6, Reading from one index and writing to another index
mainjob
package com.wenbronk.mr.es.rw;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.elasticsearch.hadoop.mr.EsInputFormat;
import org.elasticsearch.hadoop.mr.EsOutputFormat;

public class ESRWJob extends Configured implements Tool {

    @Override
    public int run(String[] strings) throws Exception {
        GenericOptionsParser parser = new GenericOptionsParser(strings);
        if (parser.getCommandLine() == null) {
            throw new RuntimeException("args can not be null");
        }

        Job job = Job.getInstance(getConf());
        job.setJobName("es-hadoop-write");
        job.setJarByClass(ESRWJob.class);

        job.setInputFormatClass(EsInputFormat.class);

        job.setMapperClass(EsRWMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(MapWritable.class);

        job.setReducerClass(EsRWReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(MapWritable.class);

        job.setOutputFormatClass(EsOutputFormat.class);

        job.setNumReduceTasks();

        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
//        conf.setSpeculativeExecution(false);
        // put the user's classpath first
        conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);

        // configure ES: naming one node of the cluster is enough
        conf.set("es.nodes", "10.124.147.22:9200");
        // index/type; es.resource.read / es.resource.write can be set separately
        // to split the read target from the write target
//        conf.set("es.resource", "macsearch_fileds/mac");
        conf.set("es.resource.read", "macsearch_fileds/mac");
        conf.set("es.resource.write", "sink/group");
        conf.set("es.query", "?q=me*");

        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);

        int result = ToolRunner.run(conf, new ESRWJob(), args);
        System.exit(result);
    }
}
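The mapred.*.speculative.execution keys used above are the old Hadoop 1.x names; they still work through Hadoop's deprecation mapping, but on Hadoop 2.x the current property names are mapreduce.map.speculative and mapreduce.reduce.speculative. The equivalent lines in main() would be:

        // Hadoop 2.x property names for disabling speculative execution
        // (recommended when writing to ES, so documents are not written twice)
        conf.setBoolean("mapreduce.map.speculative", false);
        conf.setBoolean("mapreduce.reduce.speculative", false);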
mapper
package com.wenbronk.mr.es.rw;

import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class EsRWMapper extends Mapper<Text, MapWritable, Text, MapWritable> {
    @Override
    protected void map(Text key, MapWritable value, Context context) throws IOException, InterruptedException {
        context.write(key, value);
    }
}
reducer
package com.wenbronk.mr.es.rw;

import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class EsRWReducer extends Reducer<Text, MapWritable, NullWritable, MapWritable> {
    private NullWritable nullWritable;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        this.nullWritable = NullWritable.get();
    }

    @Override
    protected void reduce(Text key, Iterable<MapWritable> values, Context context) throws IOException, InterruptedException {
        for (MapWritable value : values) {
            context.write(nullWritable, value);
        }
    }
}
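The reducer above copies each document unchanged from macsearch_fileds/mac to sink/group. If the documents need to be reshaped along the way, the MapWritable can be edited before it is written. A drop-in variant of the reduce() method above (add an import for org.apache.hadoop.io.Writable); the field names are illustrative only, not fields of the real index:

    // Sketch of a transforming reduce(): drop one field and rename another
    // before handing the document to EsOutputFormat. Field names are examples.
    @Override
    protected void reduce(Text key, Iterable<MapWritable> values, Context context) throws IOException, InterruptedException {
        for (MapWritable value : values) {
            value.remove(new Text("tmp_score"));              // drop a field
            Writable mac = value.remove(new Text("mac"));     // rename mac -> device_mac
            if (mac != null) {
                value.put(new Text("device_mac"), mac);
            }
            context.write(nullWritable, value);
        }
    }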