Netty-内存池源码二 (PooledByteBufAllocator)

Netty-内存池源码二 (PooledByteBufAllocator)

内存池核心类如下:

  • PooledByteBufAllocator 本期介绍
  • PooledUnsafeDirectByteBuf
  • PooledUnsafeDirectByteBuf
  • PoolThreadCache
  • MemoryRegionCache
  • PoolArena
  • SizeClasses
  • PoolChunk
  • LongPriorityQueue
  • LongLongHashMap
  • PoolSubpage

一、详情概要

从上一期文章 我们可以知道 内存分配的入口是 【PooledByteBufAllocator #newDirectBuffer()】。【PooledByteBufAllocator】 该类,也就是内存分配的 起始类, 其内部定义了很多 默认变量值 用于接下来的内存分配整个流程。

PooledByteBufAllocator】的结构图如下:

Netty-内存池源码二 (PooledByteBufAllocator)

从上图 可看到有三个陌生的类:

​ 1.【PoolArena】(DirectArena, HeapArena都继承于该类) : 可以把他想象成一块大内存(直接内存/ 堆内存)

​ 2.【PoolThreadCache】: 线程本地内存缓存池

​ 3.【InternalThreadLocalMap】: 目前把他当成 ThreadLocalMap 就行

在讲解源码前,请 思考一个问题 :

从图中可看到 不管是 DirectArena 还是 HeapArena,他们都有多个,且各自组成了数组 分别是directArenas 和 heapArenas。 前面说了PoolArena , 我们目前可以看成是一块大内存, 当业务来申请内存时,需要从PoolArena 这块大内存中 截取一块来使用就行。 但是 为什么要有多个PoolArena呢

解答:

首先我们假设就只有一个PoolArena, 众所周知 现在的 CPU 一般都是多核的, 此时有多个业务(多线程) 同时从Netty中 申请内存(从大内存中偏移出多个小内存)来使用,而本质上是多核CPU来操作这同一块大内存 进行读写。但是 由于 操作系统的读写内存屏障 存在, 会导致多个线程的读写并不能做到真正的并行。 因此Netty用了多个PoolArena 来减轻这种不能并行的行为,从而提升效率

二、 源码分析

1.关键成员变量

   	
	// 默认 HeapArena的个数   cpu*2
    private static final int DEFAULT_NUM_HEAP_ARENA;

	// 默认 DirectArena的个数 cpu*2
    private static final int DEFAULT_NUM_DIRECT_ARENA;

 	// 默认一页大小 PageSize  8192 => 8KB
    private static final int DEFAULT_PAGE_SIZE;
	
	// 用来计算默认Chunk的大小,在jemalloc3 中同时还表示Chunk内部完全二叉树的最大深度。
    private static final int DEFAULT_MAX_ORDER; // 8192 << 11 = 16 MiB per chunk

    // 表示默认poolThreadCache中 一个smallMemoryRegionCache中的队列长度  256
    private static final int DEFAULT_SMALL_CACHE_SIZE;

	// 表示默认poolThreadCache中 一个normalMemoryRegionCache中的队列长度 64
    private static final int DEFAULT_NORMAL_CACHE_SIZE;

	//  表示最大缓存规格 32KB
    static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY;

	// 表示 默认 某个线程从缓存中获取内存的最大次数限制
    private static final int DEFAULT_CACHE_TRIM_INTERVAL;

	// 表示 是否允许所有的线程使用缓存  默认是true
    private static final boolean DEFAULT_USE_CACHE_FOR_ALL_THREADS;
	
	// 默认内存缓存对齐填充为0
    private static final int DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT;
     
	// heapArena数组
    private final PoolArena<byte[]>[] heapArenas;

	// directArena数组
    private final PoolArena<ByteBuffer>[] directArenas;

	// PoolThreadLocalCache 线程本地缓存
    private final PoolThreadLocalCache threadCache;
	
	// chunk的大小
    private final int chunkSize;

2.构造方法

    /**
     *
     * @param preferDirect          是否申请直接内存                   默认一般都是true
     * @param nHeapArena            heapArena的个数                  假设 cpu*2
     * @param nDirectArena          directArena的个数                假设 cpu*2 
     * @param pageSize              页的大小                          默认8KB (8192)
     * @param maxOrder             chunk中完全平衡二叉树的深度           11
     * @param smallCacheSize     smallMemoryRegionCache的队列长度      256
     * @param normalCacheSize    normalMemoryRegionCache的队列长度     64
     * @param useCacheForAllThreads 是否所有的线程都使用PoolThreadCache  true
     * @param directMemoryCacheAlignment 对齐填充                      0
     */
    public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder, int smallCacheSize, int normalCacheSize,
boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
        
        // directByDefault = true
        super(preferDirect);

        // 目前理解成 threadLocal 每个线程中有自己的 PoolThreadLocalCache缓存
        threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
        
        // 赋值 smallCacheSize=256  normalCacheSize=64
        this.smallCacheSize = smallCacheSize;
        this.normalCacheSize = normalCacheSize;
		
        
        // 计算chunkSize   8KB<<11 = 16mb (16,777,216)
        chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);
		

        // pageSize=8KB(8192)  
        //pageShifts表示 1 左移多少位是 8192  = 13
        int pageShifts = validateAndCalculatePageShifts(pageSize, directMemoryCacheAlignment);
        
        
        
        // 生成 heapArenas 数组
        if (nHeapArena > 0) {
            heapArenas = newArenaArray(nHeapArena);
            List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
            for (int i = 0; i < heapArenas.length; i ++) {
                PoolArena.HeapArena arena = new PoolArena.HeapArena(this,
                        pageSize, pageShifts, chunkSize,
                        directMemoryCacheAlignment);
                heapArenas[i] = arena;
                metrics.add(arena);
            }
            heapArenaMetrics = Collections.unmodifiableList(metrics);
        } else {
            heapArenas = null;
            heapArenaMetrics = Collections.emptyList();
        }
		
        // netty默认情况下都会使用 直接内存,因此我们在整个Netty中关心直接内存相关就可以了,而且直接内存与堆内存逻辑并无太多差异。
        
        // 生成 directArena数组  
        if (nDirectArena > 0) {

            // 假设平台CPU 个数是8, 这里会创建 cpu(8)*2 = 16个长度的 directArenas数组。
            directArenas = newArenaArray(nDirectArena);

            // 这是个内存池图表 
            //如果想要监测 内存池详情 可使用该对象(关于内存分配逻辑不用关心Metric相关对象)
            List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);

            // for循环最终创建了 16个 directArena 对象,并且 将这些DirectArena对象放入到数组内
            for (int i = 0; i < directArenas.length; i ++) {
                
                // 参数1:allocator 对象
                // 参数2:pageSize 8k
                // 参数3:pageShift 13  1<<13 可推出pageSize的值
                // 参数4:chunkSize: 16mb
                // 参数5:directMemoryCacheAlignment 对齐填充 0
                // 生成 DirectArena对象
                PoolArena.DirectArena arena = new PoolArena.DirectArena(
                        this, pageSize, pageShifts, chunkSize, directMemoryCacheAlignment);
                directArenas[i] = arena;
                metrics.add(arena);
            }
            directArenaMetrics = Collections.unmodifiableList(metrics);
        } else {
            directArenas = null;
            directArenaMetrics = Collections.emptyList();
        }
        
        metric = new PooledByteBufAllocatorMetric(this);
    }

3.申请内存入口

    /**
     * 申请分配直接内存 入口
     * @param initialCapacity  业务需求内存大小
     * @param maxCapacity      内存最大限制
     * @return  ByteBuf netty中内存对象
     */
    @Override
    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {

        // 若当前线程没有PoolThreadCache 则创建一份 PoolThreadCache (里面包含 Small、Normal MemroyRegionCache数组【该数组中每个元素包含一个队列】)用于缓存内存对象
      	// 若当前线程有 则直接获取
        PoolThreadCache cache = threadCache.get();

        // 拿到当前线程cache中绑定的directArena
        PoolArena<ByteBuffer> directArena = cache.directArena;

        final ByteBuf buf;

        // 这个条件正常逻辑 都会成立
        if (directArena != null) {
            
            //这是咱们的核心入口 ******  
            // 参数1: cache 当前线程相关的PoolThreadCache对象
            // 参数2:initialCapactiy 业务层需要的内存容量
            // 参数3:maxCapacity 最大内存大小
            buf = directArena.allocate(cache, initialCapacity, maxCapacity);
        } else {
            // 一般不会走到这里,不用看
            buf = PlatformDependent.hasUnsafe() ?
                    UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                    new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }

        return toLeakAwareBuffer(buf);
    }

三、总结

PooledByteBufAllocator】 类主要是

  1. 设置些默认变量值。 pageSize , chunkSize

  2. 并为每个申请内存的线程 创建一份 【PoolThreadCache】缓存。

  3. 根据当前平台的CPU数量,设置 【PoolArena 】 ,而最终申请内存的工作还是交给了 【PoolArena】。

上一篇:手写RPC(四) 核心模块网络协议模块编写 ---- netty服务端


下一篇:Netty粘包拆包