Elasticsearch是一个基于Lucene的分布式搜索引擎,具有分布式、全文检索、近实时搜索和分析、高可用、模式*、RESTFul API等诸多优点,在实时搜索、日志处理(ELK)、大数据分析等领域有着广泛的应用。Hadoop是一个由Apache基金会所开发的分布式系统基础架构,核心组件有HDFS和MapReduce,分别提供海量数据存储和海量数据计算。
图1 ES-Hadoop简介Elasticsearch for Apache Hadoop是一个用于Elasticsearch和Hadoop进行交互的开源独立库,简称ES-Hadoop,在Hadoop和Elasticsearch之间起到桥梁的作用,完美地把Hadoop的批处理优势和Elasticsearch强大的全文检索引擎结合起来。
ES-Hadoop开辟了更加广阔的应用空间,通过ES-Hadoop可以索引Hadoop中的数据到Elasticsearch,充分利用其查询和聚合分析功能,也可以在Kibana中做进一步的可视化分析,同时也可以把Elasticsearch中的数据放到Hadoop生态系统中做运算,ES-Hadoop支持Spark、Spark、 Streaming、SparkSQL,除此之外,不论是使用Hive、 Pig、Storm、Cascading还是运行单独的Map/Reduce,通过ES-Hadoop提供的接口都支持从Elasticsearch中进行索引和查询操作。
本文基于阿里云E-MapReduce和阿里云Elasticsearch,演示如何通过ES-Hadoop连通Hadoop生态系统和Elasticsearch。
一、云服务配置
1.1 开通专有网络VPC
因为在公网访问推送数据安全性较差,为保证阿里云Elasticsearch访问环境安全,购买阿里云ES产品,对应区域下必须要有 VPC 和 虚拟交换机,因此首先开通VPC专有网络。按路径:阿里云首页-->产品-->网络->专有网络VPC,然后选择立即开通,进入到管理控制台界面,新建专有网络。
创建完成之后在控制台中可以进行管理: 图4 专有网络管理
更多关于专有网络VPC的文档参考这里:专有网络 VPC
1.2 开通阿里云Elasticsearch
按路径:阿里云首页-->产品-->数据库-->Elasticsearch或阿里云首页-->产品-->大数据基础服务-->Elasticsearch进入到阿里云Elasticsearch产品界面,新用户可以免费试用30天。
进入到购买入口,阿里云Elasticsearch提供了按月和按量两种付费模式,选择已经创建的专有网络并设置登录密码。
图5 阿里云Elasticsearch选购配置购买成功后,按路径:控制台-->大数据(数加)-->Elasticsearch,可以看到新创建的Elasticsearch集群实例。 图6 阿里云Elasticsearch实例列表页
点击"管理"菜单,进入集群管理界面:
图7 阿里云Elasticsearch集群管理点击"Kibana控制台"按钮即可进入到Kibana操作界面:
点击"集群监控"按钮进入到监控界面:
1.3 开通阿里云E-MapReduce
按路径:阿里云首页-->产品-->大数据基础服务-->E-MapReduce,之后进入到购买界面:
图10 阿里云E-MapReduce软件配置下一步进行付费配置、网络配置和节点硬件配置: 图11 阿里云E-MapReduce硬件配置
最后设置集群名称、日志路径和集群登录密码: 图12 阿里云E-MapReduce基础配置
其中日志路径存储在OSS之上,如果没有创建bucket,需要到OSS管理控制台创建新的bucket。bucket的区域要和EMR集群一直,EMR集群为华东1区,这里的bucket区域也选择华东1区:
图13 阿里云OSS创建bucketbucket创建完成以后就可以新建目录、上传文件:
最后确定,完成EMR集群的创建:
图15 阿里云E-MapReduce确认页集群创建成功后在集群列表中查看:
图16 阿里云E-MapReduce集群列表
点击管理,可以查看master节点和data节点的详细信息: 图17 阿里云E-MapReduce集群详细信息
公网IP可以直接访问,远程登录:
ssh root@你的公网IP
使用jps命令查看后台进程:
[root@emr-header-1 ~]# jps
16640 Bootstrap
17988 RunJar
19140 HistoryServer
18981 WebAppProxyServer
14023 Jps
15949 gateway.jar
16621 ZeppelinServer
1133 EmrAgent
15119 RunJar
17519 ResourceManager
1871 Application
19316 JobHistoryServer
1077 WatchDog
17237 SecondaryNameNode
16502 NameNode
16988 ApacheDsTanukiWrapper
18429 ApplicationHistoryServer
二、编写EMR写数据到ES的MR作业
推荐使用maven来进行项目管理,首先创建一个maven工程。操作步骤如下:
1.安装 Maven。首先确保计算机已经正确安装安装maven
2.生成工程框架。在工程根目录处执行如下命令:
mvn archetype:generate -DgroupId=com.aliyun.emrtoes -DartifactId=emrtoes -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
mvn 会自动生成一个空的 Sample 工程,工程名为emrtoes(和指定的artifactId一致),里面包含一个简单的 pom.xml 和 App 类(类的包路径和指定的 groupId 一致)
3.加入 Hadoop 和ES-Hadoop依赖。使用任意 IDE 打开这个工程,编辑 pom.xml 文件。在 dependencies 内添加如下内容:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop-mr</artifactId>
<version>5.5.3</version>
</dependency>
4.添加打包插件。由于使用了第三方库,需要把第三方库打包到jar文件中,在pom.xml中添加maven-assembly-plugin插件的坐标:
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>com.aliyun.emrtoes.EmrToES</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer">
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
5.编写代码。在com.aliyun.emrtoes包下和 App 类平行的位置添加新类 EmrToES.java。内容如下:
package com.aliyun.emrtoes;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.elasticsearch.hadoop.mr.EsOutputFormat;
import java.io.IOException;
public class EmrToES {
public static class MyMapper extends Mapper<Object, Text, NullWritable, Text> {
private Text line = new Text();
@Override
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
if (value.getLength() > 0) {
line.set(value);
context.write(NullWritable.get(), line);
}
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
//阿里云 Elasticsearch X-PACK用户名和密码
conf.set("es.net.http.auth.user", "你的X-PACK用户名");
conf.set("es.net.http.auth.pass", "你的X-PACK密码");
conf.setBoolean("mapred.map.tasks.speculative.execution", false);
conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
conf.set("es.nodes", "你的Elasticsearch内网地址");
conf.set("es.port", "9200");
conf.set("es.nodes.wan.only", "true");
conf.set("es.resource", "blog/yunqi");
conf.set("es.mapping.id", "id");
conf.set("es.input.json", "yes");
Job job = Job.getInstance(conf, "EmrToES");
job.setJarByClass(EmrToES.class);
job.setMapperClass(MyMapper.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(EsOutputFormat.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
6.编译并打包。在工程的目录下,执行如下命令:
mvn clean package
执行完毕以后,可在工程目录的 target 目录下看到一个emrtoes-1.0-SNAPSHOT-jar-with-dependencies.jar,这个就是作业 jar 包。
图18 IDEA中EMR写ES工程目录截图三、EMR中完成作业
3.1 测试数据
把下面的数据写入到blog.json中:
{"id":"1","title":"git简介","posttime":"2016-06-11","content":"svn与git的最主要区别..."}
{"id":"2","title":"ava中泛型的介绍与简单使用","posttime":"2016-06-12","content":"基本操作:CRUD ..."}
{"id":"3","title":"SQL基本操作","posttime":"2016-06-13","content":"svn与git的最主要区别..."}
{"id":"4","title":"Hibernate框架基础","posttime":"2016-06-14","content":"Hibernate框架基础..."}
{"id":"5","title":"Shell基本知识","posttime":"2016-06-15","content":"Shell是什么..."}
上传到阿里云E-MapReduce集群,使用scp远程拷贝命令上传文件:
scp blog.json root@你的弹性公网IP:/root
把blog.json上传至HDFS:
hadoop fs -mkdir /work
hadoop fs -put blog.json /work
3.2 上传JAR包
把maven工程target目录下的jar包上传至阿里云E-MapReduce集群:
scp target/emrtoes-1.0-SNAPSHOT-jar-with-dependencies.jar root@YourIP:/root
3.3 执行MR作业
hadoop jar emrtoes-1.0-SNAPSHOT-jar-with-dependencies.jar /work/blog.json
运行成功的话,控制台会输出如下图所示信息:
图19 阿里云E-MapReduce运行MR作业截图命令查询Elasticsearch中的数据:
curl -u elastic -XGET es-cn-v0h0jdp990001rta9.elasticsearch.aliyuncs.com:9200/blog/_search?pretty
图20 命令查看阿里云Elasticsearch中的数据或者在Kibana中查看: 图21 Kibana中查看阿里云Elasticsearch中的数据
四、API分析
Map过程,按行读入,input kye的类型为Object,input value的类型为Text。输出的key为NullWritable类型,NullWritable是Writable的一个特殊类,实现方法为空实现,不从数据流中读数据,也不写入数据,只充当占位符。MapReduce中如果不需要使用键或值,就可以将键或值声明为NullWritable,这里把输出的key设置NullWritable类型。输出为BytesWritable类型,把json字符串序列化。
因为只需要写入,没有Reduce过程。配置参数说明如下:
- conf.set("es.net.http.auth.user", "你的X-PACK用户名");
设置X-PACK的用户名 - conf.set("es.net.http.auth.pass", "你的X-PACK密码");
设置X-PACK的密码 - conf.setBoolean("mapred.map.tasks.speculative.execution", false);
关闭mapper阶段的执行推测
- conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
关闭reducer阶段的执行推测
- conf.set("es.nodes", "你的Elasticsearch内网地址");
配置Elasticsearch的IP和端口
- conf.set("es.resource", "blog/yunqi");
设置索引到Elasticsearch的索引名和类型名。
- conf.set("es.mapping.id", "id");
设置文档id,这个参数”id”是文档中的id字段
- conf.set("es.input.json", "yes");
指定输入的文件类型为json。
- job.setInputFormatClass(TextInputFormat.class);
设置输入流为文本类型
- job.setOutputFormatClass(EsOutputFormat.class);
设置输出为EsOutputFormat类型。
- job.setMapOutputKeyClass(NullWritable.class);
设置Map的输出key类型为NullWritable类型
- job.setMapOutputValueClass(BytesWritable.class);
设置Map的输出value类型为BytesWritable类型
- FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
传入HDFS上的文件路径
五、总结
首先,总结一下实践过程中遇到的问题:
1.ClassNotFoundException异常
遇到找不到EsOutputFormat所在类,导致的ClassNotFoundException异常:
java.lang.NoClassDefFoundError: org/elasticsearch/hadoop/mr/EsOutputFormat
解决办法,使用maven-assembly-plugin插件,把第三方库打到jar包中。
2. 连接不到Elasticsearch集群
连接不到Elasticsearch集群的第一个原因是没有配置X-PACK 的用户名和密码,加上以下两行配置:
conf.set("es.net.http.auth.user", "你的X-PACK用户名");
conf.set("es.net.http.auth.pass", "你的X-PACK密码");
第二个原因就是EMR集群和Elasticsearch集群网络不通,在创建集群的时尽量选择同一区域,比如EMR集群在华东1区,Elasticsearch集群也在华东1区,事先用Ping命令测试。
第三个原因是端口,一般TCP端口(比如使用Java客户端)是9300,ES-Hadoop中使用的仍然是9200端口.
3.Reduce过程中格式错误
注意测试文件中每一行都是一个JSON,在设置中加上:
conf.set("es.input.json", "yes");
否则会出现解析文件格式异常。
最后,ES-Hadoop连通了Hadoop和Elasticsearch两个大数据生态圈,本博客做了写数据的实践案例,更多资料请参考阿里云E-MapReduce 阿里云Elasticsearch