Java 数据以追加方式保存HDFS ,含kerberos 认证配置

package com.jast.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.text.SimpleDateFormat;
import java.util.Calendar;


public class SaveHdfsTest {
    private static FileSystem hadoopFS = null;
    private static Configuration hdfsConf = null;
    static String hdfsUri = "hdfs://fwqml006.zh:8022";
    static String hdfsDir = "test";
    public static void main(String[] args) {
        hdfsConf = new Configuration();
        try {
            hdfsConf.set("fs.defaultFS", hdfsUri);
            hdfsConf.set("dfs.support.append", "true");//HDFS文件中追加内容
            hdfsConf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
            hdfsConf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
            hdfsConf.set("hadoop.security.authentication", "kerberos");//启用kerberos
            UserGroupInformation.setConfiguration(hdfsConf);
            UserGroupInformation.loginUserFromKeytab("hdfs@JAST.COM", "/root/jast/hdfs-kerberos/config/hdfs.keytab");//读取hdfskeytab
        } catch (Exception e) {
            System.out.println(e);
        }
        try {
            hadoopFS = FileSystem.get(hdfsConf);
            //如果hdfs的对应的目录不存在,则进行创建
            if (!hadoopFS.exists(new Path("/" + hdfsDir))) {
                hadoopFS.mkdirs(new Path("/" + hdfsDir));
            }
//            hadoopFS.close();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }


        while(true) {
            String fileName = "/" + hdfsDir + "/" +
                    (new SimpleDateFormat("yyyy-MM-dd").format(Calendar.getInstance().getTime())) + ".txt";
            String fileContent = System.currentTimeMillis() + "\n";
            Path dst = new Path(fileName);
            try {
                if (!hadoopFS.exists(dst)) {
                    FSDataOutputStream output = hadoopFS.create(dst);
                    output.close();
                }
                InputStream in = new ByteArrayInputStream(fileContent.getBytes("UTF-8"));
                OutputStream out = hadoopFS.append(dst);
                IOUtils.copyBytes(in, out, 4096, true);
            } catch (Exception e) {
                e.printStackTrace();
                try {
                    hadoopFS.close();
                } catch (IOException f) {
                    f.printStackTrace();
                }
            }

//            try {
//                Thread.sleep(5000);
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
        }





    }
}

 

上一篇:linux – 我如何消除passwd的Kerberos?


下一篇:StormUI 无法获取数据