使用HDFS客户端java api读取hadoop集群上的信息

本文介绍使用hdfs java api的配置方法。

1、先解决依赖,pom

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.2</version>
<scope>provided</scope>
</dependency>

2、配置文件,存放hdfs集群配置信息,基本都是来源于core-site.xml和hdfs-site.xml,可以根据hdfs集群client端配置文件里的信息进行填写

#============== hadoop ===================
hdfs.fs.defaultFS=hdfs://mycluster-tj
hdfs.ha.zookeeper.quorum=XXXX-apache00.XX01,XXXX-apache01.XX01,XXXX-apache02.XX01
hdfs.dfs.nameservices=XXXX
hdfs.dfs.ha.namenodes.mycluster-tj=XX1,XX2
hdfs.dfs.namenode.rpc-address.mycluster-tj.nn1=XXXX-apachenn01.XX01:
hdfs.dfs.namenode.rpc-address.mycluster-tj.nn2=XXXX-apachenn02.XX01:

3、java client api

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import java.text.SimpleDateFormat;
import java.util.Date; public class HadoopClient {
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
private FileSystem fs;
private String defaultFS;
private String zKQuorum;
private String nameServices;
private String nameNodes;
private String rpcAddressNN1;
private String rpcAddressNN2; public void setDefaultFS(String defaultFS) {
this.defaultFS = defaultFS;
}
public String getDefaultFS() {
return defaultFS;
}
public void setZKQuorum(String zKQuorum) {
this.zKQuorum = zKQuorum;
}
public String getzKQuorum() {
return zKQuorum;
}
public void setNameServices(String nameServices) {
this.nameServices = nameServices;
}
public String getNameServices() {
return nameServices;
}
public void setNameNodes(String nameNodes) {
this.nameNodes = nameNodes;
}
public String getNameNodes() {
return nameNodes;
}
public void setRpcAddressNN1(String rpcAddressNN1) {
this.rpcAddressNN1 = rpcAddressNN1;
}
public String getRpcAddressNN1() {
return rpcAddressNN1;
}
public void setRpcAddressNN2(String rpcAddressNN2) {
this.rpcAddressNN2 = rpcAddressNN2;
}
public String getRpcAddressNN2() {
return rpcAddressNN2;
} public void init() {
try {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", defaultFS);
conf.set("ha.zookeeper.quorum", zKQuorum);
conf.set("dfs.nameservice", nameServices);
conf.set("dfs.ha.namenodes.mycluster-tj", nameNodes);
conf.set("dfs.namenode.rpc-address.mycluster-tj.nn1", rpcAddressNN1);
conf.set("dfs.namenode.rpc-address.mycluster-tj.nn2", rpcAddressNN2);
fs = FileSystem.get(new URI(defaultFS), conf);
} catch (Exception ex) {
ex.printStackTrace();
}
} public void stop() {
try {
fs.close();
} catch(Exception e) { }
} public boolean exists(String path) {
boolean isExists = false;
try {
Path hdfsPath = new Path(path);
isExists = fs.exists(hdfsPath);
} catch (Exception ex) {
logger.error("exists error: {}", ex.getMessage());
}
return isExists;
} public String getModificationTime(String path) throws IOException {
String modifyTime = null;
try {
Path hdfsPath = new Path(path);
FileStatus fileStatus = fs.getFileStatus(hdfsPath);
long modifyTimestamp = fileStatus.getModificationTime();
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
Date date = new Date(modifyTimestamp);
modifyTime = simpleDateFormat.format(date);
} catch(Exception ex) {
logger.error("getModificationTime error: {}", ex.getMessage());
}
return modifyTime;
} }

4、configuration

import com.xiaoju.dqa.prometheus.client.hadoop.HadoopClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; @Configuration
public class HadoopConfiguration {
@Value("${hdfs.fs.defaultFS}")
private String defaultFS;
@Value("${hdfs.ha.zookeeper.quorum}")
private String zKQuorum;
@Value("${hdfs.dfs.nameservices}")
private String nameServices;
@Value("${hdfs.dfs.ha.namenodes.mycluster-tj}")
private String nameNodes;
@Value("${hdfs.dfs.namenode.rpc-address.mycluster-tj.nn1}")
private String rpcAddressNN1;
@Value("${hdfs.dfs.namenode.rpc-address.mycluster-tj.nn2}")
private String rpcAddressNN2; @Bean(initMethod = "init", destroyMethod = "stop")
public HadoopClient hadoopClient() {
HadoopClient hadoopClient = new HadoopClient();
hadoopClient.setDefaultFS(defaultFS);
hadoopClient.setZKQuorum(zKQuorum);
hadoopClient.setNameServices(nameServices);
hadoopClient.setNameNodes(nameNodes);
hadoopClient.setRpcAddressNN1(rpcAddressNN1);
hadoopClient.setRpcAddressNN2(rpcAddressNN2);
return hadoopClient;
}
}

今天被一个问题坑的要死了,回来补这篇文章。

如果你要访问的集群采用了viewfs方式管理数据,按照本文上面的方法链接集群是有问题。会导致由URI和nameservices解析成功的namenode才可以访问,而其他的访问不了!!!

如果你想解决这个问题,在api部分你要去掉URI部分和nameservices配置,直接使用集群客户端hdfs-site.xml和core-site.xml

应该是这样的。

package com.xiaoju.dqa.jazz.hadoop.client;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date; public class HadoopClient {
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
private FileSystem fs; public void init() {
try {
Configuration conf = new Configuration();
conf.addResource("core-site.xml");
conf.addResource("hdfs-site.xml");
conf.addResource("mount-table.xml");
fs = FileSystem.get(conf);
} catch (Exception ex) {
ex.printStackTrace();
}
} public void stop() {
try {
fs.close();
} catch(Exception e) { }
} public boolean exists(String path) {
boolean isExists = true;
try {
Path hdfsPath = new Path(path);
isExists = fs.exists(hdfsPath);
} catch (Exception e) {
logger.error("[HDFS]判断文件是否存在失败", e);
}
return isExists;
} public String getModificationTime(String path) throws IOException {
String modifyTime = null;
try {
Path hdfsPath = new Path(path);
FileStatus fileStatus = fs.getFileStatus(hdfsPath);
long modifyTimestamp = fileStatus.getModificationTime();
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
Date date = new Date(modifyTimestamp);
modifyTime = simpleDateFormat.format(date);
} catch(Exception e) {
logger.error("[HDFS]获取最近修改时间失败", e);
}
return modifyTime;
} public long getPathSize(String path) throws IOException {
long size = -1L;
try {
Path hdfsPath = new Path(path);
size = fs.getContentSummary(hdfsPath).getLength();
} catch (Exception e) {
logger.error("[HDFS]获取路径大小失败", e);
}
return size; } }

config中也不需要传任何参数了

package com.xiaoju.dqa.jazz.hadoop.configuration;

import com.xiaoju.dqa.jazz.hadoop.client.HadoopClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; @Configuration
public class HadoopConfig { @Bean(initMethod = "init", destroyMethod = "stop")
public HadoopClient hadoopClient() {
HadoopClient hadoopClient = new HadoopClient();
return hadoopClient;
}
}
上一篇:JavaScript实现级联下拉框


下一篇:%1 不是有效的 Win32 应用程序