package com.da.hbase.tool.utils;
import com.da.hbase.tool.common.Const;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* hdfs操作常用方法类
*/
public class HdfsUtils {
public static final Logger LOG= LoggerFactory.getLogger(HdfsUtils.class);
/**
* 通过ip直接连接hdfs
* @param ip
* @return
*/
public static FileSystem getFsFromIp(String ip){
FileSystem fs = null;
try {
fs=FileSystem.get(URI.create("hdfs://"+ip),new Configuration());
} catch (IOException e) {
LOG.error("此ip:{} 连接出现异常", ip);
}
return fs;
}
/**
* 检查该fs是否可用
* @param fs
* @return
*/
public static Boolean checkFs(FileSystem fs){
Boolean success=true;
if(null==fs){
return false;
}
Path path=new Path("/");
try {
RemoteIterator<FileStatus> remoteIterator= fs.listStatusIterator(path);
success=true;
} catch (IOException e) {
success=false;
}
return success;
}
/**
* 从ips中获取一个可用的fs
* @param ips
* @return
*/
public static FileSystem getAndCheckFs(String ips){
return getAndCheckFs(ips,",");
}
/**
* 从ips中获取一个可用的fs
* @param ips
* @param separator
* @return
*/
public static FileSystem getAndCheckFs(String ips,String separator){
String [] ipArr=ips.split(separator);
FileSystem fs=null;
for (String ip : ipArr) {
fs=getFsFromIp(ip);
if(checkFs(fs)){
LOG.info("此Ip:{}可连接hdfs",ip);
break;
}else{
fs=null;
}
}
if(null==fs){
LOG.error("无法连接hdfs环境,请检查网络是否可用或者ip配置是否正确,配置ips:{}",ips);
}
return fs;
}
/**
* 测试getAndCheckFs方法
*/
private static void testConnectFs(){
String ips="10.17.139.126,10.17.139.127,10.17.139.125";
FileSystem fs=getAndCheckFs(ips);
String path1="/hbase/data/default/";
Path path=new Path(path1);
try {
RemoteIterator<FileStatus> remoteIterator= fs.listStatusIterator(path);
while(remoteIterator.hasNext()){
System.out.println(remoteIterator.next().getPath());
}
} catch (IOException e) {
}
}
/**
* 查看当前路径是否存在
* @param fs
* @param path
* @return
*/
public static Boolean checkPathExist(FileSystem fs,String path){
Boolean isExist=true;
try {
isExist=fs.exists(new Path(path));
} catch (IOException e) {
isExist=false;
e.printStackTrace();
}
return isExist;
}
/**
* 递归遍历找到所有目录和文件存储在map中,文件,key:路径,value:FILE ;目录,key:路径,value:DIR
* @param fs
* @param src
*/
public static void recureScanDir(FileSystem fs,Path src, Map<Path,String> map){
try{
if(fs.isFile(src)) {
map.put(src, Const.FILE_STATUS);
}else{
map.remove(src);
RemoteIterator<FileStatus> remoteIterator= fs.listStatusIterator(src);
if(!remoteIterator.hasNext()){
map.put(src, Const.DIR_STATUS);
}else {
while (remoteIterator.hasNext()){
recureScanDir(fs,remoteIterator.next().getPath(),map);
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 目录从本地拷贝到hdfs上
* @param fs
* @param src
* @param dst
* @return
*/
public static Boolean copyFromLocal(FileSystem fs,Path src,Path dst){
Boolean success=true;
try {
if(fs.exists(dst)){
fs.delete(dst,true);
}
fs.copyFromLocalFile(false,true,src,dst);
success=true;
} catch (IOException e) {
success=false;
LOG.error("文件从本地拷贝到hdfs上,出现Io异常,导致拷贝文件失败,src:{},dst:{}",src,dst);
e.printStackTrace();
}
return success;
}
/**
*目录从hdfs上拷贝到本地
* @param fs
* @param src
* @param dst
* @return
*/
public static Boolean copyToLocal(FileSystem fs,Path src,Path dst){
Boolean success=true;
try {
if(new File(dst.toString()).exists()){
Utils.deletNotEmptyDir(new File(dst.toString()));
}
fs.copyToLocalFile(false, src, dst, true);
success=true;
} catch (IOException e) {
success=false;
LOG.error("文件从hdfs拷贝到本地,出现Io异常,导致拷贝文件失败");
e.printStackTrace();
}
return success;
}
private static void testCopyFileToLocal(){
String ips="10.17.139.126,10.17.139.127,10.17.139.125";
FileSystem fs=getAndCheckFs(ips);
String path1="/hbase/data/default/";
Path path=new Path(path1);
try {
RemoteIterator<FileStatus> remoteIterator= fs.listStatusIterator(path);
while(remoteIterator.hasNext()){
System.out.println(remoteIterator.next().getPath());
}
} catch (IOException e) {
LOG.error(e.getMessage());
}
}
/**
* 获取目录path下所有的文件名
* @param fs
* @param path
* @return
*/
public static List<String> scanDir(FileSystem fs,Path path){
List<String> list=new ArrayList<>();
try {
RemoteIterator<FileStatus> remoteIterator= fs.listStatusIterator(path);
while(remoteIterator.hasNext()){
list.add(remoteIterator.next().getPath().getName());
}
} catch (IOException e) {
LOG.error(e.getMessage());
}
return list;
}
public static void main(String[] args) {
//testConnectFs();
testCopyFileToLocal();
}
}