一、数据压缩在hadoop中的意义
1、基本概述
压缩技术可以减少底层hdfs的读写字节数。并且能够降低在数据传输过程中占用的网络带宽资源,以及降低占用的磁盘空间。而在MapReduce中,shuffle以及merge过程都面临着巨大的IO压力。但是要注意增加了压缩,另外一方面会增加cpu的负载。所以在要权衡好是否采用压缩,以及采用的压缩算法的特性。
2、压缩应用基本原则
运算密集型的job,少用压缩。因为压缩占用cpu。
IO密集型的job,可用压缩减少数据量。
选择压缩算法时,要注意压缩比,压缩比越大,压缩以及解压时间越长。
二、MR支持的压缩编码格式
1、压缩编码
压缩格式 | hadoop是否自带 | 算法 | 文件扩展名 | 可否可切分 | 换成压缩格式后,原来的程序是否需要修改 |
---|---|---|---|---|---|
DEFAULT | 是 | DEFAULT | .deflate | 否 | 与普通文本处理一样,不需要修改 |
gzip | 是 | DEFAULT | .gz | 否 | 与普通文本处理一样,不需要修改 |
bzip2 | 是 | bzip2 | .bz2 | 是 | 与普通文本处理一样,不需要修改 |
LZO | 需另外安装 | lzo | .lzo | 是 | 需要建立索引文件,还需要指定输出格式 |
snappy | 需另外安装 | snappy | .snappy | 否 | 与普通文本处理一样,不需要修改 |
2、不同压缩算法对应的编解码器
压缩格式 | 对应编解码器 |
---|---|
DEFAULT | org.apache.hadoop.io.compress.DefaultCodec |
gzip | org.apache.hadoop.io.compress.GzipCodec |
bzip2 | org.apache.hadoop.io.compress.BZip2Codec |
lzo | com.hadoop.compression.lzo.LzopCodec |
snappy | org.apache.hadoop.io.compress.SnappyCodec |
3、不同压缩算法的特性以及适用场景
(1)gzip
优点:
压缩率高,解压、压缩速度也必比较快。hadoop本身自带,在应用中处理gzip格式的文件就和直接处理文本一样。大部分Linux自带gzip命令,使用方便。
缺点:不支持split
适用场景:
当每个文件压缩之后在一个block左右的大小(因为无法分片),都可以考虑使用gzip将原数据压缩。例如可以将一天或者一小时的日志压缩成一个gzip文件,运行MapReduce的时候就可以并行处理多个gzip。hive,streaming,MapReduce程序处理压缩文件时,无需修改程序,就像处理文本文件一样。
(2)bzip2
优点:
支持split;高压缩比,比gzip高。hadoop自带,Linux下自带bzip2命令
缺点:压缩、解压速度慢,不支持native(java和C交互的api接口)
适用场景:
适合对速度要求不高,但需要较高的压缩率的时候,可以作为mapreduce作业的输出格式;或者输出之后的数据比较大,处理之后的数据需要压缩存档减少磁盘空间并且以后数据用得比较少的情况;或者对单个很大的文本文件想压缩减少存储空间,同时又需要支持split,而且兼容之前的应用程序(即应用程序不需要修改)的情况。
(3)lzo
优点:
压缩/解压速度比较快,压缩率合理(比gzip和bzip2小)。支持split,是hadoop中最流行的压缩格式。可以在Linux下可以通过安装lzop命令来使用
缺点:
压缩率比gzip要低一些;hadoop本身不支持,需要安装;在应用中对lzo格式的文件需要做一些特殊处理(为了支持split需要建索引,还需要指定inputformat为lzo格式)。
适用场景:
一个很大的文本文件,压缩之后还大于200M以上的可以考虑,而且单个文件越大,lzo优点越越明显。
(4)snappy
优点:压缩和解压速度快,合理的压缩率
缺点:不支持的split,压缩率比gzip要低;hadoop本身不支持,需要安装
适用场景:
当Mapreduce作业的Map输出的数据比较大的时候,作为Map到Reduce的中间数据的压缩格式;或者作为一个Mapreduce作业的输出和另外一个Mapreduce作业的输入。
三、压缩的配置
1、适用范围
可以用在MapReduce的任意阶段输出、原始数据的压缩、reduce的输出等
2、hadoop压缩配置参数
参数 | 默认值 | 阶段 | 建议 |
---|---|---|---|
io.compression.codecs (在core-site.xml中配置) | org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.BZip2Codec | 输入压缩 | Hadoop使用文件扩展名判断是否支持某种编解码器 |
mapreduce.map.output.compress(在mapred-site.xml中配置) | false | map输出 | 这个参数设为true启用压缩 |
mapreduce.map.output.compress.codec(在mapred-site.xml中配置) | org.apache.hadoop.io.compress.DefaultCodec | mapper输出 | 使用LZO或snappy编解码器在此阶段压缩数据 |
mapreduce.output.fileoutputformat.compress(在mapred-site.xml中配置) | false | reduce输出 | 这个参数设为true启用压缩 |
mapreduce.output.fileoutputformat.compress.codec(在mapred-site.xml中配置) | org.apache.hadoop.io.compress. DefaultCodec | reduce输出 | 使用gzip或者bzip2来压缩 |
mapreduce.output.fileoutputformat.compress.type(在mapred-site.xml中配置) | RECORD | reduce输出 | SequenceFile输出使用的压缩类型:NONE和BLOCK |
四、压缩应用实战例子
1、数据流的压缩和解压
package JavaCompress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.*;
public class TestCompress {
public static void main(String[] args) throws IOException, ClassNotFoundException {
//compress("G:\\Fly Away-梁静茹.mp3", "org.apache.hadoop.io.compress.GzipCodec");
deCompress("G:\\Fly Away-梁静茹.mp3.gz", "mp3");
}
//压缩
public static void compress(String filename, String method) throws IOException, ClassNotFoundException {
//创建输入流
FileInputStream fis = new FileInputStream(new File(filename));
//通过反射获取压缩类的Class 对象
Class codecClass = Class.forName(method);
//通过反射获取压缩对象
CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, new Configuration());
//创建普通输出流对象
FileOutputStream fos = new FileOutputStream(new File(filename + codec.getDefaultExtension()));
//通过压缩对象创建压缩输出流,类似于将普通输出流封装成压缩输出流
CompressionOutputStream cos = codec.createOutputStream(fos);
//流拷贝
IOUtils.copyBytes(fis, cos, 1024 * 1024 * 5, false);
fis.close();
cos.close();
fos.close();
}
//解压
public static void deCompress(String filename, String decode) throws IOException {
CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
//获取文件的压缩算法类型对象,返回值可用于检查文件是否可解压
CompressionCodec codec = factory.getCodec(new Path(filename));
if (codec == null) {
System.out.println("不支持解压缩:" + filename);
return;
}
//根据压缩文件的压缩类型,返回的对象用于创建压缩输入流
CompressionInputStream cis = codec.createInputStream(new FileInputStream(new File(filename)));
//创建输出流
FileOutputStream fos = new FileOutputStream(new File(filename + decode));
IOUtils.copyBytes(cis, fos, 1024 * 1024 * 5, false);
cis.close();
fos.close();
}
}
2、map输出采用压缩
用法很简单,只需在driver中给job配置以下参数即可
Configuration configuration = new Configuration();
// 开启map端输出压缩
configuration.setBoolean("mapreduce.map.output.compress", true);
// 设置map端输出压缩方式
configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
3、reduce输出采用压缩
依旧是在driver中设置以下
// 设置reduce端输出压缩开启
FileOutputFormat.setCompressOutput(job, true);
// 设置压缩的方式
FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
// FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
// FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);