package wl_02编程操作sequence_files;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
public class SequenceFiles_write {
//Simulated data source; each array element represents the contents of one file (DATA has length 5)
private static final String[] DATA = {
"The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models.",
"It is designed to scale up from single servers to thousands of machines, each offering local computation and storage.",
"Rather than rely on hardware to deliver high-availability, the library itself is designed to detect and handle failures at the application layer",
"o delivering a highly-available service on top of a cluster of computers, each of which may be prone to failures.",
"Hadoop Common: The common utilities that support the other Hadoop modules."
};
public static void main(String[] args) throws URISyntaxException, IOException, InterruptedException {
//Configuration object
Configuration conf = new Configuration();
//Not needed here: the Path passed to the writer below already carries the full HDFS URI
// FileSystem.get(new URI("hdfs://node00:9000/kkb/2.txt"), conf, "root");
//Specify the HDFS user
System.setProperty("HADOOP_USER_NAME","root");
//Create the various Options
//Which file to write to --> an Option
SequenceFile.Writer.Option file = SequenceFile.Writer.file(new Path("hdfs://node00:9000/kkb/2.txt"));
//Each record in a SequenceFile is a key-value pair; the key class is an Option
SequenceFile.Writer.Option keyClass = SequenceFile.Writer.keyClass(IntWritable.class);
//The value class is an Option too
SequenceFile.Writer.Option valueClass = SequenceFile.Writer.valueClass(Text.class);
//SequenceFile compression type: one of NONE | RECORD | BLOCK
//Option 1: RECORD, with no codec specified
// SequenceFile.Writer.Option compressOption = SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD);
// SequenceFile.Writer writer = SequenceFile.createWriter(conf, file, keyClass, valueClass, compressOption);
//Option 2: BLOCK, with no codec specified
// SequenceFile.Writer.Option compressOption = SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK);
// SequenceFile.Writer writer = SequenceFile.createWriter(conf, file, keyClass, valueClass, compressOption);
//Option 3: BLOCK plus the BZip2Codec compression algorithm; compression costs extra time
//Layer the codec on top of the compression type
// BZip2Codec codec = new BZip2Codec();
// codec.setConf(conf);
// SequenceFile.Writer.Option compressAlgorithm = SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, codec);
// SequenceFile.Writer writer = SequenceFile.createWriter(conf, file, keyClass, valueClass, compressAlgorithm);
//Start writing the SequenceFile (Option 1 is the one actually used here)
SequenceFile.Writer.Option compressOption = SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD);
SequenceFile.Writer writer = SequenceFile.createWriter(conf, file, keyClass, valueClass,compressOption);
IntWritable key = new IntWritable();
Text value = new Text();
for (int i = 0; i < 10000; i++) {
//Set key and value, then append them to the file
key.set(i);
value.set(DATA[i % DATA.length]);
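//writer.getLength() below reports the writer's current byte offset in the file before each append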
System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
writer.append(key,value);
}
IOUtils.closeStream(writer);
}
}
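The Option 3 comment above says compression costs time; the payoff is disk space. To see that trade-off concretely, here is a minimal sketch that writes the same 10000 records once per CompressionType and prints the resulting file sizes. The class name CompressionComparison and the /kkb/compare_* output paths are made up for this example; everything else reuses the same APIs as SequenceFiles_write.
package wl_02编程操作sequence_files;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import java.io.IOException;
public class CompressionComparison {
    //Same first sentence as DATA above; one repeated value is enough to show the size difference
    private static final String SAMPLE =
            "The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models.";
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        System.setProperty("HADOOP_USER_NAME", "root");
        BZip2Codec codec = new BZip2Codec();
        codec.setConf(conf);
        //Write the same 10000 records once per CompressionType (NONE, RECORD, BLOCK)
        for (SequenceFile.CompressionType type : SequenceFile.CompressionType.values()) {
            Path path = new Path("hdfs://node00:9000/kkb/compare_" + type + ".seq");
            SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                    SequenceFile.Writer.file(path),
                    SequenceFile.Writer.keyClass(IntWritable.class),
                    SequenceFile.Writer.valueClass(Text.class),
                    SequenceFile.Writer.compression(type, codec));
            IntWritable key = new IntWritable();
            Text value = new Text(SAMPLE);
            for (int i = 0; i < 10000; i++) {
                key.set(i);
                writer.append(key, value);
            }
            IOUtils.closeStream(writer);
            //Print the on-disk size of each variant
            FileSystem fs = FileSystem.get(path.toUri(), conf);
            System.out.println(type + ": " + fs.getFileStatus(path).getLen() + " bytes");
        }
    }
}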
To inspect the SequenceFile's contents from the command line:
hdfs dfs -text /kkb/2.txt
(or browse the file through the HDFS web UI)
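hdfs dfs -text understands SequenceFiles: it decompresses each record and prints the key, a tab, then the value. Given the writer above, the first lines of output should look roughly like this (illustrative sketch, not captured from a real run):
0	The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models.
1	It is designed to scale up from single servers to thousands of machines, each offering local computation and storage.
2	Rather than rely on hardware to deliver high-availability, the library itself is designed to detect and handle failures at the application layer
...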
Reading the file
package wl_02编程操作sequence_files;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
public class SequenceFiles_read {
public static void main(String[] args) throws URISyntaxException, IOException, InterruptedException {
//Configuration object
Configuration conf = new Configuration();
//Not needed here either: the Path passed to the Reader below already carries the full HDFS URI
// FileSystem.get(new URI("hdfs://node00:9000/kkb/2.txt"), conf, "root");
//Specify the HDFS user
System.setProperty("HADOOP_USER_NAME", "root");
// The SequenceFile to read
SequenceFile.Reader.Option file = SequenceFile.Reader.file(new Path("hdfs://node00:9000/kkb/2.txt"));
// The Reader object
SequenceFile.Reader reader = new SequenceFile.Reader(conf, file);
//Create key and value objects to hold each record; their classes are read from the SequenceFile's header
Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
//Position at which the next read will start
long position = reader.getPosition();
System.out.println("position"+position);
while (reader.next(key, value)) {
//Was a sync point passed while reading this record? If so, mark the line with "*"
String syncSeen = reader.syncSeen() ? "*" : "";
System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value);
//Remember where the next record starts before reading it
position = reader.getPosition(); // beginning of next record
}
IOUtils.closeStream(reader);
}
}
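Because the loop above captures position before every next(), those offsets can be saved for random access later. The Reader also offers seek() (which requires an exact record boundary) and sync() (which moves to the first sync point after a given offset). Below is a minimal sketch of both; the class name SequenceFiles_seek and the offset 2048 are made up for illustration, and everything else reuses the APIs from SequenceFiles_read.
package wl_02编程操作sequence_files;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.IOException;
public class SequenceFiles_seek {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        System.setProperty("HADOOP_USER_NAME", "root");
        SequenceFile.Reader reader = new SequenceFile.Reader(conf,
                SequenceFile.Reader.file(new Path("hdfs://node00:9000/kkb/2.txt")));
        Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
        Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
        long somePosition = 2048L; //placeholder: in practice, an offset saved from reader.getPosition()
        //sync() accepts any offset: it moves the reader to the first sync point after it
        reader.sync(somePosition);
        System.out.println("aligned at " + reader.getPosition());
        if (reader.next(key, value)) {
            System.out.println(key + "\t" + value);
        }
        //seek() is stricter: the offset must be an exact record boundary,
        //otherwise the following next() fails
        //reader.seek(somePosition);
        IOUtils.closeStream(reader);
    }
}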