Using the HDFS API

Prerequisite: please refer to the article linked here first, then read this content.

1. HDFS file upload, download, delete, rename, viewing file details, and distinguishing files from directories

package com.lxz.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.junit.Test;


import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class HdfsClient {

    @Test
    public void testMkdirs() throws IOException,InterruptedException, URISyntaxException{
        // 1. Get the file system
        Configuration configuration = new Configuration();
        // 2. Configure the client to run against the cluster
        configuration.set("fs.defaultFS", "hdfs://hadoop1:9000");
//        FileSystem fs = FileSystem.get(configuration);
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), configuration, "root");
        // 3. Create the directory
        fs.mkdirs(new Path("/lxz/hdfs/first/tt/oo"));
        // 4. Close the resource
        fs.close();
    }

    // HDFS file upload
    @Test
    public void testCopyFromLocalFile() throws IOException,InterruptedException,URISyntaxException{
        // 1. Get the file system
        Configuration configuration = new Configuration();
        configuration.set("dfs.replication", "2");
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), configuration, "root");
        // 2. Upload the file
        fs.copyFromLocalFile(new Path("D:/input/friends/friends.txt"), new Path("/lxz/friends/friends.txt"));
        // 3. Close the resource
        fs.close();
    }

    // HDFS file download
    @Test
    public void testCopyToLocalFile() throws IOException,InterruptedException,URISyntaxException{
        // 1. Get the file system
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), configuration, "root");
        // 2. Perform the download
//        boolean delSrc                 // whether to delete the source file
//        Path src                       // path of the file to download
//        Path dst                       // local path to download the file to
//        boolean useRawLocalFileSystem  // whether to use the raw local file system (true skips writing the .crc checksum file)
        fs.copyToLocalFile(false, new Path("/lxz/friends/friends.txt"), new Path("D:/input"), true);
        // 3. Close the resource
        fs.close();
    }

    // HDFS file deletion
    @Test
    public void testDelete() throws IOException,InterruptedException,URISyntaxException{
        // 1. Get the file system
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), configuration, "root");
        // 2. Perform the delete (true = recursive)
        fs.delete(new Path("/input/friends/friends.txt"), true);
        // 3. Close the resource
        fs.close();
    }

    // HDFS file rename
    @Test
    public void testRename() throws IOException,InterruptedException,URISyntaxException{
        // 1. Get the file system
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), configuration, "root");
        // 2. Rename the file
        fs.rename(new Path("/input/friends/friends.txt"), new Path("/input/friends/friends_two.txt"));
        // 3. Close the resource
        fs.close();
    }

    // View HDFS file details
    @Test
    public void testListFiles() throws IOException,InterruptedException,URISyntaxException{
        // 1. Get the file system
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), configuration, "root");
        // 2. Get file details (recursively, starting from the root)
        RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);

        while (listFiles.hasNext()) {
            LocatedFileStatus status = listFiles.next();
            // Print details
            // File name
            System.out.println(status.getPath().getName());
            // Length
            System.out.println(status.getLen());
            // Permissions
            System.out.println(status.getPermission());
            // Group
            System.out.println(status.getGroup());

            // Get the block location information
            BlockLocation[] blockLocations = status.getBlockLocations();
            for (BlockLocation blockLocation : blockLocations) {
                // Hosts that store this block
                String[] hosts = blockLocation.getHosts();
                for (String host : hosts) {
                    System.out.println(host);
                }
            }
            System.out.println("-----success marker-------");
        }
        // 3. Close the resource (after the loop, otherwise the iterator would fail on the next call)
        fs.close();
    }

    // Distinguishing HDFS files from directories
    @Test
    public void testListStatus() throws IOException,InterruptedException,URISyntaxException{
        // 1. Get the file system
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), configuration, "root");
        // 2. Check whether each entry is a file or a directory
        FileStatus[] listStatus = fs.listStatus(new Path("/"));
        for (FileStatus fileStatus : listStatus) {
            // If it is a file
            if (fileStatus.isFile()) {
                System.out.println("f:" + fileStatus.getPath().getName());
            } else {
                System.out.println("d:" + fileStatus.getPath().getName());
            }
        }
        // 3. Close the resource
        fs.close();
    }

}
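
All of the tests above close the FileSystem by hand, which is easy to forget if an assertion or an exception fires first. Below is a minimal sketch of the same upload written with Java 7 try-with-resources; the hdfs://hadoop1:9000 address, the "root" user, and the file paths are the same assumptions as in the code above, and the class name HdfsClientTryWithResources is only for illustration.

package com.lxz.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;

import java.net.URI;

public class HdfsClientTryWithResources {

    @Test
    public void testUpload() throws Exception {
        Configuration configuration = new Configuration();
        // FileSystem implements Closeable, so try-with-resources closes it
        // automatically even if the copy throws an exception
        try (FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), configuration, "root")) {
            fs.copyFromLocalFile(new Path("D:/input/friends/friends.txt"),
                    new Path("/lxz/friends/friends.txt"));
        }
    }
}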

 

2. HDFS I/O stream operations

HDFS file upload, download, and block-by-block download of a large file

package com.lxz.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;


import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

// Working with the HDFS API through I/O streams

public class HdfsIoClient {

    // HDFS file upload
    @Test
    public void putFileToHDFS() throws IOException,InterruptedException, URISyntaxException{
        // 1. Get the file system
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), configuration, "root");
        // 2. Create the input stream from the local file
        FileInputStream fileInputStream = new FileInputStream(new File("d:/friends/friends.txt"));
        // 3. Get the output stream to HDFS
        FSDataOutputStream fsDataOutputStream = fs.create(new Path("/input/friends/friends.txt"));
        // 4. Copy the streams
        IOUtils.copyBytes(fileInputStream, fsDataOutputStream, configuration);
        // 5. Close the resources
        IOUtils.closeStream(fileInputStream);
        IOUtils.closeStream(fsDataOutputStream);
        fs.close();
    }

    // HDFS file download
    // Requirement: download friends.txt from HDFS to the local directory D:/input/friends
    @Test
    public void getFileFromHDFS() throws IOException,InterruptedException,URISyntaxException{
        // 1. Get the file system
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), configuration, "root");
        // 2. Get the input stream from HDFS
        FSDataInputStream fis = fs.open(new Path("/lxz/input/friends/friends.txt"));
        // 3. Get the output stream to the local file (FileOutputStream takes a File, not a Path)
        FileOutputStream fos = new FileOutputStream(new File("D:/input/friends/friends.txt"));
        // 4. Copy the streams
        IOUtils.copyBytes(fis, fos, configuration);
        // 5. Close the resources
        IOUtils.closeStream(fis);
        IOUtils.closeStream(fos);
        fs.close();
    }

    // Download a large HDFS file block by block
    // Requirement: read a large file on HDFS in pieces, e.g. hadoop-2.7.2.tar.gz in the root directory

    // 1. Download the first block
    @Test
    public void readFileSeek1() throws IOException,InterruptedException,URISyntaxException{
        // 1. Get the file system
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), configuration, "root");
        // 2. Get the input stream (the file sits in the HDFS root directory)
        FSDataInputStream fis = fs.open(new Path("/hadoop-2.7.2.tar.gz"));
        // 3. Get the output stream
        FileOutputStream fos = new FileOutputStream(new File("D:/input/friends/hadoop-2.7.2.tar.gz.part1"));
        // 4. Copy only the first 128 MB (1024 * 128 reads of a 1 KB buffer = one default HDFS block)
        byte[] buf = new byte[1024];
        for (int i = 0; i < 1024 * 128; i++) {
            int len = fis.read(buf);
            fos.write(buf, 0, len);
        }
        // 5. Close the resources
        IOUtils.closeStream(fis);
        IOUtils.closeStream(fos);
        fs.close();
    }
    // 2. Download the second block
    @Test
    public void readFileSeek2() throws IOException,InterruptedException,URISyntaxException{
        // 1. Get the file system
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), configuration, "root");
        // 2. Open the input stream
        FSDataInputStream fis = fs.open(new Path("/hadoop-2.7.2.tar.gz"));
        // 3. Seek to the start of the second block (128 MB)
        fis.seek(1024 * 1024 * 128);
        // 4. Create the output stream
        FileOutputStream fos = new FileOutputStream(new File("D:/input/friends/hadoop-2.7.2.tar.gz.part2"));
        // 5. Copy the remaining bytes
        IOUtils.copyBytes(fis, fos, configuration);
        // 6. Close the resources
        IOUtils.closeStream(fis);
        IOUtils.closeStream(fos);
        fs.close();

//        Merging the files:
//        In a Windows command prompt, cd into the output directory and run the following command to append part2 to part1:
//        type hadoop-2.7.2.tar.gz.part2 >> hadoop-2.7.2.tar.gz.part1
//        After merging, rename hadoop-2.7.2.tar.gz.part1 back to hadoop-2.7.2.tar.gz. Extracting it shows the tar file is complete.
//        (A pure-Java alternative is sketched below, after this class.)
    }
}
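
If you would rather merge the two part files without the Windows type command, here is a minimal pure-Java sketch that appends part2 to part1. The part file names and the D:/input/friends directory reuse the paths from the example above, and the class name MergeParts is an assumption for illustration only.

package com.lxz.hdfs;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

// Appends hadoop-2.7.2.tar.gz.part2 to hadoop-2.7.2.tar.gz.part1 (paths assumed from the example above)
public class MergeParts {

    public static void main(String[] args) throws IOException {
        try (FileInputStream part2 = new FileInputStream("D:/input/friends/hadoop-2.7.2.tar.gz.part2");
             // open part1 in append mode so its existing content is kept
             FileOutputStream part1 = new FileOutputStream("D:/input/friends/hadoop-2.7.2.tar.gz.part1", true)) {
            byte[] buf = new byte[8192];
            int len;
            while ((len = part2.read(buf)) != -1) {
                part1.write(buf, 0, len);
            }
        }
        // After this runs, rename hadoop-2.7.2.tar.gz.part1 to hadoop-2.7.2.tar.gz and extract it
    }
}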

 
