消息队列面试解析系列(七)- 数据压缩(下)

4 压缩分段选型



大部分压缩算法区别主要是,对数据进行编码的算法,压缩的流程和压缩包的结构大致一样。

而在压缩过程中,你最需要了解的就是如何选择合适的压缩分段。


压缩时,给定的被压缩数据它必须有确定长度,或是有头有尾的,不能是个无限数据流,若要对流数据压缩,必须把流数据划分成多帧,一帧帧分段压缩。


主要因为压缩算法在压缩前,一般都需对被压缩数据从头到尾扫描:确定如何对数据划分和编码。


  • 一般原则:
    重复次数多、占用空间大的内容,使用尽量短的编码,这样压缩率会更高。


被压缩数据长度越大,重码率更高,压缩比也越高。

比如这篇文章,可能出现几十次“压缩”,将整篇文章压缩,这词重复率几十次,但按照每个自然段来压缩,每段中这词重复率只有二三次。显然全文压缩压缩率高于分段压缩。


分段并非越大越好,超过一定长度后,再增加长度对压缩率贡献不大了。

过大的分段长度在解压时,还有更多解压浪费。

比如,一个1MB大小的压缩文件,即使你只是需要读其中很短的几个字节,也不得不把整个文件全部解压缩,造成很大的解压浪费。


所以要根据业务,选择合适压缩分段,在压缩率、压缩速度、解压浪费间找到平衡点。


确定数据划分和压缩算法后,就可压缩了,压缩过程就是用编码替换原始数据。

压缩后的压缩包是由这编码字典和用编码替换后的数据组成。


这就是数据压缩过程。解压时,先读取编码字典,然后按字典把压缩编码还原成原始数据即可。


5 Kafka 消息压缩流程


首先可以配置Kafka是否开启压缩,支持配置使用哪种压缩算法。

因为不同场景是否需要开启压缩,选择哪种压缩算法都不能一概而论。

所以Kafka将选择权交给使用者。


在开启压缩时,Kafka选择一批消息一起压缩,每一个批消息就是一个压缩分段。使用者也可以通过参数来控制每批消息的大小。


在Kafka中,生产者生成一个批消息发给服务端,在服务端中是不会拆分批消息的。那按批压缩,意味在服务端也不用对这批消息进行解压,可整批直接存储,然后整批发给消费者。最后,批消息由消费者解压。

在服务端不用解压,就不会耗费服务端CPU,同时还能获得压缩后,占用传输带宽小,占用存储空间小。

使用Kafka时,如果生产者和消费者的CPU不是特别吃紧,开启压缩后,可节省网络带宽和服务端的存储空间,提升总体的吞吐量,一般都是个不错的选择。


Kafka在生产者上,对每批消息进行压缩,批消息在服务端不解压,消费者在收到消息之后再进行解压。Kafka的压缩和解压都是在客户端完成的。



6 RocketMQ 消息压缩源码


默认压缩大于4K的消息(可配置)

消息队列面试解析系列(七)- 数据压缩(下)


压缩算法是zip,默认级别5(可配置)


消息队列面试解析系列(七)- 数据压缩(下)


也是客户端做解压缩工作,服务端只存储。


    private boolean tryToCompressMessage(final Message msg) {
        if (msg instanceof MessageBatch) {
            // 当前不支持批量消息的压缩
            return false;
        }
        byte[] body = msg.getBody();
        if (body != null) {
            if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
                try {
                    byte[] data = UtilAll.compress(body, zipCompressLevel);
                    if (data != null) {
                        msg.setBody(data);
                        return true;
                    }
                } catch (IOException e) {
                    log.error("tryToCompressMessage exception", e);
                    log.warn(msg.toString());
                }
            }
        }

        return false;
    }


DefaultLitePullConsumerImpl#pullSyncImpl会调用PullAPIWrapper#processPullResult,会为压缩消息进行解压缩。

RocketMQ的压缩机制也是Producer压缩,Broker传输,Consumer解压缩,

不同的是kaffka的压缩是基于一批批消息,对于CPU空闲较多的场景下会有更大的吞吐提


7 总结

一直以来,常见算法都是空间换时间,但在MQ和一些IO密集型应用中还有CPU计资源换网络带宽/磁盘IO。

数据压缩本质是CPU资源换存储资源,或用压缩解压的时间换取存储的空间。


在选择压缩算法的时候,需综合考虑压缩时间和压缩率两个因素,被压缩数据的内容也是影响压缩时间和压缩率的重要因素。

必要时可先用业务数据做一个压缩测试,这样有助于选择最合适的压缩算法。


另外影响压缩率的重要因素是压缩分段,需根据业务情况选择一个合适分段,在保证压缩率前提下,尽量减少解压浪费。


消息队列面试解析系列(七)- 数据压缩(下)

上一篇:在你编码之前(转)


下一篇:获取xmlhttprequest对象