JAVA 读取HDFS 文件,并解析Parquet,Sequence,text,thrift

JAVA 读取HDFS 文件,并解析

1. 读取文件

1.1 使用程序指定验证信息

在与 pom.xml 同级的 conf/ 目录下,放置认证及连接所需的文件(hdfs-site.xml、core-site.xml、krb5.conf 以及 keytab 文件)
JAVA 读取HDFS 文件,并解析Parquet,Sequence,text,thrift


  
    // Builds the shared HDFS client once, at class-load time: loads the cluster
    // configuration files, logs in to Kerberos from a keytab, and stores the
    // resulting FileSystem in FILE_SYSTEM. Any failure aborts class initialization
    // with ExceptionInInitializerError.
    static {
        Configuration conf = new Configuration();
        // Replace with your own paths.
        conf.addResource(new Path("conf/" + "hdfs-site.xml"));
        conf.addResource(new Path("conf/" + "core-site.xml"));
        System.setProperty("java.security.krb5.conf", "conf/" + "krb5.conf");
        // The Java version property is overridden to "1.8" only while the client is created.
        // NOTE(review): presumably a workaround for library code that cannot parse newer
        // java.version strings -- confirm against the Hadoop version in use.
        String javaVersion = System.getProperty(JAVA_VERSION_PROPERTY);
        System.setProperty(JAVA_VERSION_PROPERTY, "1.8");
        try {
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation.loginUserFromKeytab("ctc@HELLO.HADOOP", "conf/" + "ctc.keytab");
            URI uri = new URI("hdfs://usr/");
            FILE_SYSTEM = FileSystem.get(uri, conf);
        } catch (URISyntaxException | IOException e) {
            throw new ExceptionInInitializerError(e);
        } finally {
            // Restore the original value on every exit path; otherwise a failed login
            // would leave the whole JVM reporting the overridden version.
            if (javaVersion != null) {
                System.setProperty(JAVA_VERSION_PROPERTY, javaVersion);
            } else {
                // setProperty rejects null values, so an originally absent property is cleared.
                System.clearProperty(JAVA_VERSION_PROPERTY);
            }
        }
    }

1.2 使用启动参数

启动时设置认证信息

#!/usr/bin/env bash
# Launches hdfs.jar, passing the authentication settings as JVM system properties.

# One array element per JVM flag: each flag reaches java as exactly one argument,
# without relying on unquoted word splitting (which would also allow glob expansion).
JVM_ARGS=(
    -Dhadoop.property.configuration.service.preserve.user.conf=false
    -Djava.security.krb5.conf=krb5.conf
    -Dhadoop.property.hadoop.client.keytab.file=ctc.keytab
    -Dhadoop.property.hadoop.client.kerberos.principal=ctc@HELLO.HADOOP
    -Dhadoop.property.hadoop.security.authentication=kerberos
    -Dconfiguration.service=org.apache.hadoop.fs.NameServiceConfigurationService
)

# exec replaces the shell with the JVM process.
exec /usr/java/jdk-11-qa/bin/java "${JVM_ARGS[@]}" -jar hdfs.jar

	// Builds the shared HDFS client once, at class-load time, and stores it in FILE_SYSTEM.
	// No explicit login happens here; the surrounding article supplies the authentication
	// settings as JVM startup parameters. Any failure aborts class initialization with
	// ExceptionInInitializerError.
	static {
        Configuration conf = new Configuration();
        conf.set("fs.automatic.close", "false"); //disable fileSystem auto close
        // The Java version property is overridden to "1.8" only while the client is created.
        // NOTE(review): presumably a workaround for library code that cannot parse newer
        // java.version strings -- confirm against the Hadoop version in use.
        String javaVersion = System.getProperty(JAVA_VERSION_PROPERTY);
        System.setProperty(JAVA_VERSION_PROPERTY, "1.8");
        try {
            URI uri = new URI(String.format(FILE_SYSTEM_TEMPLATE,"hdfs://usr/"));
            FILE_SYSTEM = FileSystem.get(uri, conf);
        } catch (URISyntaxException | IOException e) {
            throw new ExceptionInInitializerError(e);
        } finally {
            // Restore the original value on every exit path; otherwise a failed
            // initialization would leave the whole JVM reporting the overridden version.
            if (javaVersion != null) {
                System.setProperty(JAVA_VERSION_PROPERTY, javaVersion);
            } else {
                // setProperty rejects null values, so an originally absent property is cleared.
                System.clearProperty(JAVA_VERSION_PROPERTY);
            }
        }
    }

第二种方式可能会受环境的影响,其他人可能会改变认证文件

2. 解析文件

2.1 Parquet thrift

  /**
   * Reads every Parquet file directly under {@code path} and returns all records,
   * materialized as Thrift objects of type {@code clazz}.
   *
   * <p>Sub-directories are skipped with a warning. A file that fails with a
   * {@link RuntimeException} is logged and skipped; records already read from it are kept.
   *
   * @param path  HDFS directory to read
   * @param clazz Thrift class the Parquet records are read as
   * @return a mutable list of all records read; empty when {@code path} does not exist
   * @throws IOException if listing the directory, reading a file, or closing a reader fails
   */
  public static <T extends TBase<?, ?>> List<T> readParquetDirectory(String path, Class<T> clazz) throws IOException {
        List<T> lines = new ArrayList<>();
        FileSystem fs = FILE_SYSTEM;
        Path tablePath = new Path(path);
        if (!fs.exists(tablePath)) {
            log.error("hdfs path {} not exists", path);
            // Same mutable list type as the success path, so callers can treat both alike.
            return lines;
        }

        for (FileStatus file : fs.listStatus(tablePath)) {
            // The listing already carries the entry type; no extra file-system lookup needed.
            if (file.isDirectory()) {
                log.warn("hdfs path {} is directory!", file.getPath());
                continue;
            }

            // try-with-resources closes the reader (and its underlying stream) for every file.
            try (ParquetReader<T> reader = ThriftParquetReader.<T>build(file.getPath()).withThriftClass(clazz).build()) {
                T s;
                while ((s = reader.read()) != null) {
                    lines.add(s);
                }
            } catch (RuntimeException e) {
                DateTime modifyTime = new DateTime(file.getModificationTime());
                // NOTE(review): the "bytes" placeholder is filled with the block size, not the
                // file length (getLen()) -- confirm which one is intended.
                log.warn("read file error path: {} bytes{}, modify time{} \n {}", file.getPath().toString(), file.getBlockSize(),
                    modifyTime.toString("yyyy-MM-dd HH:mm:ss"), ExceptionUtils.getStackTrace(e));
            }
        }
        log.info("lines.size={}", lines.size());
        return lines;
    }

2.2 Sequence thrift


 /**
  * Reads every SequenceFile directly under {@code path} and deserializes each record's
  * value into a new Thrift object of type {@code clazz}.
  *
  * <p>Sub-directories are skipped with a warning. A file whose records cannot be
  * instantiated or deserialized is logged and skipped; records already read from it are kept.
  *
  * @param path  HDFS directory to read
  * @param clazz Thrift class to instantiate per record; needs a no-argument constructor
  * @return a mutable list of all records read; empty when {@code path} does not exist
  * @throws IOException if listing the directory or reading a file fails
  */
 public static <T extends TBase<?, ?>> List<T> readSequenceDirectory(String path, Class<T> clazz) throws IOException {
        List<T> lines = new ArrayList<>();
        FileSystem fs = FILE_SYSTEM;
        Configuration conf = fs.getConf();
        Path tablePath = new Path(path);
        if (!fs.exists(tablePath)) {
            log.error("hdfs path {} not exists", path);
            return lines;
        }
        for (FileStatus file : fs.listStatus(tablePath)) {
            // The listing already carries the entry type; no extra file-system lookup needed.
            if (file.isDirectory()) {
                log.warn("hdfs path {} is a directory", file.getPath());
                continue;
            }
            try (FSDataInputStream is = fs.open(file.getPath());
                 SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.stream(is))) {
                // For other serialization formats, change how key and value are read accordingly.
                BytesWritable key = (BytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
                BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
                while (reader.next(key, value)) {
                    T t = clazz.getDeclaredConstructor().newInstance();
                    // getBytes() exposes the whole reusable backing buffer, which may hold stale
                    // bytes past getLength(); copyBytes() returns exactly the current record.
                    DESERIALIZER_THREAD_LOCAL.get().deserialize(t, value.copyBytes());
                    lines.add(t);
                }
            } catch (TException | InstantiationException | InvocationTargetException | NoSuchMethodException | IllegalAccessException e) {
                // Best effort: report through the logger and continue with the next file.
                log.warn("read sequence file error path: {}", file.getPath(), e);
            }
        }

        log.info("hdfs read directory:{} success, lines:{}", path, lines.size());
        return lines;
    }

2.3 text

 /**
  * Reads every file directly under {@code path} as UTF-8 text and returns all lines,
  * in the order the files are listed.
  *
  * <p>Sub-directories are skipped with a warning.
  *
  * @param path HDFS directory to read
  * @return a mutable list of all lines read; empty when {@code path} does not exist
  * @throws IOException if listing the directory or reading a file fails
  */
 public static List<String> readDirectory(String path) throws IOException {
        List<String> lines = new ArrayList<>();
        FileSystem fs = FILE_SYSTEM;
        Path tablePath = new Path(path);
        if (!fs.exists(tablePath)) {
            log.error("hdfs path {} not exists!", path);
            return lines;
        }
        for (FileStatus file : fs.listStatus(tablePath)) {
            // The listing already carries the entry type; no extra file-system lookup needed.
            if (file.isDirectory()) {
                log.warn("hdfs path {} is directory!", file.getPath());
                continue;
            }
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(file.getPath()), StandardCharsets.UTF_8))) {
                // readLine() reports read failures as the declared IOException, whereas
                // BufferedReader.lines() would wrap them in UncheckedIOException.
                String line;
                while ((line = reader.readLine()) != null) {
                    lines.add(line);
                }
            }
        }

        log.info("hdfs read directory:{} success, lines:{}", path, lines.size());
        return lines;
    }

上一篇:地铁线路最短路径(实现)


下一篇:【js】知乎chrome控制台字符画招聘信息实现