HDFS Upload and Download (Java API)

package hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

public class HdfsDemo {
    // On Windows the Hadoop client needs hadoop.home.dir to point at a local
    // Hadoop installation (containing bin\winutils.exe) before the first
    // FileSystem call.
    static {
        System.setProperty("hadoop.home.dir", "D:\\Studyingimportant\\hadoop-2.9.2");
    }
    public static void main(String[] args) throws IOException {
        //1. Create a directory
        boolean result = createPath();
        //2. Upload a file
        //2.1 With copyFromLocalFile: if the target is an existing directory, the file is stored inside it
        // putFile2HDFS("C:\\Users\\Administrator\\Desktop\\大数据系-4月-专高3-(大数据方向)《离线计算》-周考.doc", "/test");
        //2.2 With an output stream: if a directory with the same name exists, an error is thrown
        // putFile2HDFS2("C:\\Users\\Administrator\\Desktop\\大数据系-4月-专高3-(大数据方向)《离线计算》-周考.doc", "/test");
        //3. Inspect status information
        // list("/test");
        //4. Download a file
        // getFileFromHDFS("/2.doc", "/test/大数据系-4月-专高3-(大数据方向)《离线计算》-周考.doc");
        //5. Delete a file or directory
        // delete("/test");
    }

    /**
     * Get the file system handle (the entry point for every operation)
     * @return the HDFS FileSystem, or null if the connection fails
     */
    private static FileSystem getHadoopFileSystem() {
        // Build the fs object from the configuration
        Configuration conf = new Configuration();
        // Settings made here take the highest priority; fs.defaultFS is mandatory
        conf.set("fs.defaultFS", "hdfs://192.168.157.128:9000");
        FileSystem fs = null;
        try {
            fs = FileSystem.get(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return fs;
    }
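
    // A hedged sketch, not part of the original post: the overload
    // FileSystem.get(URI, Configuration, String) runs the client as an explicit
    // HDFS user (e.g. getHadoopFileSystemAs("root")), which avoids
    // AccessControlException when the local OS user does not own the target
    // directories. The user name passed in is an assumption about the cluster.
    private static FileSystem getHadoopFileSystemAs(String user) {
        Configuration conf = new Configuration();
        try {
            return FileSystem.get(java.net.URI.create("hdfs://192.168.157.128:9000"), conf, user);
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
            return null;
        }
    }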

    /**
     * Create a directory
     * @return true if the directory was created
     */
    public static boolean createPath() {
        boolean result = false;
        FileSystem fs = getHadoopFileSystem();
        Path path = new Path("/test");
        try {
            result = fs.mkdirs(path);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            close(fs);
        }
        return result;
    }
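
    // A sketch, not from the original post: mkdirs also has an overload that
    // takes an FsPermission, so the directory mode can be set explicitly
    // (0755 below is an arbitrary example).
    public static boolean createPathWithPermission(String pathName) {
        FileSystem fs = getHadoopFileSystem();
        try {
            return fs.mkdirs(new Path(pathName),
                    new org.apache.hadoop.fs.permission.FsPermission((short) 0755));
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        } finally {
            close(fs);
        }
    }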


    /**
     * Upload a file (copyFromLocalFile)
     */
    public static void putFile2HDFS(String localPath, String hdfsPath) {
        FileSystem fs = getHadoopFileSystem();
        try {
            fs.copyFromLocalFile(new Path(localPath), new Path(hdfsPath));
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            close(fs);
        }
    }

    /**
     * Upload a file via streams
     */
    public static void putFile2HDFS2(String localPath, String hdfsPath) {
        FileSystem fs = getHadoopFileSystem();
        try {
            //1. Input stream from the local file
            FileInputStream input = new FileInputStream(localPath);
            //2. Output stream writing to HDFS
            FSDataOutputStream output = fs.create(new Path(hdfsPath));
            //3. input ----> buffer ----> output (the final 'true' closes both streams)
            IOUtils.copyBytes(input, output, 4096, true);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            close(fs);
        }
    }
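
    // A sketch, an assumption rather than part of the original post: fs.create
    // also accepts an org.apache.hadoop.util.Progressable callback that Hadoop
    // invokes as data is written, which gives cheap progress feedback on large
    // uploads.
    public static void putFile2HDFSWithProgress(String localPath, String hdfsPath) {
        FileSystem fs = getHadoopFileSystem();
        try {
            FileInputStream input = new FileInputStream(localPath);
            FSDataOutputStream output = fs.create(new Path(hdfsPath),
                    () -> System.out.print("."));  // progress() callback
            IOUtils.copyBytes(input, output, 4096, true);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            close(fs);
        }
    }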

    /**
     * Show status information for a directory/file
     * @param pathName path to list
     */
    public static void list(String pathName) {
        FileSystem fs = getHadoopFileSystem();
        try {
            //statusArr: metadata for each file or directory directly under the path
            FileStatus[] statusArr = fs.listStatus(new Path(pathName));
            for (FileStatus status : statusArr) {
                String name = status.getPath().getName();
                long blockSize = status.getBlockSize();
                short replication = status.getReplication();
                String isDir = status.isDirectory() ? "directory" : "file";
                System.out.println("name::" + name + ",blockSize::" + blockSize
                        + ",replication::" + replication + ",isDir::" + isDir);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            close(fs);
        }
    }
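
    // A sketch, not in the original post: listStatus only looks one level deep,
    // while fs.listFiles(path, true) walks the tree recursively and returns a
    // RemoteIterator of LocatedFileStatus (which also carries block locations).
    public static void listRecursive(String pathName) {
        FileSystem fs = getHadoopFileSystem();
        try {
            RemoteIterator<LocatedFileStatus> it = fs.listFiles(new Path(pathName), true);
            while (it.hasNext()) {
                LocatedFileStatus status = it.next();
                System.out.println(status.getPath() + "::len=" + status.getLen());
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            close(fs);
        }
    }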

    /**
     * Download a file (copyToLocalFile)
     */
    public static void getFileFromHDFS(String localPath, String hdfsPath) {
        FileSystem fs = getHadoopFileSystem();

        try {
            //Download the file to the local path
            fs.copyToLocalFile(new Path(hdfsPath), new Path(localPath));
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            close(fs);
        }
    }

    /**
     * Download a file via streams
     */
    public static void getFileFromHDFS2(String srcPathName, String dstPathName) {
        FileSystem fs = getHadoopFileSystem();
        try {
            FSDataInputStream in = fs.open(new Path(srcPathName));
            FileOutputStream out = new FileOutputStream(dstPathName);
            // input ----> buffer ----> output; the final 'true' closes both streams
            IOUtils.copyBytes(in, out, 4096, true);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            close(fs);
        }
    }
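
    // A sketch, an assumption rather than part of the original post: the
    // four-argument overload copyToLocalFile(delSrc, src, dst, useRawLocalFileSystem)
    // with useRawLocalFileSystem = true bypasses the local checksum file system,
    // so no .crc sidecar files are written next to the downloaded file.
    public static void getFileFromHDFSRaw(String localPath, String hdfsPath) {
        FileSystem fs = getHadoopFileSystem();
        try {
            fs.copyToLocalFile(false, new Path(hdfsPath), new Path(localPath), true);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            close(fs);
        }
    }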

    /**
     * Delete a directory/file
     * @param pathName path to delete
     * @return true if the deletion succeeded
     */
    public static boolean delete(String pathName) {
        boolean result = false;
        FileSystem fs = getHadoopFileSystem();
        Path path = new Path(pathName);

        try {
            //recursive = true also removes a non-empty directory with its contents
            result = fs.delete(path, true);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            close(fs);
        }
        return result;
    }
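
    // A sketch, not in the original post: delete returns false both for a
    // missing path and for some failures, so checking fs.exists first keeps
    // the two cases apart.
    public static boolean deleteIfExists(String pathName) {
        FileSystem fs = getHadoopFileSystem();
        Path path = new Path(pathName);
        try {
            if (!fs.exists(path)) {
                System.out.println(pathName + " does not exist");
                return false;
            }
            return fs.delete(path, true);
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        } finally {
            close(fs);
        }
    }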

    private static void close(FileSystem fs) {
        // fs can be null if getHadoopFileSystem() failed, so guard the close
        if (fs == null) {
            return;
        }
        try {
            fs.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
