Hadoop(HDFS)文件操作整理(Java)

  1 package dada;
  2 
  3 import java.io.BufferedReader;
  4 import java.io.File;
  5 import java.io.FileInputStream;
  6 import java.io.IOException;
  7 import java.io.InputStreamReader;
  8 
  9 import org.apache.hadoop.conf.Configuration;
 10 import org.apache.hadoop.fs.FSDataInputStream;
 11 import org.apache.hadoop.fs.FSDataOutputStream;
 12 import org.apache.hadoop.fs.FileSystem;
 13 import org.apache.hadoop.fs.Path;
 14 
 15 public class HDFSToll {
 16 
 17     //路径是否存在
 18     public static boolean testExist(Configuration conf,String path) throws  IOException
 19     {
 20         FileSystem fs=FileSystem.get(conf);
 21         return fs.exists(new Path(path));
 22     }
 23     //创建目录
 24     public static boolean mkdir (Configuration conf ,String remoteDir)throws IOException
 25     {
 26     FileSystem fs=FileSystem.get(conf);
 27     Path dirPath=new Path(remoteDir);
 28     boolean result=fs.mkdirs(dirPath);
 29     fs.close();
 30     return result;
 31     }
 32     /**
 33      * 删除目录
 34      */
 35     public static boolean rmDir(Configuration conf, String remoteDir) throws IOException {
 36         FileSystem fs = FileSystem.get(conf);
 37         Path dirPath = new Path(remoteDir);
 38         /* 第二个参数表示是否递归删除所有文件 */
 39         boolean result = fs.delete(dirPath, true);
 40         fs.close();
 41         return result;
 42     }
 43     //创建文件
 44     public static void touch(Configuration conf,String remoteFilePath )throws IOException
 45     {
 46         FileSystem fs=FileSystem.get(conf);
 47         Path remotePath=new Path(remoteFilePath);
 48         FSDataOutputStream outputStream =fs.create(remotePath);
 49         outputStream.close();
 50         fs.close();
 51     }
 52     //删除文件
 53     public static boolean rm(Configuration conf,String remoteFilePath)throws IOException
 54     {
 55         FileSystem fs=FileSystem.get(conf);
 56         Path remotePath=new Path(remoteFilePath);
 57         boolean result=fs.delete(remotePath,false);
 58         fs.close();
 59         return result;
 60     }
 61     //追加文件内容  到末尾
 62     public static void appendContentToFile(Configuration conf,String content,String remoteFilePath)throws IOException
 63     {
 64         FileSystem fs=FileSystem.get(conf);
 65         Path remotePath=new Path(remoteFilePath);
 66         FSDataOutputStream out=fs.append(remotePath);
 67         out.write(content.getBytes());
 68         out.close();
 69         fs.close();
 70     }
 71    
 72     //追加文件内容到开头
 73     public static void appendContentToFile1(Configuration conf,String content,String remoteFilePath)throws IOException
 74     {
 75           String localTmpPath = "/usr/local/hadoop/enen.txt";
 76 
 77           // 移动到本地
 78           moveToLocalFile(conf, remoteFilePath, localTmpPath);
 79           // 创建一个新文件
 80           touch(conf, remoteFilePath); 
 81           // 先写入新内容
 82          appendContentToFile(conf, content, remoteFilePath);
 83           // 再写入原来内容
 84           appendContentToFile(conf, localTmpPath, remoteFilePath); 
 85 
 86           System.out.println("已追加内容到文件开头: " + remoteFilePath);
 87     }
 88     /**
 89 
 90      * 复制文件到指定路径
 91 
 92      * 若路径已存在,则进行覆盖
 93 
 94      */
 95 
 96     public static void copyFromLocalFile(Configuration conf, String localFilePath, String remoteFilePath) throws IOException {
 97 
 98         FileSystem fs = FileSystem.get(conf);
 99 
100         Path localPath = new Path(localFilePath);
101 
102         Path remotePath = new Path(remoteFilePath);
103 
104         /* fs.copyFromLocalFile 第一个参数表示是否删除源文件,第二个参数表示是否覆盖 */
105 
106         fs.copyFromLocalFile(false, true, localPath, remotePath);
107 
108         fs.close();
109 
110     }
111 
112 
113 
114 
115     //将文件1写入文件2
116     public static void appendFile1ToFile2(Configuration conf,String  remoteFilePath,String remoteFilePath2)throws IOException
117     {
118         FileSystem fs=FileSystem.get(conf);
119         Path file=new Path(remoteFilePath);
120         FSDataInputStream  getIt=fs.open(file);
121         BufferedReader d=new BufferedReader(new InputStreamReader(getIt));
122         String content1=d.readLine();
123         Path remotePath=new Path(remoteFilePath2);
124         FSDataOutputStream out=fs.append(remotePath);
125         out.write(content1.getBytes());
126         d.close();
127         out.close();
128         fs.close();
129     }
130     /**
131 
132      * 追加文件内容
133 
134      */
135     public static void appendToFile(Configuration conf, String localFilePath, String remoteFilePath) throws IOException {
136 
137         FileSystem fs = FileSystem.get(conf);
138 
139         Path remotePath = new Path(remoteFilePath);
140 
141         /* 创建一个文件读入流 */
142 
143         FileInputStream in = new FileInputStream(localFilePath);
144 
145         /* 创建一个文件输出流,输出的内容将追加到文件末尾 */
146 
147         FSDataOutputStream out = fs.append(remotePath);
148 
149         /* 读写文件内容 */
150 
151         byte[] data = new byte[1024];
152 
153         int read = -1;
154 
155         while ( (read = in.read(data)) > 0 ) {
156 
157             out.write(data, 0, read);
158 
159         }
160 
161         out.close();
162 
163         in.close();
164 
165         fs.close();
166 
167     }
168     /**
169 
170      * 下载文件到本地
171 
172      * 判断本地路径是否已存在,若已存在,则自动进行重命名
173 
174      */
175 
176     public static void copyToLocal(Configuration conf, String remoteFilePath, String localFilePath) throws IOException {
177 
178         FileSystem fs = FileSystem.get(conf);
179 
180         Path remotePath = new Path(remoteFilePath);
181 
182         File f = new File(localFilePath);
183 
184         /* 如果文件名存在,自动重命名(在文件名后面加上 _0, _1 ...) */
185 
186         if (f.exists()) {
187 
188             System.out.println(localFilePath + " 文件已存在.");
189 
190             Integer i = 0;
191 
192             while (true) {
193 
194                 f = new File(localFilePath + "_" + i.toString());
195 
196                 if (!f.exists()) {
197 
198                     localFilePath = localFilePath + "_" + i.toString();
199 
200                     System.out.println("将重新命名为: " + localFilePath);
201 
202                     break;//重命名文件 
203 
204                 }
205 
206                 i++;
207 
208             }
209 
210            // System.out.println("将重新命名为: " + localFilePath);
211 
212         }
213 
214         else
215 
216             System.out.println(localFilePath + " 文件不存在.");
217 
218 
219 
220         // 下载文件到本地
221 
222        Path localPath = new Path(localFilePath);
223 
224        fs.copyToLocalFile(remotePath, localPath);
225 
226        fs.close();
227 
228     }
229     /**
230      * 移动文件到本地
231 
232      * 移动后,删除源文件
233 
234      */
235     public static void moveToLocalFile(Configuration conf, String remoteFilePath, String localFilePath) throws IOException {
236 
237         FileSystem fs = FileSystem.get(conf);
238 
239         Path remotePath = new Path(remoteFilePath);
240 
241         Path localPath = new Path(localFilePath);
242 
243         fs.moveToLocalFile(remotePath, localPath);
244 
245     }
246 }

 

上一篇:HDFS的Java API


下一篇:七、Java NIO 选择器