一、Hadoop的数据压缩
1.概述
在进行MR程序的过程中,在Mapper和Reducer端会发生大量的数据传输和磁盘IO,如果在这个过程中对数据进行压缩处理,可以有效的减少底层存储(HDFS)读写的字节数,,并且通过减少Map和Reduce阶段数据的输入输出来提升MR程序的速度,提高了网络带宽和磁盘空间的效率;
数据压缩可以有效的节省资源,它是MR程序的优化策略之一;
数据压缩会增加cpu的计算负担,但是能很大程度较少磁盘的IO。由于数据压缩占用cpu资源很小,总体还是利大于弊的。
2.数据压缩使用原则:
运算密集型的任务尽量少用压缩、IO密集型的任务多用压缩。
3.MapReduce支持的压缩编码
hadoop中的压缩格式 | 是否自带 | 文件的拓展名 | 是否可以切分 |
DEFAULT | 是 | .default | 否 |
Gzip | 是 | .gz | 否 |
bzip2 | 是 | .bz2 | 是 |
LZO | 否 | .lzo | 是 |
Snappy | 否 | .snappy | 否 |
4.编码解码器
DEFAULT | org.apache.hadoop.io.compress.DefaultCodeC |
Gzip | org.apahce.hadoop.io.compress.GzioCodeC |
bzop2 | org.apache.hadoop.io.compress.bzio2CodeC |
LZO | com.apache.hadoop.compression.lzoCodeC |
Snappy | org.apache.hadoop.io.compress.SnappyCodeC |
5.压缩性能
压缩格式 | 原始文件 | 压缩后文件 | 压缩速度 | 解压速度 |
Gzip | 8.3G | 1.8G | 17.5MB/s | 58MB/s |
bzip2 | 8.3G | 1.1G | 2.4MB/s | 9.5MB/s |
LZO | 8.3G | 2.9G | 49MB/s | 74.6MB/s |
二 、Hadoop压缩的使用
1.应用在WordCount程序中
1)在map端对数据进行压缩
在Driver类中的获取job对象后加入配置信息:
//开启map端的输入压缩
conf.setBoolean("mapreduce.map.output.compress",true); //设置压缩方法
//默认
conf.setClass("mapreduce.map.output.compress.codec",DefaultCodeC.class,CompressionCodec.class);
//Bzip2
conf.setClass("mapreduce.mapt.output.compress.codec",Bzip2Codec.class,CompressionCode.class);
//LZO
conf.setClass("mapreduce.map.output.compress.codec".LZOCodec.class,CompressionCodec.class);
注意:在map端开启压缩并不能从结果文件中看到结果的改变,只要程序运行成功就代表设置没问题!
2)在reduce端对数据进行压缩
在设置reduce输出数据类型之后加入配置信息:
//开启reduce端的输出压缩
FileOutputFormat.setCompressOutput(job,true);
//设置压缩方法
//默认
FileOutputFormat.setOutputCompressorClass(job,DefaultCodec.class);
//Bzip2
FileOutputFormat.setOutputCompressorClass(job,Bzip2Codec.class);
//Gzip
FileOutputFormat.setOutputCompressorClass(job,GzipCodec.class);
三种选择一种即可,可以看到对应的结果文件看到被压缩的结果文件。
2.自定义压缩方法
/**
* @author: PrincessHug
* @date: 2019/4/8, 9:49
* @Blog: https://www.cnblogs.com/HelloBigTable/
*/
public class TestCompress {
public static void main(String[] args) throws IOException, ClassNotFoundException {
Compress("G:\\weblog.log","org.apache.hadoop.io.compress.BZip2Codec");
} //自定义压缩方法
private static void Compress(String fileName,String method) throws IOException, ClassNotFoundException {
//获取输入流
FileInputStream fis = new FileInputStream(new File(fileName)); //通过反射获取压缩方法并初始化
Class cName = Class.forName(method);
CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(cName, new Configuration()); //定义输出流
FileOutputStream fos = new FileOutputStream(new File(fileName + codec.getDefaultExtension())); //创建压缩输出流
CompressionOutputStream cos = codec.createOutputStream(fos); //流的拷贝
IOUtils.copyBytes(fis,cos,2*1024*1024,false); //关闭资源
fis.close();
cos.close();
fos.close();
}
}