HDFS的增删查改

public class HDFS {
    //全局静态变量:hdfs的地址和端口号
    private static String HDFSUri = "hdfs://10.8.177.29:8020";
    
    /**
     * 获取文件系统
     *
     * @return FileSystem 文件系统
     */
    public static FileSystem getFileSystem() {
        //读取配置文件
        Configuration conf = new Configuration();
        // 文件系统
        FileSystem fs = null;
        String hdfsUri = HDFSUri;
        if(StringUtils.isBlank(hdfsUri)){
            // 返回默认文件系统  如果在 Hadoop集群下运行,使用此种方法可直接获取默认文件系统
            try {
                fs = FileSystem.get(conf);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }else{
            // 返回指定的文件系统,如果在本地测试,需要使用此种方法获取文件系统
            try {
                URI uri = new URI(hdfsUri.trim());
                fs = FileSystem.get(uri,conf);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return fs;
    }
    
    
    /**
     * 创建文件目录
     *
     * @param path 文件路径
     */
    public static void mkdir(String path) {
        try {
            FileSystem fs = getFileSystem();
            System.out.println("FilePath="+path);
            // 创建目录
            fs.mkdirs(new Path(path));
            //释放资源
            fs.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    
    /**
     * 判断目录是否存在
     *
     * @param filePath 目录路径
     * @param create 若不存在是否创建
     */
    public static  boolean existDir(String filePath, boolean create){
        boolean flag = false;

        if (StringUtils.isEmpty(filePath)){
            return flag;
        }

        try{
            Path path = new Path(filePath);
            // FileSystem对象
            FileSystem fs = getFileSystem();

            if (create){
                if (!fs.exists(path)){
                    fs.mkdirs(path);
                }
            }

            if (fs.isDirectory(path)){
                flag = true;
            }
        }catch (Exception e){
            e.printStackTrace();
        }

        return flag;
    }
    
    
    /**
     * 本地文件上传至 HDFS
     *
     * @param srcFile 源文件 路径
     * @param destPath hdfs路径
     */
    public static void copyFileToHDFS(String srcFile,String destPath)throws Exception{


        FileInputStream fis=new FileInputStream(new File(srcFile));//读取本地文件
        Configuration config=new Configuration();
        FileSystem fs=FileSystem.get(URI.create(HDFSUri+destPath), config);
        OutputStream os=fs.create(new Path(destPath));
        //copy
        IOUtils.copyBytes(fis, os, 4096, true);
        System.out.println("拷贝完成...");
        fs.close();
    }
    
    
    /**
     * 从 HDFS 下载文件到本地
     *
     * @param srcFile HDFS文件路径
     * @param destPath 本地路径
     */
    public static void getFile(String srcFile,String destPath)throws Exception {
        //hdfs文件 地址
        String file=HDFSUri+srcFile;
        Configuration config=new Configuration();
        //构建FileSystem
        FileSystem fs = FileSystem.get(URI.create(file),config);
        //读取文件
        InputStream is=fs.open(new Path(file));
        IOUtils.copyBytes(is, new FileOutputStream(new File(destPath)),2048, true);
        //保存到本地  最后 关闭输入输出流
        System.out.println("下载完成...");
        fs.close();
    }
    
    
    /**
     * 删除文件或者文件目录
     *
     * @param path
     */
    public static void rmdir(String path) {
        try {
            // 返回FileSystem对象
            FileSystem fs = getFileSystem();

            String hdfsUri = HDFSUri;
            if(StringUtils.isNotBlank(hdfsUri)){
                path = hdfsUri + path;
            }
            System.out.println("path:"+path);
            // 删除文件或者文件目录  delete(Path f) 此方法已经弃用
            System.out.println( fs.delete(new Path(path),true));

            // 释放资源
            fs.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    
    /**
     * 读取文件的内容
     * @param filePath
     * @throws IOException
     */
    public static void readFile(String filePath) throws IOException{
        Configuration config = new Configuration();
        String file=HDFSUri+filePath;
        FileSystem fs = FileSystem.get(URI.create(file),config);
        //读取文件
        InputStream is=fs.open(new Path(file));
        //读取文件
        IOUtils.copyBytes(is, System.out, 2048, false); //复制到标准输出流
        fs.close();
    }
    
    
    public static void main(String[] args) throws Exception {
        String HDFSFile = "/hollycrm/data01/codecs/1月.zip";
        String localFile = "D:\\1月.zip";
        //连接fs
        FileSystem fs = getFileSystem();
        System.out.println(fs.getUsed());
        //创建路径
        mkdir("/zhaojy2");
        //验证是否存在
        System.out.println(existDir("/zhaojy2",false));
        //上传文件到HDFS
        copyFileToHDFS("E:\\HDFSTest.txt","/zhaojy/HDFSTest.txt");
        //下载文件到本地
        getFile("/zhaojy/HDFSTest.txt","D:\\HDFSTest.txt");
        // getFile(HDFSFile,localFile);
        //        //删除文件
        //        rmdir("/zhaojy2");
        //读取文件
        readFile("/zhaojy/HDFSTest.txt");
    }
}

 

 

jar包直接下载一个hadoop从里面的lib里面拷

下载路径:https://www.apache.org/dyn/closer.cgi/hadoop/common/hadoop-3.1.2/hadoop-3.1.2.tar.gz

上一篇:Linux文件系统概述二


下一篇:hadoop过程中遇到的错误与解决方法