package com.jast.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Calendar;
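
/**
 * Minimal test client for a Kerberos-secured HDFS cluster: it logs in from a
 * keytab, ensures a target directory exists, and then periodically appends a
 * timestamp line to a file named after the current date. The NameNode URI,
 * principal, and keytab path below are environment-specific examples and must
 * be adapted to your own cluster.
 */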
public class SaveHdfsTest {

    private static FileSystem hadoopFS = null;
    private static Configuration hdfsConf = null;
    static String hdfsUri = "hdfs://fwqml006.zh:8022";
    static String hdfsDir = "test";

    public static void main(String[] args) {
        hdfsConf = new Configuration();
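
        // Client-side settings: enable append, relax the write-pipeline failure
        // policy for small clusters, and turn on Kerberos authentication.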
        try {
            hdfsConf.set("fs.defaultFS", hdfsUri);
            // Allow appending to existing HDFS files
            hdfsConf.set("dfs.support.append", "true");
            // Do not try to replace a failed DataNode in the append pipeline;
            // on small clusters there may be no spare DataNode to swap in
            hdfsConf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
            hdfsConf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
            // Enable Kerberos authentication
            hdfsConf.set("hadoop.security.authentication", "kerberos");
            UserGroupInformation.setConfiguration(hdfsConf);
            // Log in with the hdfs principal and its keytab
            UserGroupInformation.loginUserFromKeytab("hdfs@JAST.COM", "/root/jast/hdfs-kerberos/config/hdfs.keytab");
        } catch (Exception e) {
            // Without a valid Kerberos login nothing below can work, so give up early
            e.printStackTrace();
            return;
        }
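
        // Obtain a FileSystem handle backed by the authenticated configuration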
        try {
            hadoopFS = FileSystem.get(hdfsConf);
            // Create the target directory if it does not exist yet
            if (!hadoopFS.exists(new Path("/" + hdfsDir))) {
                hadoopFS.mkdirs(new Path("/" + hdfsDir));
            }
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
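
        // Append one line per iteration; the file name rolls over daily, so the
        // first write on each date creates a fresh file.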
        while (true) {
            String fileName = "/" + hdfsDir + "/" +
                    (new SimpleDateFormat("yyyy-MM-dd").format(Calendar.getInstance().getTime())) + ".txt";
            String fileContent = System.currentTimeMillis() + "\n";
            Path dst = new Path(fileName);
            try {
                // append() requires an existing file, so create today's file on first use
                if (!hadoopFS.exists(dst)) {
                    FSDataOutputStream output = hadoopFS.create(dst);
                    output.close();
                }
                InputStream in = new ByteArrayInputStream(fileContent.getBytes(StandardCharsets.UTF_8));
                OutputStream out = hadoopFS.append(dst);
                // copyBytes with close=true closes both streams when it finishes
                IOUtils.copyBytes(in, out, 4096, true);
            } catch (Exception e) {
                e.printStackTrace();
                try {
                    hadoopFS.close();
                } catch (IOException f) {
                    f.printStackTrace();
                }
                // The FileSystem handle is closed at this point, so leave the loop
                // instead of failing on every subsequent iteration
                break;
            }
            // Pace the writes; an unthrottled loop floods the NameNode with
            // create/append RPCs
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}