Java代码操作HDFS

package com.hy.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException; public class HDFSCommand { public static final Logger log = LoggerFactory.getLogger(HDFSCommand.class); public static void main(String[] args) throws Exception {
String hdfsURI = "hdfs://10.1.23.240:9000";
String srcPath = "D:" + File.separator + "readme.txt";
String descPath = "/xhy";
String data = "haohaohaohaohao\r\n善字\r\n善生\r\n善行\r\n守善\r\n愿善";
Configuration conf = new Configuration();
copyFromLocalFile(hdfsURI, srcPath, descPath, conf);
uploadFile(hdfsURI, data, descPath, conf);
RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = listFile(hdfsURI, descPath, conf, true);
while (locatedFileStatusRemoteIterator.hasNext()) {
LocatedFileStatus next = locatedFileStatusRemoteIterator.next();
System.out.println("listFile:" + next.toString());
}
FileStatus[] fileStatuses = listFileAndFolder(hdfsURI, descPath, conf);
for (FileStatus f : fileStatuses) {
System.out.println("listFileAndFolder:" + f.toString());
} } /**
* 本地指定路径文件上传到hdfs
*
* @param hdfsURI
* @param srcPath
* @param descPath
* @param conf
*/
public static void copyFromLocalFile(String hdfsURI, String srcPath, String descPath, Configuration conf) throws URISyntaxException, IOException {
log.info(">> copyFromLocalFile, srcPath is {}, descPath is {}", srcPath, descPath);
FileSystem fs = FileSystem.get(new URI(hdfsURI), conf);
fs.copyFromLocalFile(new Path(srcPath), new Path(descPath));
log.info("<< copyFromLocalFile success");
fs.close();
/*
* 底层是通过
* fs.open(new Path(srcPath), 4096);
* fs.create(new Path(descPath));
* IOUtils.copyBytes(in, out, conf, true);
*/
} /**
* 将数据写入到hdfs
*
* @param hdfsURI
* @param data
* @param descPath
* @param conf
*/
public static void uploadFile(String hdfsURI, String data, String descPath, Configuration conf) throws Exception {
log.info(">> uploadFile, descPath is {}, data is {}", descPath, data);
FileSystem fs = FileSystem.get(new URI(hdfsURI), conf);
/*FSDataOutputStream fsOutputStream = fs.create(new Path(descPath), new Progressable() {
@Override
public void progress() {
log.info("<< 写入hdfs成功,文件路径为:{}", descPath);
}
});*/
FSDataOutputStream fsOutputStream = fs.create(new Path(descPath),
() -> log.info("<< 写入hdfs成功,文件路径为:{}", descPath));
fsOutputStream.write(data.getBytes(), 0, data.getBytes().length);
/*
* 以下几种方式会出现中文乱码
* fsOutputStream.writeBytes(data);
* fsOutputStream.writeUTF(data);
* fsOutputStream.writeChars(data);
*/
fsOutputStream.close();
fs.close();
} /**
* 查找hdfs指定路径下的文件
*
* @param hdfsURI
* @param path
* @param conf
* @param recursive 是否递归查找
* @throws Exception
*/
public static RemoteIterator<LocatedFileStatus> listFile(String hdfsURI, String path, Configuration conf, boolean recursive) throws Exception {
log.info(">> listFile, path is {}, recursive is {}", path, recursive);
FileSystem fs = FileSystem.get(new URI(hdfsURI), conf);
RemoteIterator<LocatedFileStatus> result = fs.listFiles(new Path(path), recursive);
log.info("<< listFile, result is {}", result);
return result;
} /**
* 查找hdfs指定路径下的文件和文件夹
*
* @param hdfsURI
* @param path
* @param conf
*/
public static FileStatus[] listFileAndFolder(String hdfsURI, String path, Configuration conf) throws Exception {
log.info(">> listFileAndFolder, path is {}", path);
FileSystem fs = FileSystem.get(new URI(hdfsURI), conf);
FileStatus[] result = fs.listStatus(new Path(path));
log.info("<< listFileAndFolder, result is {}", result.toString());
return result;
// 方法二
} /**
* 创建文件夹
*
* @param hdfsURI
* @param path
* @param conf
* @throws Exception
*/
public static void mkDir(String hdfsURI, String path, Configuration conf) throws Exception {
log.info(">> mkDir, path is {}", path);
FileSystem fs = FileSystem.get(new URI(hdfsURI), conf);
boolean result = fs.mkdirs(new Path(path));
if (result) {
log.info("<< mkDir {} success", path);
} else {
log.error("<< mkDir {} error", path);
}
} /**
* 删除指定路径
*
* @param hdfsURI
* @param path
* @param conf
* @throws IOException
*/
public static void delete(String hdfsURI, String path, Configuration conf) throws IOException {
log.info(">> delete, path is {}", path);
conf.set("fs.defaultFS", hdfsURI);
FileSystem fs = FileSystem.get(conf);
if (!fs.exists(new Path(path))) {
log.info("<< delete {} error, path no exists", path);
return;
}
boolean result = fs.delete(new Path(path), true);
if (result) {
log.info("<< delete {} success", path);
} else {
log.error("<< delete {} error", path);
}
} /**
* 从hdfs上面下载
*
* @param hdfsURI
* @param srcPath
* @param descPath
* @param conf
* @throws Exception
*/
public static void downloadFile(String hdfsURI, String srcPath, String descPath, Configuration conf) throws Exception {
log.info(">> downloadFile, srcPath is {}, descPath is {}", srcPath, descPath);
FileSystem fs = FileSystem.get(new URI(hdfsURI), conf);
FSDataInputStream in = fs.open(new Path(srcPath));
OutputStream out = new FileOutputStream(new File(descPath));
IOUtils.copyBytes(in, out, conf);
} public static void catFile(String hdfsURI, String path, Configuration conf) throws Exception {
log.info(">> catFile, path is {}", path);
FileSystem fs = FileSystem.get(URI.create(hdfsURI), conf);
FSDataInputStream in = fs.open(new Path(path));
try {
IOUtils.copyBytes(in, System.out, 4096, false);
} finally {
IOUtils.closeStream(in);
fs.close();
}
} }
上一篇:位(bit)与字节(byte)


下一篇:Redis .Net 基本类型使用之南