Netty框架AbstractReferenceCountedByteBuf 源码分析

在Netty中,经常使用的ByteBuf的实现类如UnpooledHeapByteBuf、UnpooledDirectByteBuf、pooledHeapByteBuf、pooledDirectByteBuf等都继承自AbstractReferenceCountedByteBuf类。这个类的主要功能是对引用进行计数。ByteBuf通过对引用计数,可以知道自己被引用的次数,便于池化的ByteBuf或DirectByteBuf等进行内存回收和释放。(注意:非池化的ByteBuf每次I/O都会创建一个ByteBuf,可由JVM管理其生命周期,但UnpooledDirectByteBuf最好也手动释放;池化的ByteBuf要手动进行内存回收和释放。)

为更好地理解AbstractReferenceCountedByteBuf类的实现的功能,对源码进行分析,我使用的Netty版本为4.1.49,不同版本实现可能会有差异。

1. AbstractReferenceCountedByteBuf类的成员变量

/**
 * Abstract base class for {@link ByteBuf} implementations that count references.
 */
public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {
    private static final long REFCNT_FIELD_OFFSET =
            ReferenceCountUpdater.getUnsafeOffset(AbstractReferenceCountedByteBuf.class, "refCnt");
    private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> AIF_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");

    private static final ReferenceCountUpdater<AbstractReferenceCountedByteBuf> updater =
            new ReferenceCountUpdater<AbstractReferenceCountedByteBuf>() {
        @Override
        protected AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> updater() {
            return AIF_UPDATER;
        }
        @Override
        protected long unsafeOffset() {
            return REFCNT_FIELD_OFFSET;
        }
    };

    // Value might not equal "real" reference count, all access should be via the updater
    @SuppressWarnings("unused")
    private volatile int refCnt = updater.initialValue();
    // 以下省略

成员变量REFCNT_FIELD_OFFSE用于标识refCnt字段在AbstractReferenceCountedByteBuf 中的内存地址。该内存地址的获取是JDK实现强相关的。

成员变量AIF_UPDATER 是AtomicIntegerFieldUpdater类型变量,通过原子的方式对成员变量进行更新等操作,以在不使用锁的情况下实现线程安全。其将操作后面声明的refCnt变量。

成员变量updater 是抽象类ReferenceCountUpdater的具体实现,其中重写了updater()和unsafeOffset()两个虚函数,传入了上面定义的REFCNT_FIELD_OFFSE和AIF_UPDATER 成员变量。成员变量updater 用于实现具体的引用计数功能。

最后一个成员变量是volatile修饰的refCnt字段,用于跟踪对象的引用次数,使用volatile是为了解决多线程并发访问的可见性问题。其通过updater.initialValue()函数初始化为2。

    public final int initialValue() {
        return 2;
    }

 

2. 对象引用计数器

AbstractReferenceCountedByteBuf类中有retain()和release() 函数。 retain()用来增加计数器的引用,release() 函数用来减少计数器的引用。

AbstractReferenceCountedByteBuf类中retain()函数代码,其调用了updater成员变量的retain()函数。

    @Override
    public ByteBuf retain() {
        return updater.retain(this);
    }

    @Override
    public ByteBuf retain(int increment) {
        return updater.retain(this, increment);
    }

updater成员变量中retain()函数的实现代码如下,retain0()函数中通过updater()函数获取到AtomicIntegerFieldUpdater类型的变量AIF_UPDATER,调用getAndAdd()函数进行原子增加操作。getAndAdd()函数内部则是通过compareAndSet函数进行原子更新。

    public final T retain(T instance) {
        return retain0(instance, 1, 2);
    }

    public final T retain(T instance, int increment) {
        // all changes to the raw count are 2x the "real" change - overflow is OK
        int rawIncrement = checkPositive(increment, "increment") << 1;
        return retain0(instance, increment, rawIncrement);
    }

    // rawIncrement == increment << 1
    private T retain0(T instance, final int increment, final int rawIncrement) {
        int oldRef = updater().getAndAdd(instance, rawIncrement);
        if (oldRef != 2 && oldRef != 4 && (oldRef & 1) != 0) {
            throw new IllegalReferenceCountException(0, increment);
        }
        // don‘t pass 0!
        if ((oldRef <= 0 && oldRef + rawIncrement >= 0)
                || (oldRef >= 0 && oldRef + rawIncrement < oldRef)) {
            // overflow case
            updater().getAndAdd(instance, -rawIncrement);
            throw new IllegalReferenceCountException(realRefCnt(oldRef), increment);
        }
        return instance;
    }

 

AbstractReferenceCountedByteBuf类中release()函数代码如下。release()函数先调用了updater的release()函数,对计数器做减少操作。以计算数减少后的结果判断是否调用deallocate()函数进行内存回收,而具体的deallocate()函数则由具体实现类来实现。

    @Override
    public boolean release() {
        return handleRelease(updater.release(this));
    }

    @Override
    public boolean release(int decrement) {
        return handleRelease(updater.release(this, decrement));
    }

    private boolean handleRelease(boolean result) {
        if (result) {
            deallocate();
        }
        return result;
    }

    /**
     * Called once {@link #refCnt()} is equals 0.
     */
    protected abstract void deallocate();

updater所调用的relesae()函数代码如下,其从updater()函数中获取AtomicIntegerFieldUpdater类型的变量AIF_UPDATER,调用该变量的compareAndSet()函数,对计数器做减法。

    private boolean tryFinalRelease0(T instance, int expectRawCnt) {
        return updater().compareAndSet(instance, expectRawCnt, 1); // any odd number will work
    }

    private boolean nonFinalRelease0(T instance, int decrement, int rawCnt, int realCnt) {
        if (decrement < realCnt
                // all changes to the raw count are 2x the "real" change - overflow is OK
                && updater().compareAndSet(instance, rawCnt, rawCnt - (decrement << 1))) {
            return false;
        }
        return retryRelease0(instance, decrement);
    }

通过上面的代码AbstractReferenceCountedByteBuf类具备了操作引用计数器的功能,并在适当的时候对内存进行回收。ByteBuf的具体实现类通过继承该类,也具备了记录自己引用次数的功能,并在合适的时候收回内存。

Netty框架AbstractReferenceCountedByteBuf 源码分析

上一篇:数据库系统概论课后SQL语句习题


下一篇:python使用pysql操作MySQL数据库