hadoop数据压缩及涉及的相关算法和(MapReduce)代码示例演示

查看hadoop有哪些压缩算法的命令

[lqs@bdc112 hadoop-3.1.3]$ bin/hadoop checknative
2021-12-15 16:20:12,342 INFO bzip2.Bzip2Factory: Successfully loaded & initialized native-bzip2 library system-native
2021-12-15 16:20:12,345 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
2021-12-15 16:20:12,348 WARN zstd.ZStandardCompressor: Error loading zstandard native libraries: java.lang.InternalError: Cannot load libzstd.so.1 (libzstd.so.1: 无法打开共享对象文件: 没有那个文件或目录)!
2021-12-15 16:20:12,353 WARN erasurecode.ErasureCodeNative: ISA-L support is not available in your platform... using builtin-java codec where applicable
Native library checking:
hadoop:  true /home/lqs/module/hadoop-3.1.3/lib/native/libhadoop.so.1.0.0
zlib:    true /lib64/libz.so.1
zstd  :  false 
snappy:  true /lib64/libsnappy.so.1
lz4:     true revision:10301
bzip2:   true /lib64/libbz2.so.1
openssl: true /lib64/libcrypto.so
ISA-L:   false libhadoop was built without ISA-L support

目录

hadoop数据压缩简介


数据压缩的好处:
可以减少磁盘IO、减少磁盘存储空间
压缩缺点:
增加CPU开销

压缩原则:
a、运行密集型的作业,尽量少用压缩
b、IO密集型的作业,尽量多用压缩

MR(MapReduce)支持的压缩编码

压缩算法基础对比简介

压缩格式 是否自带 算法 文件扩展名 切片? 转换成压缩文件后使用是否需要切片
DEFLATE 是,直接使用 DEFLATE .deflate 否,直接使用
Gzip 是,直接使用 DEFLATE .gz 否,直接使用
bzip2 是,直接使用 bzip2 .bz2 否,直接使用
LZO 否,需要安装 LZO .lzo 是,需要建索引,还需要指定输入格式
Snappy 是,直接使用 Snappy .snappy 否,直接使用

压缩性能对比

压缩算法 原始文件大小(G) 压缩文件大小(G) 压缩速度(mb/s) 解压速度(mb/s)
gzip 8.3 1.8 17.5 58
bzip2 8.3 1.1 2.4 9.5
LZO 8.3 2.9 49.3 74.6

选择压缩的条件

压缩方式的选择择时重点考虑:压缩/解压缩速度、压缩率(压缩后存储大小)、压缩后是否可以支持切片。同时还要考虑网络、IO流、磁盘、集群情况等进行选择。

几种压缩算法的对比

压缩名 优点 缺点
Gzip 压缩率比较高 不支持Split(切片),压缩或者解压的速度一般
Bzip2 压缩率高,且支持切片 压缩或者解压的速度慢
Lzo 压缩或者解压的速度比较快,支持切片 压缩率一般,想要支持切片需要额外创建索引
Snappy 压缩或者解压的速度都快 不支持切片,压缩率一般

压缩位置的选择

Map之前 Map和Reducer之间 Reducer之后
一般情况下:
无须显示指定使用的编解码方式。Hadoop自动检查文件扩展名,如果扩展名能够匹配,就会用恰当的编解码方式对文件进行压缩和解压。

企业开发考虑因素 :
a、数据量小于块大小,重点考虑压缩和解压缩速度比较快的LZO/Snappy
b、数据量非常大,重点考虑支持切片的Bzip2和LZO
这里主要是指MapTask之后的输出。

企业开发中如何选择:
为了减少MapTask和ReduceTask之间的网络IO。重点考虑压缩和解压缩快的LZO、Snappy。
看需求:

a、如果数据永久保存,考虑压缩率比较高的Bzip2和Gzip。
b、 如果作为下一个MapReduce输入,需要考虑数据量和是否支持切。

压缩参数的配置

压缩格式 对应编码器、解码器
DEFLATE org.apache.hadoop.io.compress.DefaultCodec
gzip org.apache.hadoop.io.compress.GzipCodec
bzip2 org.apache.hadoop.io.compress.BZip2Codec
LZO com.hadoop.compression.lzo.LzopCodec
Snappy org.apache.hadoop.io.compress.SnappyCodec

启用压缩配置需要在hadoop中配置如下参数

参数 默认值 阶段 建议
io.compression.codecs
(在 core-site.xml 中配置)
无,这个需要在命令行输入
hadoop checknative 查看
输入压缩 Hadoop 使用文件扩展名判断是否支持某种编解码器
mapreduce.map.output.compress
(在 mapred-site.xml 中配置)
false mapper 输出 这个参数设为 true 启用压缩
mapreduce.map.output.compress.codec
(在 mapredsite.xml 中配置)
org.apache.hadoop.io.
compress.DefaultCodec
mapper 输出 企业多使用 LZO 或Snappy 编解码器在此阶段压缩数据
mapreduce.output.fileoutputformat.compress
(在mapred-site.xml 中配置)
false reducer 输出 这个参数设为 true 启用压缩
mapreduce.output.fileoutputformat.compress.codec
(在mapred-site.xml 中配置)
org.apache.hadoop.io.
compress.DefaultCodec
reducer 输出 使用标准工具或者编解码器,如 gzip 和bzip2

Map 输出端采用压缩

Map类

package com.lqs.mapreduce.zip;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author qingSong liu
 * @version 1.0
 * @time 2021/12/8 19:48
 */

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    Text k;

    IntWritable v;

    @Override
    protected void setup(Mapper<LongWritable, Text, Text, IntWritable>.Context context) {
        k = k = new Text();
        v = new IntWritable(1);
    }

    /**
     * @param key     偏移量
     * @param value   一行数据
     * @param context 上下文对象,传输,将数据传给reduce
     * @throws IOException          异常
     * @throws InterruptedException 异常
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        //1、获取一行,转换字符串
        String line = value.toString();

        //2、切割
        String[] words = line.split(" ");

        //3、输出,迭代写出
        for (String word : words) {
            k.set(word);
            context.write(k, v);
        }
    }
}

Reducer类

package com.lqs.mapreduce.zip;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author qingSong liu
 * @version 1.0
 * @time 2021/12/8 19:52
 */

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    int sum = 0;
    /**
     * 封装int类型
     */
    IntWritable v;

    @Override
    protected void setup(Reducer<Text, IntWritable, Text, IntWritable>.Context context) {
        v = new IntWritable();
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {

        //1、累计求和
        for (IntWritable count : values) {
            sum += count.get();
        }

        //2、输出
        v.set(sum);
        context.write(key, v);

    }
}

Driver类

package com.lqs.mapreduce.zip;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author qingSong liu
 * @version 1.0
 * @time 2021/12/8 19:55
 */

public class WordCountDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        //1、获取配置信息以及获取job对象
        Configuration configuration = new Configuration();

        // 开启 map 端输出压缩
        configuration.setBoolean("mapreduce.map.output.compress", true);
		// 设置 map 端输出压缩方式
        configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);

        Job job = Job.getInstance(configuration);

        //2、关联本Driver程序的jar,或者是驱动类
        job.setJarByClass(WordCountDriver.class);

        //3、关联Mapper和Reducer的jar
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        //4、设置Mapper输出的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //5、设置最终输出kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //6、设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path("F:\\hdpData\\Input\\inputword"));
        FileOutputFormat.setOutputPath(job, new Path("F:\\hdpData\\Output\\outputWord"));

        //7、提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);

    }

}

Reducer输出端使用压缩
和上面的案例一样,这里只需要改Driver类

package com.lqs.mapreduce.zip;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author qingSong liu
 * @version 1.0
 * @time 2021/12/8 19:55
 */

public class WordCountDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        //1、获取配置信息以及获取job对象
        Configuration configuration = new Configuration();

//        // 开启 map 端输出压缩
//        configuration.setBoolean("mapreduce.map.output.compress", true);
 设置 map 端输出压缩方式
//        configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);

        Job job = Job.getInstance(configuration);

        //2、关联本Driver程序的jar,或者是驱动类
        job.setJarByClass(WordCountDriver.class);

        //3、关联Mapper和Reducer的jar
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        //4、设置Mapper输出的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //5、设置最终输出kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //6、设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path("F:\\hdpData\\Input\\inputword"));
        FileOutputFormat.setOutputPath(job, new Path("F:\\hdpData\\Output\\outputWord"));

        // 设置 reduce 端输出压缩开启
        FileOutputFormat.setCompressOutput(job, true);
        // 设置压缩的方式
        FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
        // FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

        //7、提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);

    }

}

以下是常用的使用方式,直接在Driver类里的configuration里面配置,代码如下:

package com.lqs.mapreduce.zip;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author qingSong liu
 * @version 1.0
 * @time 2021/12/8 19:55
 */

public class WordCountDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        //1、获取配置信息以及获取job对象
        Configuration configuration = new Configuration();

        //Map
        configuration.set("mapreduce.map.output.compress","true");
        configuration.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.BZip2Codec");

        //Reducer
        configuration.set("mapreduce.output.fileoutputformat.compress","true");
        configuration.set("mapreduce.output.fileoutputformat.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");

//        // 开启 map 端输出压缩
//        configuration.setBoolean("mapreduce.map.output.compress", true);
 设置 map 端输出压缩方式
//        configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);

        Job job = Job.getInstance(configuration);

        //2、关联本Driver程序的jar,或者是驱动类
        job.setJarByClass(WordCountDriver.class);

        //3、关联Mapper和Reducer的jar
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        //4、设置Mapper输出的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //5、设置最终输出kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //6、设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path("F:\\hdpData\\Input\\inputword"));
        FileOutputFormat.setOutputPath(job, new Path("F:\\hdpData\\Output\\outputWord"));

        // 设置 reduce 端输出压缩开启
        FileOutputFormat.setCompressOutput(job, true);
        // 设置压缩的方式
        FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
        // FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

        //7、提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);

    }

}

上一篇:Java相当于session_start(),session_destroy()和$_SESSION [‘username’]


下一篇:java-如何获得使用Grizzly / Jersey进行记录的客户端地址?