JAVA 读取HDFS 文件,并解析
1. 读取文件
1.1 使用程序指定验证信息
与pom.xml同目录下,放置认证及连接所需文件
static {
    Configuration conf = new Configuration();
    // Replace with your own paths: cluster config + Kerberos files live under conf/
    conf.addResource(new Path("conf/" + "hdfs-site.xml"));
    conf.addResource(new Path("conf/" + "core-site.xml"));
    System.setProperty("java.security.krb5.conf", "conf/" + "krb5.conf");
    // Temporarily report java.version as "1.8" while logging in — presumably the
    // Hadoop client mis-parses newer version strings (TODO confirm which call needs it).
    String javaVersion = System.getProperty(JAVA_VERSION_PROPERTY);
    System.setProperty(JAVA_VERSION_PROPERTY, "1.8");
    try {
        UserGroupInformation.setConfiguration(conf);
        UserGroupInformation.loginUserFromKeytab("ctc@HELLO.HADOOP", "conf/" + "ctc.keytab");
        URI uri = new URI("hdfs://usr/");
        FILE_SYSTEM = FileSystem.get(uri, conf);
    } catch (URISyntaxException | IOException e) {
        throw new ExceptionInInitializerError(e);
    } finally {
        // FIX: restore the real java.version even when login/connect fails; the
        // original restored it only on the success path, leaving the JVM spoofed.
        if (javaVersion != null) {
            System.setProperty(JAVA_VERSION_PROPERTY, javaVersion);
        }
    }
}
1.2 使用启动参数
启动时设置认证信息
#!/usr/bin/env bash
# Supplies Kerberos/Hadoop settings as JVM system properties at launch time,
# so no credentials or config paths are hard-coded in the Java source.
# Paths (krb5.conf, ctc.keytab) are relative to the working directory.
JVM_ARGS="-Dhadoop.property.configuration.service.preserve.user.conf=false \
-Djava.security.krb5.conf=krb5.conf \
-Dhadoop.property.hadoop.client.keytab.file=ctc.keytab \
-Dhadoop.property.hadoop.client.kerberos.principal=ctc@HELLO.HADOOP \
-Dhadoop.property.hadoop.security.authentication=kerberos \
-Dconfiguration.service=org.apache.hadoop.fs.NameServiceConfigurationService"
# exec replaces this shell with the JVM so signals are delivered to the app directly
exec /usr/java/jdk-11-qa/bin/java ${JVM_ARGS} -jar hdfs.jar
static {
    Configuration conf = new Configuration();
    conf.set("fs.automatic.close", "false"); //disable fileSystem auto close
    // Temporarily report java.version as "1.8" while connecting — presumably the
    // Hadoop client mis-parses newer version strings (TODO confirm which call needs it).
    String javaVersion = System.getProperty(JAVA_VERSION_PROPERTY);
    System.setProperty(JAVA_VERSION_PROPERTY, "1.8");
    try {
        URI uri = new URI(String.format(FILE_SYSTEM_TEMPLATE,"hdfs://usr/"));
        FILE_SYSTEM = FileSystem.get(uri, conf);
    } catch (URISyntaxException | IOException e) {
        throw new ExceptionInInitializerError(e);
    } finally {
        // FIX: restore the real java.version even when initialization fails; the
        // original restored it only on the success path, leaving the JVM spoofed.
        if (javaVersion != null) {
            System.setProperty(JAVA_VERSION_PROPERTY, javaVersion);
        }
    }
}
第二种方式可能会受环境的影响,其他人可能会改变认证文件
2. 解析文件
2.1 Parquet thrift
/**
 * Reads every Parquet file directly under {@code path} and deserializes each
 * record into a Thrift object of type {@code clazz}.
 * <p>Sub-directories are skipped with a warning; a file that fails to read is
 * logged and skipped, so one corrupt file does not abort the whole directory.
 *
 * @param path  HDFS directory containing Parquet files
 * @param clazz Thrift record class to materialize
 * @return all records read, or an empty immutable list if {@code path} does not exist
 * @throws IOException on HDFS metadata operations (exists/listStatus)
 */
public static <T extends TBase<?, ?>> List<T> readParquetDirectory(String path, Class<T> clazz) throws IOException {
    FileSystem fs = FILE_SYSTEM;
    Path tablePath = new Path(path);
    if (!fs.exists(tablePath)) {
        log.error("hdfs path {} not exists", path);
        return List.of();
    }
    List<T> lines = new ArrayList<>();
    FileStatus[] files = fs.listStatus(tablePath);
    for (FileStatus file : files) {
        if (fs.isDirectory(file.getPath())) {
            log.warn("hdfs path {} is directory!", file.getPath());
            continue;
        }
        // FIX: try-with-resources — the original never closed the reader, leaking
        // the underlying HDFS input stream for every file read.
        try (ParquetReader<T> reader = ThriftParquetReader.<T>build(file.getPath()).withThriftClass(clazz).build()) {
            T s;
            while ((s = reader.read()) != null) {
                lines.add(s);
            }
        } catch (RuntimeException e) {
            DateTime modifyTime = new DateTime(file.getModificationTime());
            // NOTE(review): getBlockSize() is the HDFS block size, not the file
            // length — file.getLen() is probably what was intended; confirm.
            // FIX: pass the Throwable as the last arg so SLF4J logs the full stack
            // trace, instead of pre-rendering it into the message string.
            log.warn("read file error path: {} bytes{}, modify time{}", file.getPath().toString(), file.getBlockSize(),
                    modifyTime.toString("yyyy-MM-dd HH:mm:ss"), e);
        }
    }
    log.info("lines.size={}", lines.size());
    return lines;
}
2.2 Sequence thrift
/**
 * Reads every SequenceFile directly under {@code path} and Thrift-deserializes
 * each value into an instance of {@code clazz}.
 * <p>Sub-directories are skipped with a warning; per-file deserialization errors
 * are logged and that file is skipped.
 *
 * @param path  HDFS directory containing SequenceFiles with BytesWritable key/value
 * @param clazz Thrift record class to materialize (must have a no-arg constructor)
 * @return all records read, or an empty list if {@code path} does not exist
 * @throws IOException on HDFS access failure
 */
public static <T extends TBase<?, ?>> List<T> readSequenceDirectory(String path, Class<T> clazz) throws IOException {
    List<T> lines = new ArrayList<>();
    FileSystem fs = FILE_SYSTEM;
    Configuration conf = fs.getConf();
    Path tablePath = new Path(path);
    if (!fs.exists(tablePath)) {
        log.error("hdfs path {} not exists", path);
        return lines;
    }
    FileStatus[] files = fs.listStatus(tablePath);
    for (FileStatus file : files) {
        if (fs.isDirectory(file.getPath())) {
            log.warn("hdfs path {} is a directory", file.getPath());
            continue;
        }
        try (FSDataInputStream is = fs.open(file.getPath());
             SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.stream(is))) {
            // For other serialization formats, change the key/value types and read logic here.
            BytesWritable key = (BytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
            BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
            while (reader.next(key, value)) {
                T t = clazz.getDeclaredConstructor().newInstance();
                // FIX: copyBytes() returns exactly getLength() bytes; getBytes()
                // exposes the padded backing array, whose stale trailing bytes can
                // corrupt Thrift deserialization when the buffer was reused.
                DESERIALIZER_THREAD_LOCAL.get().deserialize(t, value.copyBytes());
                lines.add(t);
            }
        } catch (TException | InstantiationException | InvocationTargetException | NoSuchMethodException | IllegalAccessException e) {
            // FIX: use the class logger with the Throwable attached instead of
            // e.printStackTrace(), which bypasses the logging configuration.
            log.warn("read sequence file error, path: {}", file.getPath(), e);
        }
    }
    log.info("hdfs read directory:{} success, lines:{}", path, lines.size());
    return lines;
}
2.3 text
/**
 * Reads every text file directly under {@code path} as UTF-8 and returns all
 * lines in listing order. Sub-directories are skipped with a warning.
 *
 * @param path HDFS directory containing plain-text files
 * @return all lines read, or an empty list if {@code path} does not exist
 * @throws IOException on HDFS access failure
 */
public static List<String> readDirectory(String path) throws IOException {
    List<String> result = new ArrayList<>();
    FileSystem fs = FILE_SYSTEM;
    Path dir = new Path(path);
    if (!fs.exists(dir)) {
        log.error("hdfs path {} not exists!", path);
        return result;
    }
    for (FileStatus status : fs.listStatus(dir)) {
        Path filePath = status.getPath();
        if (fs.isDirectory(filePath)) {
            log.warn("hdfs path {} is directory!", filePath);
            continue;
        }
        // Explicitly decode as UTF-8 rather than relying on the platform charset.
        try (BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(filePath), StandardCharsets.UTF_8))) {
            String line;
            while ((line = br.readLine()) != null) {
                result.add(line);
            }
        }
    }
    log.info("hdfs read directory:{} success, lines:{}", path, result.size());
    return result;
}