实现对HDFS增删改查CRUD等操作
1 查找
列出某个目录下的文件名称,hdfs命令如下所示:
hdfs dfs -ls /usr/app
java代码片段:
public void list(String srcPath) { Configuration conf = new Configuration(); LOG.info("[Defaultfs] :" +conf.get("fs.default.name")); // conf.set("hadoop.job.ugi","app,app"); //It is not necessary for the default user. FileSystem fs; try { fs= FileSystem.get(conf); RemoteIterator<LocatedFileStatus>rmIterator = fs.listLocatedStatus(new Path(srcPath)); while (rmIterator.hasNext()) { Path path = rmIterator.next().getPath(); if(fs.isDirectory(path)){ LOG.info("-----------DirectoryName: "+path.getName()); } else if(fs.isFile(path)){ LOG.info("-----------FileName: "+path.getName()); } } }catch (IOException e) { LOG.error("list fileSysetm object stream.:" , e); new RuntimeException(e); } }
输出结果:
2014-03-11 22:38:15,329 INFO (com.hdfs.client.SyncDFS:48) ------------File Name: README.txt
2014-03-11 22:38:15,331 INFO (com.hdfs.client.SyncDFS:45) ------------Directory Name: blog_blogpost
2014-03-11 22:38:15,333 INFO (com.hdfs.client.SyncDFS:45) ------------Directory Name: test
读取文件中的内容,hdfs命令如下:
hdfs dfs -cat /input
java 代码:
public void readFile(String file){ Configurationconf = new Configuration(); FileSystemfs; try { fs= FileSystem.get(conf); Pathpath = new Path(file); if(!fs.exists(path)){ LOG.warn("file‘"+ file+"‘ doesn‘t exist!"); return ; } FSDataInputStreamin = fs.open(path); Stringfilename = file.substring(file.lastIndexOf(‘/‘) + 1, file.length()); OutputStreamout = new BufferedOutputStream(new FileOutputStream( new File(filename))); byte[] b = new byte[1024]; int numBytes = 0; while ((numBytes = in.read(b)) > 0) { out.write(b,0, numBytes); } in.close(); out.close(); fs.close(); }catch (IOException e) { LOG.error("ifExists fs Exception caught! :" , e); new RuntimeException(e); } }
获取文件的修改时间,java代码:
/** * Gets the information about the file modifiedtime. * @param source * @throws IOException */ public void getModificationTime(String source) throws IOException{ Configurationconf = new Configuration(); FileSystemfs = FileSystem.get(conf); PathsrcPath = new Path(source); // Check if the file alreadyexists if (!(fs.exists(srcPath))) { System.out.println("No such destination " + srcPath); return; } // Get the filename out of thefile path Stringfilename = source.substring(source.lastIndexOf(‘/‘) + 1, source.length()); FileStatusfileStatus = fs.getFileStatus(srcPath); long modificationTime =fileStatus.getModificationTime(); LOG.info("modified datetime: " + System.out.format("File %s; Modification time : %0.2f%n",filename,modificationTime)); }
获取文件块定位信息,java代码:
/** * Gets the file block location info * @param source * @throws IOException */ public void getBlockLocations(String source) throws IOException{ Configurationconf = new Configuration(); FileSystemfs = FileSystem.get(conf); PathsrcPath = new Path(source); // Check if the file alreadyexists if (!(ifExists(source))) { System.out.println("No such destination " + srcPath); return; } // Get the filename out of thefile path Stringfilename = source.substring(source.lastIndexOf(‘/‘) + 1, source.length()); FileStatusfileStatus = fs.getFileStatus(srcPath); BlockLocation[]blkLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen()); int blkCount = blkLocations.length; System.out.println("File :" + filename + "stored at:"); for (int i=0; i < blkCount; i++) { String[]hosts = blkLocations[i].getHosts(); LOG.info("host ip:" +System.out.format("Host %d: %s %n", i, hosts)); } }
获取Hadoop集群中data node的DNS主机名,java代码:
public void getHostnames () throwsIOException{ Configurationconfig = new Configuration(); FileSystemfs = FileSystem.get(config); DistributedFileSystemhdfs = (DistributedFileSystem) fs; DatanodeInfo[]dataNodeStats = hdfs.getDataNodeStats(); String[]names = new String[dataNodeStats.length]; for (int i = 0; i < dataNodeStats.length; i++) { names[i]= dataNodeStats[i].getHostName(); LOG.info("datenode hostname:"+(dataNodeStats[i].getHostName())); } }
2 创建
创建一个目录,指定具体的文件路径。hdfs命令如下:
hdfs dfs -mkdir /usr/app/tmp
java代码:
/**
 * Creates the directory {@code dir} (including missing parents) if it does not exist yet.
 * Failures are logged and not propagated; the FileSystem handle is always closed.
 *
 * @param dir HDFS path of the directory to create
 */
public void mkdir(String dir) {
    Configuration conf = new Configuration();
    FileSystem fs = null;
    try {
        fs = FileSystem.get(conf);
        Path path = new Path(dir);
        if (fs.exists(path)) {
            LOG.debug("directory ‘" + dir + "‘ exits!");
        } else if (fs.mkdirs(path)) {
            LOG.debug("create directory ‘" + dir + "‘ successfully!");
        } else {
            // mkdirs reports failure through its return value, not only by throwing.
            LOG.error("failed to create directory " + dir);
        }
    } catch (IOException e) {
        // Pass the exception to the logger so the stack trace lands in the log
        // instead of on stderr.
        LOG.error("FileSystem get configuration with anerror", e);
    } finally {
        if (fs != null) {
            try {
                fs.close();
            } catch (IOException e) {
                // Only logged: throwing from finally would mask an earlier failure.
                LOG.error("close fs object stream. :", e);
            }
        }
    }
}
将本地文件上传到hdfs上去,java代码如下:
public void copyFromLocal (String source, String dest) { Configurationconf = new Configuration(); FileSystemfs; try { fs= FileSystem.get(conf); PathsrcPath = new Path(source); PathdstPath = new Path(dest); // Check if the file alreadyexists if (!(fs.exists(dstPath))) { LOG.warn("dstPathpath doesn‘t exist" ); LOG.error("No such destination " + dstPath); return; } // Get the filename out of thefile path Stringfilename = source.substring(source.lastIndexOf(‘/‘) + 1, source.length()); try{ //if the file exists in thedestination path, it will throw exception. // fs.copyFromLocalFile(srcPath,dstPath); //remove and overwrite files withthe method //copyFromLocalFile(booleandelSrc, boolean overwrite, Path src, Path dst) fs.copyFromLocalFile(false, true, srcPath, dstPath); LOG.info("File " + filename + "copied to " + dest); }catch(Exception e){ LOG.error("copyFromLocalFile exception caught!:" , e); new RuntimeException(e); }finally{ fs.close(); } }catch (IOException e1) { LOG.error("copyFromLocal IOException objectstream. :" ,e1); new RuntimeException(e1); } }
添加一个文件到指定的目录下,java代码如下:
public void addFile(String source, String dest) { // Conf object will readthe HDFS configuration parameters Configurationconf = new Configuration(); FileSystemfs; try { fs= FileSystem.get(conf); // Get the filename out of thefile path Stringfilename = source.substring(source.lastIndexOf(‘/‘) + 1, source.length()); // Create the destination pathincluding the filename. if (dest.charAt(dest.length() - 1) != ‘/‘) { dest= dest + "/" + filename; }else { dest= dest + filename; } // Check if the file alreadyexists Pathpath = new Path(dest); if (fs.exists(path)) { LOG.error("File " + dest + " already exists"); return; } // Create a new file and writedata to it. FSDataOutputStreamout = fs.create(path); InputStreamin = new BufferedInputStream(new FileInputStream( new File(source))); byte[] b = new byte[1024]; int numBytes = 0; //In this way read and write datato destination file. while ((numBytes = in.read(b)) > 0) { out.write(b,0, numBytes); } in.close(); out.close(); fs.close(); }catch (IOException e) { LOG.error("addFile Exception caught! :" , e); new RuntimeException(e); } }
3 修改
重新命名hdfs中的文件名称,java代码如下:
public void renameFile (String fromthis, String tothis){ Configurationconf = new Configuration(); FileSystemfs; try { fs= FileSystem.get(conf); PathfromPath = new Path(fromthis); PathtoPath = new Path(tothis); if (!(fs.exists(fromPath))) { LOG.info("No such destination " + fromPath); return; } if (fs.exists(toPath)) { LOG.info("Already exists! " + toPath); return; } try{ boolean isRenamed = fs.rename(fromPath,toPath); //renames file name indeed. if(isRenamed){ LOG.info("Renamed from " + fromthis + " to " + tothis); } }catch(Exception e){ LOG.error("renameFile Exception caught! :" , e); new RuntimeException(e); }finally{ fs.close(); } }catch (IOException e1) { LOG.error("fs Exception caught! :" , e1); new RuntimeException(e1); } }
4 删除
在hdfs上,删除指定的一个文件。Java代码:
public void deleteFile(String file) { Configurationconf = new Configuration(); FileSystemfs; try { fs= FileSystem.get(conf); Pathpath = new Path(file); if (!fs.exists(path)) { LOG.info("File " + file + " does not exists"); return; } /* * recursively delete the file(s) if it is adirectory. * If you want to mark the path that will bedeleted as * a result of closing the FileSystem. * deleteOnExit(Path f) */ fs.delete(new Path(file), true); fs.close(); }catch (IOException e) { LOG.error("deleteFile Exception caught! :" , e); new RuntimeException(e); } }
Appendix 完整代码
import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; importorg.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; public class SyncDFS { private static final Log LOG = LogFactory.getLog(SyncDFS.class); /** * Reads the directory name(s) and file name(s)from the specified parameter "srcPath" * @param srcPath */ public void list(String srcPath) { Configuration conf = new Configuration(); LOG.info("[Defaultfs] :" +conf.get("fs.default.name")); // conf.set("hadoop.job.ugi","app,app"); //It is not necessary for the default user. FileSystem fs; try { fs= FileSystem.get(conf); RemoteIterator<LocatedFileStatus>rmIterator = fs.listLocatedStatus(new Path(srcPath)); while (rmIterator.hasNext()) { Path path = rmIterator.next().getPath(); if(fs.isDirectory(path)){ LOG.info("-----------DirectoryName: "+path.getName()); } else if(fs.isFile(path)){ LOG.info("-----------FileName: "+path.getName()); } } }catch (IOException e) { LOG.error("list fileSysetm object stream.:" , e); new RuntimeException(e); } } /** * Makes the specified directory if it doesn‘texist. 
* @param dir */ public void mkdir(String dir){ Configurationconf = new Configuration(); FileSystemfs = null; try { fs= FileSystem.get(conf); Pathpath = new Path(dir); if(!fs.exists(path)){ fs.mkdirs(path); LOG.debug("create directory ‘"+dir+"‘ successfully!"); }else{ LOG.debug("directory ‘"+dir+"‘ exits!"); } }catch (IOException e) { LOG.error("FileSystem get configuration with anerror"); e.printStackTrace(); }finally{ if(fs!= null){ try { fs.close(); }catch (IOException e) { LOG.error("close fs object stream. :" , e); new RuntimeException(e); } } } } /** * Reads the file content in console. * @param file */ public void readFile(String file){ Configurationconf = new Configuration(); FileSystemfs; try { fs= FileSystem.get(conf); Pathpath = new Path(file); if(!fs.exists(path)){ LOG.warn("file‘"+ file+"‘ doesn‘t exist!"); return ; } FSDataInputStreamin = fs.open(path); Stringfilename = file.substring(file.lastIndexOf(‘/‘) + 1, file.length()); OutputStreamout = new BufferedOutputStream(new FileOutputStream( new File(filename))); byte[] b = new byte[1024]; int numBytes = 0; while ((numBytes = in.read(b)) > 0) { out.write(b,0, numBytes); } in.close(); out.close(); fs.close(); }catch (IOException e) { LOG.error("ifExists fs Exception caught! :" , e); new RuntimeException(e); } } public boolean ifExists(String source){ if(source == null || source.length() ==0){ return false; } Configurationconf = new Configuration(); FileSystemfs = null; try { fs= FileSystem.get(conf); LOG.debug("judge file ‘"+source + "‘"); return fs.exists(new Path(source)); }catch (IOException e) { LOG.error("ifExists fs Exception caught! :" , e); new RuntimeException(e); return false; }finally{ if(fs != null){ try { fs.close(); }catch (IOException e) { LOG.error("fs.close Exception caught! :" , e); new RuntimeException(e); } } } } /** * Recursively copies the source pathdirectories or files to the destination path of DFS. 
* It is the same functionality as thefollowing comand: * hadoopfs -copyFromLocal <local fs><hadoop fs> * @param source * @param dest */ public void copyFromLocal (String source, String dest) { Configurationconf = new Configuration(); FileSystemfs; try { fs= FileSystem.get(conf); PathsrcPath = new Path(source); PathdstPath = new Path(dest); // Check if the file alreadyexists if (!(fs.exists(dstPath))) { LOG.warn("dstPathpath doesn‘t exist" ); LOG.error("No such destination " + dstPath); return; } // Get the filename out of thefile path Stringfilename = source.substring(source.lastIndexOf(‘/‘) + 1, source.length()); try{ //if the file exists in thedestination path, it will throw exception. // fs.copyFromLocalFile(srcPath,dstPath); //remove and overwrite files withthe method //copyFromLocalFile(booleandelSrc, boolean overwrite, Path src, Path dst) fs.copyFromLocalFile(false, true, srcPath, dstPath); LOG.info("File " + filename + "copied to " + dest); }catch(Exception e){ LOG.error("copyFromLocalFile exception caught!:" , e); new RuntimeException(e); }finally{ fs.close(); } }catch (IOException e1) { LOG.error("copyFromLocal IOException objectstream. :" ,e1); new RuntimeException(e1); } } public void renameFile (String fromthis, String tothis){ Configurationconf = new Configuration(); FileSystemfs; try { fs= FileSystem.get(conf); PathfromPath = new Path(fromthis); PathtoPath = new Path(tothis); if (!(fs.exists(fromPath))) { LOG.info("No such destination " + fromPath); return; } if (fs.exists(toPath)) { LOG.info("Already exists! " + toPath); return; } try{ boolean isRenamed = fs.rename(fromPath,toPath); //renames file name indeed. if(isRenamed){ LOG.info("Renamed from " + fromthis + " to " + tothis); } }catch(Exception e){ LOG.error("renameFile Exception caught! :" , e); new RuntimeException(e); }finally{ fs.close(); } }catch (IOException e1) { LOG.error("fs Exception caught! 
:" , e1); new RuntimeException(e1); } } /** * Uploads or adds a file to HDFS * @param source * @param dest */ public void addFile(String source, String dest) { // Conf object will readthe HDFS configuration parameters Configurationconf = new Configuration(); FileSystemfs; try { fs= FileSystem.get(conf); // Get the filename out of thefile path Stringfilename = source.substring(source.lastIndexOf(‘/‘) + 1, source.length()); // Create the destination pathincluding the filename. if (dest.charAt(dest.length() - 1) != ‘/‘) { dest= dest + "/" + filename; }else { dest= dest + filename; } // Check if the file alreadyexists Pathpath = new Path(dest); if (fs.exists(path)) { LOG.error("File " + dest + " already exists"); return; } // Create a new file and writedata to it. FSDataOutputStreamout = fs.create(path); InputStreamin = new BufferedInputStream(new FileInputStream( new File(source))); byte[] b = new byte[1024]; int numBytes = 0; //In this way read and write datato destination file. while ((numBytes = in.read(b)) > 0) { out.write(b,0, numBytes); } in.close(); out.close(); fs.close(); }catch (IOException e) { LOG.error("addFile Exception caught! :" , e); new RuntimeException(e); } } /** *Deletes the files if it is a directory. * @param file */ public void deleteFile(String file) { Configurationconf = new Configuration(); FileSystemfs; try { fs= FileSystem.get(conf); Pathpath = new Path(file); if (!fs.exists(path)) { LOG.info("File " + file + " does not exists"); return; } /* * recursively delete the file(s) if it is adirectory. * If you want to mark the path that will bedeleted as * a result of closing the FileSystem. * deleteOnExit(Path f) */ fs.delete(new Path(file), true); fs.close(); }catch (IOException e) { LOG.error("deleteFile Exception caught! :" , e); new RuntimeException(e); } } /** * Gets the information about the file modifiedtime. 
* @param source * @throws IOException */ public void getModificationTime(String source) throws IOException{ Configurationconf = new Configuration(); FileSystemfs = FileSystem.get(conf); PathsrcPath = new Path(source); // Check if the file alreadyexists if (!(fs.exists(srcPath))) { System.out.println("No such destination " + srcPath); return; } // Get the filename out of thefile path Stringfilename = source.substring(source.lastIndexOf(‘/‘) + 1, source.length()); FileStatusfileStatus = fs.getFileStatus(srcPath); long modificationTime =fileStatus.getModificationTime(); LOG.info("modified datetime: " + System.out.format("File %s; Modification time : %0.2f%n",filename,modificationTime)); } /** * Gets the file block location info * @param source * @throws IOException */ public void getBlockLocations(String source) throws IOException{ Configurationconf = new Configuration(); FileSystemfs = FileSystem.get(conf); PathsrcPath = new Path(source); // Check if the file alreadyexists if (!(ifExists(source))) { System.out.println("No such destination " + srcPath); return; } // Get the filename out of thefile path Stringfilename = source.substring(source.lastIndexOf(‘/‘) + 1, source.length()); FileStatusfileStatus = fs.getFileStatus(srcPath); BlockLocation[]blkLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen()); int blkCount = blkLocations.length; System.out.println("File :" + filename + "stored at:"); for (int i=0; i < blkCount; i++) { String[]hosts = blkLocations[i].getHosts(); LOG.info("host ip:" +System.out.format("Host %d: %s %n", i, hosts)); } } public void getHostnames () throws IOException{ Configurationconfig = new Configuration(); FileSystemfs = FileSystem.get(config); DistributedFileSystemhdfs = (DistributedFileSystem) fs; DatanodeInfo[]dataNodeStats = hdfs.getDataNodeStats(); String[]names = new String[dataNodeStats.length]; for (int i = 0; i < dataNodeStats.length; i++) { names[i]= dataNodeStats[i].getHostName(); LOG.info("datenode 
hostname:"+(dataNodeStats[i].getHostName())); } } /** * @param args */ public static void main(String[] args) { SyncDFSdfs = new SyncDFS(); dfs.list("/user/app"); dfs.mkdir("/user/app"); // dfs.readFile("/user/app/README.txt"); LOG.info("--------------" + dfs.ifExists("/user/warehouse/hbase.db/u_data/u.data")); //false LOG.info("--------------" + dfs.ifExists("/user/app/README.txt")); //true //copied the local file(s) to thedfs. // dfs.copyFromLocal("/opt/test","/user/app"); //delete the file(s) from the dfs // dfs.deleteFile("/user/app/test"); //rename diretory in dfs // dfs.renameFile("/user/app/test","/user/app/log"); //rename file in dfs // dfs.renameFile("/user/app/log/derby.log","/user/app/log/derby_info.log"); } }