Before uploading, start HDFS, then create a directory named /tmp on HDFS and grant write permission on it (for example with hdfs dfs -mkdir /tmp followed by hdfs dfs -chmod 777 /tmp):
Check on the HDFS web UI that the directory was created and the permission change took effect:
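If you prefer to create the directory from Java rather than the HDFS shell, here is a minimal sketch (the class name MakeTmpDir and the 777 permission are my own example choices; the NameNode URI matches the one used later):
package com.njbdqn.myhdfs.services;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import java.net.URI;
public class MakeTmpDir {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.126.200:9000"), new Configuration());
        Path tmp = new Path("/tmp");
        // create the directory if it does not exist yet
        if (!fs.exists(tmp)) {
            fs.mkdirs(tmp);
        }
        // open it up so the upload code can write into it (example permission)
        fs.setPermission(tmp, new FsPermission((short) 0777));
        fs.close();
    }
}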
Open IDEA:
package com.njbdqn.myhdfs.services;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.net.URI;
public class UploadFileToHDFS {
    public static void main(String[] args) throws Exception {
        Configuration cfg = new Configuration();
        //cfg.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem");
        // connect to HDFS through the NameNode URI (the client runs on Windows, the cluster on the Linux VM)
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.126.200:9000"), cfg);
        /*System.out.println(fs);*/
        // upload a file
        // local source path (must include the file name)
        Path src = new Path("E:/mylog/log_20200102.log");
        // target location on HDFS
        Path dst = new Path("/tmp");
        // perform the upload
        fs.copyFromLocalFile(src, dst);
        fs.close();
    }
}
Run it:
Check the upload location on HDFS:
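You can also verify the upload from Java instead of the web UI; a small sketch (the class name ListTmpDir is my own, /tmp is the upload target used above):
package com.njbdqn.myhdfs.services;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.net.URI;
public class ListTmpDir {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.126.200:9000"), new Configuration());
        // print the name and size of every entry under /tmp
        for (FileStatus st : fs.listStatus(new Path("/tmp"))) {
            System.out.println(st.getPath().getName() + "  " + st.getLen() + " bytes");
        }
        fs.close();
    }
}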
Conversion
A JSON file (semi-structured data) is hard to turn directly into a table (structured data).
With Java we can first convert the JSON file into a format that is easy to load as a table, and so turn it into a table indirectly: format conversion.
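For illustration, here is a made-up log record with the fields used below (machine, actTime, actType, goodid, page, userid, browse) and the comma-separated line it is converted into; the values are invented, only the structure matters:
{"machine":{"cpuType":"i7","memory":"8G","cpuSeed":"2.4GHz"},"actTime":"2020-01-02 10:01:02","actType":"view","goodid":"100101","page":"/item/100101","userid":"u0001","browse":{"browseType":"Chrome","browseVersion":"79.0"}}
is converted into the line
i7,8G,2.4GHz,2020-01-02 10:01:02,view,100101,/item/100101,u0001,Chrome,79.0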
Preparation:
package com.njbdqn.myhdfs.services;
import com.alibaba.fastjson.JSON;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.BufferedReader;
import java.io.FileReader;
import java.net.URI;
public class UploadFileToHDFS {
    public static void main(String[] args) throws Exception {
        Configuration cfg = new Configuration();
        //cfg.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem");
        // connect to HDFS through the NameNode URI
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.126.200:9000"), cfg);
        /*System.out.println(fs);*/
        // file conversion: read the local log file line by line and parse each line as JSON
        FileReader fis = new FileReader("E:/mylog/log_20200102.log");
        BufferedReader bis = new BufferedReader(fis);
        String line = "";
        while ((line = bis.readLine()) != null) {
            Info info = JSON.parseObject(line, Info.class);
            System.out.println(info.getGoodid() + "," + info.getMachine().getMemory());
        }
        bis.close();
        fis.close();
    }
}
Run result (the information read from E:/mylog/log_20200102.log):
Step 2: upload a file to HDFS
Create a few classes:
package com.njbdqn.myhdfs.services;
public class Info {
    private Machine machine;
    private String actTime;
    private String actType;
    private String goodid;
    private String page;
    private String userid;
    private Browse browse;
    @Override
    public String toString() {
        return "Info{" +
                "machine=" + machine +
                ", actTime='" + actTime + '\'' +
                ", actType='" + actType + '\'' +
                ", goodid='" + goodid + '\'' +
                ", page='" + page + '\'' +
                ", userid='" + userid + '\'' +
                ", browse='" + browse + '\'' +
                '}';
    }
    public Machine getMachine() {
        return machine;
    }
    public void setMachine(Machine machine) {
        this.machine = machine;
    }
    public String getActTime() {
        return actTime;
    }
    public void setActTime(String actTime) {
        this.actTime = actTime;
    }
    public String getActType() {
        return actType;
    }
    public void setActType(String actType) {
        this.actType = actType;
    }
    public String getGoodid() {
        return goodid;
    }
    public void setGoodid(String goodid) {
        this.goodid = goodid;
    }
    public String getPage() {
        return page;
    }
    public void setPage(String page) {
        this.page = page;
    }
    public String getUserid() {
        return userid;
    }
    public void setUserid(String userid) {
        this.userid = userid;
    }
    public Browse getBrowse() {
        return browse;
    }
    public void setBrowse(Browse browse) {
        this.browse = browse;
    }
}
package com.njbdqn.myhdfs.services;
public class Browse {
    private String browseType;
    private String browseVersion;
    public String getBrowseType() {
        return browseType;
    }
    public void setBrowseType(String browseType) {
        this.browseType = browseType;
    }
    public String getBrowseVersion() {
        return browseVersion;
    }
    public void setBrowseVersion(String browseVersion) {
        this.browseVersion = browseVersion;
    }
}
package com.njbdqn.myhdfs.services;
public class Machine {
    private String cpuType;
    private String memory;
    private String cpuSeed;
    public String getCpuType() {
        return cpuType;
    }
    public void setCpuType(String cpuType) {
        this.cpuType = cpuType;
    }
    public String getMemory() {
        return memory;
    }
    public void setMemory(String memory) {
        this.memory = memory;
    }
    public String getCpuSeed() {
        return cpuSeed;
    }
    public void setCpuSeed(String cpuSeed) {
        this.cpuSeed = cpuSeed;
    }
}
Run the following class:
package com.njbdqn.myhdfs.services;
import com.alibaba.fastjson.JSON;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.BufferedReader;
import java.io.FileReader;
import java.net.URI;
public class UploadFileToHDFS {
    public static void main(String[] args) throws Exception {
        Configuration cfg = new Configuration();
        //cfg.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem");
        // connect to HDFS through the NameNode URI
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.126.200:9000"), cfg);
        // file conversion: read the local log file line by line
        FileReader fis = new FileReader("E:/mylog/log_20200102.log");
        BufferedReader bis = new BufferedReader(fis);
        // create a file (not a directory) on HDFS
        FSDataOutputStream fos = fs.create(new Path("/tmp/lg_20200102.log"));
        String line = "";
        while ((line = bis.readLine()) != null) {
            Info info = JSON.parseObject(line, Info.class);
            // format template: one comma-separated line per JSON record
            String ctx = String.format("%s,%s,%s,%s,%s,%s,%s,%s,%s,%s\n",
                    info.getMachine().getCpuType(),
                    info.getMachine().getMemory(),
                    info.getMachine().getCpuSeed(),
                    info.getActTime(),
                    info.getActType(),
                    info.getGoodid(),
                    info.getPage(),
                    info.getUserid(),
                    info.getBrowse().getBrowseType(),
                    info.getBrowse().getBrowseVersion());
            fos.write(ctx.getBytes());
        }
        fos.flush();
        fos.close();
        bis.close();
        fis.close();
    }
}
Result on the HDFS web UI:
The JSON records have been converted into the format shown below:
Step 3: upload multiple files to HDFS (ETL: extract, transform, load)
HDFS does not allow several threads to write to the same file (a file has a single writer), but different files can be written in parallel. The code below uses a thread pool precisely because each thread handles a different file.
package com.njbdqn.myhdfs.services;
import com.alibaba.fastjson.JSON;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class UploadFileToHDFS {
    /* sanity check: make sure the local files can be listed
    public static void main(String[] args) {
        File file = new File("e:/mylog");
        String[] fst = file.list();
        for (String f : fst) {
            System.out.println(f);
        }
    }
    */
    public void writeFileToHDFS(String path, String fileName) {
        FileSystem fs = null;
        FileReader fis = null;
        BufferedReader bis = null;
        FSDataOutputStream fos = null;
        try {
            fs = FileSystem.get(new URI("hdfs://192.168.126.200:9000"), new Configuration());
            fis = new FileReader(path + "/" + fileName);
            bis = new BufferedReader(fis);
            // create a file on HDFS for the converted records
            fos = fs.create(new Path("/logs/" + fileName));
            String line = "";
            while ((line = bis.readLine()) != null) {
                Info info = JSON.parseObject(line, Info.class);
                // format template: one comma-separated line per JSON record
                String ctx = String.format("%s,%s,%s,%s,%s,%s,%s,%s,%s,%s\n",
                        info.getMachine().getCpuType(),
                        info.getMachine().getMemory(),
                        info.getMachine().getCpuSeed(),
                        info.getActTime(),
                        info.getActType(),
                        info.getGoodid(),
                        info.getPage(),
                        info.getUserid(),
                        info.getBrowse().getBrowseType(),
                        info.getBrowse().getBrowseVersion());
                fos.write(ctx.getBytes());
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (URISyntaxException e) {
            e.printStackTrace();
        } finally {
            try {
                if (fos != null) fos.close();
                if (bis != null) bis.close();
                if (fis != null) fis.close();
                // fs.close(); not necessarily needed here, it depends on the situation
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(30);
        final UploadFileToHDFS ufh = new UploadFileToHDFS();
        String filePath = "e:/mylog";
        // loop over all files in the local directory
        File file = new File(filePath);
        String[] fs = file.list();
        for (String fileName : fs) {
            es.execute(new Runnable() {
                @Override
                public void run() {
                    ufh.writeFileToHDFS(filePath, fileName);
                }
            });
        }
        es.shutdown();
    }
}
A listing like the one below indicates that the upload succeeded:
In the listing above, the Block Size column shows 128 MB while the Size column shows only 22.11 MB. Storing many such small files is wasteful (mainly in NameNode metadata, since each file occupies at least one block entry), so the small files should be merged into larger ones.
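One straightforward way to do the merge is to concatenate the small files already in /logs into a single HDFS file. A minimal sketch under the same assumptions as above (the class name and the target path /tmp/merged.log are my own choices):
package com.njbdqn.myhdfs.services;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import java.net.URI;
public class MergeSmallFilesOnHDFS {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.126.200:9000"), new Configuration());
        // one large target file instead of many small ones (hypothetical path)
        FSDataOutputStream out = fs.create(new Path("/tmp/merged.log"));
        // append every small file under /logs to the target file
        for (FileStatus st : fs.listStatus(new Path("/logs"))) {
            if (!st.isFile()) continue;
            FSDataInputStream in = fs.open(st.getPath());
            // false = do not close the output stream after each copy
            IOUtils.copyBytes(in, out, 4096, false);
            in.close();
        }
        out.close();
        fs.close();
    }
}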