使用 Java API 对 HDFS 文件进行增删改查

源代码:

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build for the HDFS Java API demo: compiles for Java 8, pulls in the Hadoop
  client libraries plus JUnit, and builds a shaded jar during the package phase.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.idcast</groupId>
    <artifactId>hdfs_api_demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <!-- Single place to change the version shared by every Hadoop artifact below -->
        <hadoop.version>3.1.4</hadoop.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <!--
              Pinned to a fixed version: the RELEASE meta-version resolves to whatever is
              newest at build time, which makes builds non-reproducible.
              NOTE(review): add <scope>test</scope> if the JUnit classes live under
              src/test/java; confirm the source layout first.
            -->
            <version>4.13.2</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- Java compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>

            <!-- Builds a shaded (dependencies included) jar during the package phase -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <!--
                              NOTE(review): minimizeJar drops classes that are not statically
                              referenced; classes that are only loaded reflectively may be
                              removed. Verify the shaded jar still runs against the cluster.
                            -->
                            <minimizeJar>true</minimizeJar>
                        </configuration>
                    </execution>

                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Java 代码(HdfsApiDemo.java):

package cn.idcast.hdfs_api;

import com.jcraft.jsch.IO;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.kerby.util.IOUtil;
import org.apache.log4j.BasicConfigurator;
import org.junit.Test;

import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

/**
 * JUnit-driven examples of basic HDFS operations through the Hadoop {@code FileSystem} API:
 * obtaining a {@code FileSystem}, listing files, creating directories and files,
 * downloading, uploading, and merging small local files into one HDFS file.
 *
 * <p>All examples address the NameNode at {@code hdfs://node1:8020}; local paths are on
 * the {@code D:} drive. Every {@code FileSystem} and stream is opened in a
 * try-with-resources statement so it is released even when an operation throws.
 */
public class HdfsApiDemo {
    /** NameNode address used by every example. */
    private static final String HDFS_URI = "hdfs://node1:8020";

    /** User name passed to {@code FileSystem.get} by the examples that read or write data. */
    private static final String HDFS_USER = "root";

    /**
     * Obtains a FileSystem, variant 1: {@code FileSystem.get(Configuration)} with
     * {@code fs.defaultFS} set on the configuration.
     *
     * @throws IOException if the FileSystem cannot be created or closed
     */
    @Test
    public void getFileSystem1() throws IOException {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", HDFS_URI);
        try (FileSystem fileSystem = FileSystem.get(configuration)) {
            System.out.println(fileSystem.toString());
        }
    }

    /**
     * Obtains a FileSystem, variant 2: {@code FileSystem.get(URI, Configuration)}.
     *
     * @throws IOException        if the FileSystem cannot be created or closed
     * @throws URISyntaxException if the NameNode address is not a valid URI
     */
    @Test
    public void getFileSystem2() throws IOException, URISyntaxException {
        try (FileSystem fileSystem = FileSystem.get(new URI(HDFS_URI), new Configuration())) {
            System.out.println(fileSystem);
        }
    }

    /**
     * Obtains a FileSystem, variant 3: {@code FileSystem.newInstance(Configuration)} with
     * {@code fs.defaultFS} set on the configuration.
     *
     * @throws IOException if the FileSystem cannot be created or closed
     */
    @Test
    public void getFileSystem3() throws IOException {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", HDFS_URI);
        try (FileSystem fileSystem = FileSystem.newInstance(configuration)) {
            System.out.println(fileSystem.toString());
        }
    }

    /**
     * Obtains a FileSystem, variant 4: {@code FileSystem.newInstance(URI, Configuration)}.
     *
     * @throws IOException        if the FileSystem cannot be created or closed
     * @throws URISyntaxException if the NameNode address is not a valid URI
     */
    @Test
    public void getFileSystem4() throws IOException, URISyntaxException {
        try (FileSystem fileSystem = FileSystem.newInstance(new URI(HDFS_URI), new Configuration())) {
            System.out.println(fileSystem.toString());
        }
    }

    /**
     * Prints the path of every file found by a recursive listing of {@code /}.
     *
     * @throws IOException          if listing fails
     * @throws URISyntaxException   if the NameNode address is not a valid URI
     * @throws InterruptedException if interrupted while obtaining the FileSystem
     */
    @Test
    public void listMyFiles() throws IOException, URISyntaxException, InterruptedException {
        try (FileSystem fileSystem = FileSystem.get(new URI(HDFS_URI), new Configuration(), HDFS_USER)) {
            // "true" requests a recursive listing
            RemoteIterator<LocatedFileStatus> files = fileSystem.listFiles(new Path("/"), true);
            while (files.hasNext()) {
                LocatedFileStatus file = files.next();
                System.out.println(file.getPath().toString());
            }
        }
    }

    /**
     * Creates the directory {@code /hello/mydir/test} and prints the result of
     * {@code FileSystem.mkdirs}.
     *
     * @throws IOException          if the directory cannot be created
     * @throws URISyntaxException   if the NameNode address is not a valid URI
     * @throws InterruptedException if interrupted while obtaining the FileSystem
     */
    @Test
    public void mkdirs() throws IOException, URISyntaxException, InterruptedException {
        try (FileSystem fileSystem = FileSystem.get(new URI(HDFS_URI), new Configuration(), HDFS_USER)) {
            boolean created = fileSystem.mkdirs(new Path("/hello/mydir/test"));
            System.out.println(created);
        }
    }

    /**
     * Creates the file {@code /hello/mydir/test/a.txt} without writing any data to it.
     * Despite the method name, this creates a file, not a directory.
     *
     * @throws IOException          if the file cannot be created
     * @throws URISyntaxException   if the NameNode address is not a valid URI
     * @throws InterruptedException if interrupted while obtaining the FileSystem
     */
    @Test
    public void mkdirsTest() throws IOException, URISyntaxException, InterruptedException {
        try (FileSystem fileSystem = FileSystem.get(new URI(HDFS_URI), new Configuration(), HDFS_USER)) {
            // create() returns an open output stream; close it right away so it is not leaked
            fileSystem.create(new Path("/hello/mydir/test/a.txt")).close();
        }
    }

    /**
     * Downloads {@code /hello/mydir/test/a.txt} to {@code D://a.txt} by copying between
     * an HDFS input stream and a local output stream.
     *
     * @throws URISyntaxException   if the NameNode address is not a valid URI
     * @throws IOException          if reading, writing, or closing a stream fails
     * @throws InterruptedException if interrupted while obtaining the FileSystem
     */
    @Test
    public void downloadFile() throws URISyntaxException, IOException, InterruptedException {
        // Resources are closed in reverse order; a failure while closing the local file is
        // reported instead of being swallowed, so an incomplete download is not silent.
        try (FileSystem fileSystem = FileSystem.get(new URI(HDFS_URI), new Configuration(), HDFS_USER);
             FSDataInputStream inputStream = fileSystem.open(new Path("/hello/mydir/test/a.txt"));
             FileOutputStream outputStream = new FileOutputStream("D://a.txt")) {
            IOUtils.copy(inputStream, outputStream);
        }
    }

    /**
     * Downloads {@code /hello/mydir/test/a.txt} to {@code D://a.txt} with
     * {@code FileSystem.copyToLocalFile}.
     *
     * @throws URISyntaxException   if the NameNode address is not a valid URI
     * @throws IOException          if the copy fails
     * @throws InterruptedException if interrupted while obtaining the FileSystem
     */
    @Test
    public void downloadFile2() throws URISyntaxException, IOException, InterruptedException {
        try (FileSystem fileSystem = FileSystem.get(new URI(HDFS_URI), new Configuration(), HDFS_USER)) {
            fileSystem.copyToLocalFile(new Path("/hello/mydir/test/a.txt"), new Path("D://a.txt"));
        }
    }

    /**
     * Uploads the local file {@code D://hdfs-site.txt} to the HDFS root directory.
     *
     * @throws URISyntaxException   if the NameNode address is not a valid URI
     * @throws IOException          if the copy fails
     * @throws InterruptedException if interrupted while obtaining the FileSystem
     */
    @Test
    public void uploadFile() throws URISyntaxException, IOException, InterruptedException {
        try (FileSystem fileSystem = FileSystem.get(new URI(HDFS_URI), new Configuration(), HDFS_USER)) {
            fileSystem.copyFromLocalFile(new Path("D://hdfs-site.txt"), new Path("/"));
        }
    }

    /**
     * Merges the files directly under the local directory {@code D://input} into the single
     * HDFS file {@code /big_txt.txt}. Files are appended in the order returned by
     * {@code listStatus}; sub-directories are skipped.
     *
     * @throws URISyntaxException   if the NameNode address is not a valid URI
     * @throws IOException          if listing, reading, writing, or closing fails
     * @throws InterruptedException if interrupted while obtaining the FileSystem
     */
    @Test
    public void mergeFile() throws URISyntaxException, IOException, InterruptedException {
        try (FileSystem fileSystem = FileSystem.get(new URI(HDFS_URI), new Configuration(), HDFS_USER);
             LocalFileSystem localFileSystem = FileSystem.getLocal(new Configuration())) {
            // List the inputs before creating the target, so a missing input directory
            // does not leave an empty /big_txt.txt behind.
            FileStatus[] fileStatuses = localFileSystem.listStatus(new Path("D://input"));
            try (FSDataOutputStream outputStream = fileSystem.create(new Path("/big_txt.txt"))) {
                for (FileStatus fileStatus : fileStatuses) {
                    if (!fileStatus.isFile()) {
                        // A directory cannot be opened as a stream
                        continue;
                    }
                    try (FSDataInputStream inputStream = localFileSystem.open(fileStatus.getPath())) {
                        IOUtils.copy(inputStream, outputStream);
                    }
                }
            }
        }
    }
}
上一篇:Streams 过滤器


下一篇:Oracle Linux 8.0发布