// Sample (样例)
package pers.aishuang.flink.streaming.task;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.nio.charset.StandardCharsets;
/**
 * Merges every file found recursively under HDFS {@code /test/} into a single
 * HDFS output file {@code /fileSystemData02/}, copying line by line as UTF-8.
 *
 * <p>Connects to the NameNode at {@code hdfs://node01:8020} as user "root".
 */
public class FileSystemTest {
    public static void main(String[] args) throws Exception {
        // Explicit URI + user overrides any fs.defaultFS from the classpath config.
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration(), "root");
        try {
            // Recursively (true) enumerate all regular files under /test/.
            RemoteIterator<LocatedFileStatus> itr = fileSystem.listFiles(new Path("/test/"), true);
            Path outPath = new Path("/fileSystemData02/");
            // One writer for the whole run. The original re-wrapped the same output
            // stream in a new BufferedWriter on every iteration and never closed it;
            // try-with-resources guarantees a final flush + close of the chain.
            try (BufferedWriter writer = new BufferedWriter(
                    new OutputStreamWriter(fileSystem.create(outPath), StandardCharsets.UTF_8))) {
                while (itr.hasNext()) {
                    LocatedFileStatus next = itr.next();
                    // Close each input stream even if reading throws mid-file.
                    try (BufferedReader reader = new BufferedReader(
                            new InputStreamReader(fileSystem.open(next.getPath()), StandardCharsets.UTF_8))) {
                        String line;
                        while ((line = reader.readLine()) != null) {
                            writer.write(line);
                            writer.newLine();
                            // no per-line flush: let the buffer do its job
                        }
                    }
                }
            }
        } finally {
            // The original leaked the FileSystem handle.
            fileSystem.close();
        }
    }
}
// Result (结果):