FTP文件上传到HDFS上

在做测试数据时,往往会有ftp数据上传到hdfs的需求,一般需要手动操作,这样做太费事,于是有了下边代码实现的方式:

ftp数据上传到hdfs函数:

import java.io.InputStream;

import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPReply;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils; /**
* Created by Administrator on 11/10/2017.
*/
public class FtpUtil {
/**
* loadFromFtpToHdfs:将数据从ftp上传到hdfs上. <br/>
*
* @param ip
* @param username
* @param password
* @param filePath
* @param outputPath
* @param conf
* @return
* @author qiyongkang
* @since JDK 1.8
*/
public static boolean loadFromFtpToHdfs(String ip, String username, String password, String filePath, String outputPath, Configuration conf) {
FTPClient ftp = new FTPClient();
InputStream inputStream = null;
FSDataOutputStream outputStream = null;
boolean flag = true;
try {
ftp.connect(ip);
ftp.login(username, password);
ftp.setFileType(FTP.BINARY_FILE_TYPE);
ftp.setControlEncoding("UTF-8");
int reply = ftp.getReplyCode();
if (!FTPReply.isPositiveCompletion(reply)) {
ftp.disconnect();
}
FTPFile[] files = ftp.listFiles(filePath);
FileSystem hdfs = FileSystem.get(conf);
for (FTPFile file : files) {
if (!(file.getName().equals(".") || file.getName().equals(".."))) {
inputStream = ftp.retrieveFileStream(filePath + file.getName());
outputStream = hdfs.create(new Path(outputPath + file.getName()));
IOUtils.copyBytes(inputStream, outputStream, conf, false);
if (inputStream != null) {
inputStream.close();
ftp.completePendingCommand();
}
}
}
ftp.disconnect();
} catch (Exception e) {
flag = false;
e.printStackTrace();
}
return flag;
}
}

main调用函数:

import org.apache.hadoop.conf.Configuration

/**
* Created by Administrator on 11/10/2017.
*/
object FtpDownToHdfsMain {
def main(args: Array[String]): Unit = {
val conf = new Configuration()
FtpUtil.loadFromFtpToHdfs("192.168.1.23", "test", "abc123", "/www/input/", "/user/jr/dt/fblib/", conf)
}
}

使用yarn jar提交:

yarn jar myapp.jar
上一篇:Linux常用命令之wget


下一篇:关于HTTPS的简要内容