```yaml
hadoop:
  hdfs:
    host: hdfs://192.168.0.161:8020
    path: /app-logs
    user: hdfs
    batch-size: 105267200            # rollover size in bytes (~100 MB)
    batch-rollover-interval: 60000   # 1000*60 = 1 minute, in milliseconds
    kerberos:
      keytab: C:\ProgramData\MIT\Kerberos5\hdfs.headless.keytab
      user: hdfs-test@EMERGEN.COM
      kerber-conf: C:\ProgramData\MIT\Kerberos5\krb5.conf
```
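The `getConf()` and `getfileSystem()` methods below read this YAML through an injected `hdfsConf` object, whose class the post does not show. A minimal sketch of such a binding, assuming Spring Boot's `@ConfigurationProperties` (the class is hypothetical; field names mirror the getters used below, such as `getHost()` and `getKerberos()`):

```java
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

// Hypothetical binding for the "hadoop.hdfs" block above; the nested type
// name Kerbers matches the type used in getConf() below.
@Component
@ConfigurationProperties(prefix = "hadoop.hdfs")
public class HdfsConf {
    private String host;                // hdfs://192.168.0.161:8020
    private String path;                // /app-logs
    private String user;                // hdfs
    private long batchSize;             // batch-size, in bytes
    private long batchRolloverInterval; // batch-rollover-interval, in ms
    private Kerbers kerberos;

    public String getHost() { return host; }
    public void setHost(String host) { this.host = host; }
    public String getPath() { return path; }
    public void setPath(String path) { this.path = path; }
    public String getUser() { return user; }
    public void setUser(String user) { this.user = user; }
    public long getBatchSize() { return batchSize; }
    public void setBatchSize(long batchSize) { this.batchSize = batchSize; }
    public long getBatchRolloverInterval() { return batchRolloverInterval; }
    public void setBatchRolloverInterval(long v) { this.batchRolloverInterval = v; }
    public Kerbers getKerberos() { return kerberos; }
    public void setKerberos(Kerbers kerberos) { this.kerberos = kerberos; }

    public static class Kerbers {
        private String keytab;
        private String user;
        private String kerberConf; // maps to "kerber-conf" via relaxed binding
        public String getKeytab() { return keytab; }
        public void setKeytab(String keytab) { this.keytab = keytab; }
        public String getUser() { return user; }
        public void setUser(String user) { this.user = user; }
        public String getKerberConf() { return kerberConf; }
        public void setKerberConf(String kerberConf) { this.kerberConf = kerberConf; }
    }
}
```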
JAR dependencies:
```xml
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>3.2.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.2.2</version>
</dependency>
```
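Note that `hadoop-client` is an aggregator artifact that already pulls in the HDFS client classes transitively, so for a pure client application the explicit `hadoop-hdfs` dependency (which carries the server-side code) can usually be dropped; either way, keeping both at the same version, 3.2.2 here, avoids classpath conflicts.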
Configuring the `Configuration`:
```java
public static Configuration getConf() {
    System.out.println(hdfsConf.toString()); // debug: dump the bound config
    Configuration conf = new Configuration();
    Kerbers kerberos = hdfsConf.getKerberos();
    String user = kerberos.getUser();
    String keytab = kerberos.getKeytab();
    String krbConf = kerberos.getKerberConf();
    conf.set("fs.defaultFS", hdfsConf.getHost());
    // Authenticate to Hadoop with Kerberos
    conf.set("hadoop.security.authentication", "kerberos");
    // Principal (authentication entity) for the Hadoop NameNode
    conf.set("dfs.namenode.kerberos.principal", user);
    // Principal used to access the Hadoop cluster
    conf.set("kerberos.principal", user);
    // Path of the keytab file for that principal
    conf.set("kerberos.keytab", keytab);
    // krb5.conf is the Kerberos client configuration file; it is mandatory,
    // otherwise the Kerberos service cannot be located
    System.setProperty("java.security.krb5.conf", krbConf);
    UserGroupInformation.setConfiguration(conf);
    try {
        // Perform the Kerberos login for the principal via its keytab
        UserGroupInformation.loginUserFromKeytab(user, keytab);
    } catch (Exception ex) {
        // Don't swallow the failure: nothing below will work without a login
        log.error("Kerberos login failed: " + ex.getMessage(), ex);
    }
    return conf;
}
```
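Since a failed login above is only logged, it can be worth verifying that the keytab login actually took effect. A small check using the standard `UserGroupInformation` API (the `checkLogin` name is just for illustration):

```java
import java.io.IOException;
import org.apache.hadoop.security.UserGroupInformation;

// Sanity check after getConf(): did loginUserFromKeytab succeed?
public static void checkLogin() throws IOException {
    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
    System.out.println("security enabled: " + UserGroupInformation.isSecurityEnabled());
    System.out.println("current user:     " + ugi.getUserName());
    // Expect KERBEROS here; SIMPLE means the login did not take effect
    System.out.println("auth method:      " + ugi.getAuthenticationMethod());
}
```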
Creating the HDFS connection:
```java
public static FileSystem getfileSystem() {
    // FileSystem is the key HDFS client class
    Configuration conf = getConf();
    // The replication factor could also be set here, e.g. conf.set("dfs.replication", "3");
    FileSystem fileSystem = null;
    try {
        fileSystem = FileSystem.get(conf);
        // Alternative: pass the URI and client user name explicitly:
        // fileSystem = FileSystem.get(URI.create(hdfsConf.getHost()), conf, hdfsConf.getUser());
    } catch (Exception e) {
        log.error("Failed to connect to HDFS: " + e.getMessage(), e);
    }
    return fileSystem;
}
```
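A quick usage sketch: obtain the `FileSystem`, check that the configured log directory exists, and close the handle when done (the `checkAppLogs` name is illustrative; `/app-logs` is the `path` from the YAML above):

```java
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public static void checkAppLogs() throws IOException {
    // FileSystem implements Closeable, so try-with-resources closes it
    try (FileSystem fs = HdfsOperator.getfileSystem()) {
        Path logDir = new Path("/app-logs"); // "path" from the YAML config
        System.out.println("/app-logs exists: " + fs.exists(logDir));
    }
}
```

Keep in mind that `FileSystem.get(conf)` returns a process-wide cached instance by default, so closing it here closes it for every caller sharing the same URI; set `fs.hdfs.impl.disable.cache` to `true` if an isolated handle is needed.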
HDFS helper class:
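The test case below calls `HdfsOperator.listFiles`, whose implementation is not shown here. A minimal sketch, assuming it is a thin wrapper over `FileSystem#listFiles` (the signature is inferred from the call site):

```java
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

// Lists the files under `path`; `recursive` controls whether
// subdirectories are traversed as well.
public static RemoteIterator<LocatedFileStatus> listFiles(
        FileSystem fileSystem, Path path, boolean recursive) throws IOException {
    return fileSystem.listFiles(path, recursive);
}
```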
Test case:
```java
void testHdfs() throws Exception {
    log.info("===============testHdfs======================");
    Path src = new Path("D:\\eee.txt"); // local file prepared for an upload test (unused here)
    Path dst = new Path("/app-logs/");  // HDFS target directory (unused here)
    FileSystem fileSystem = HdfsOperator.getfileSystem();
    RemoteIterator<LocatedFileStatus> files =
            HdfsOperator.listFiles(fileSystem, new Path("/app-logs"), false);
    while (files.hasNext()) {
        LocatedFileStatus next = files.next();
        System.out.println(next.getPath().toString());
    }
}
```
Output: