《Netty深入剖析》之十:Netty设计模式的应用

10 设计模式

10.1 单例模式

三个原则:全局只有一个实例(private 构造方法)、延迟加载(static)、避免多线程同步创建(static、synchronized)

例子:ReadTimeoutException、MqttEncoder

ReadTimeoutException:

public final class ReadTimeoutException extends TimeoutException {

    private static final long serialVersionUID = 169287984113283421L;

    public static final ReadTimeoutException INSTANCE = new ReadTimeoutException();

    private ReadTimeoutException() { }
}

MqttEncoder也是一样的实现;
因为这是单例,所以要加@Sharable

@Sharable:https://blog.csdn.net/zhailuxu/article/details/83472632

10.2 策略模式

封装一系列可替换的算法家族,支持动态选择某一个策略

DefaultEventExecutorChooserFactory#newChooser 实现了策略模式,根据NioEventLoop数组长度来选择不同的策略

public EventExecutorChooser newChooser(EventExecutor[] executors) {
    if (isPowerOfTwo(executors.length)) {
        return new PowerOfTowEventExecutorChooser(executors);
    } else {
        return new GenericEventExecutorChooser(executors);
    }
}

10.3 装饰者模式

装饰者和被装饰者继承同一个接口(因为是要加强同一个方法)、装饰者给被装饰者动态修改行为(通过组合实现)

WrappedByteBuf、UnReleaseableByteBuf、SimpleLeakAwareByteBuf

WrappedByteBuf继承自ByteBuf,可以看作是同一个接口,内部有一个被装饰的ByteBuf,大部分方法都是委托给被装饰的ByteBuf;
WrappedByteBuf的两个子类UnReleaseableByteBuf、SimpleLeakAwareByteBuf,如重写了release方法(前者是不用释放,后者是加上了内存泄漏的检查)

class WrappedByteBuf extends ByteBuf {

    protected final ByteBuf buf;
}
final class UnreleasableByteBuf extends WrappedByteBuf {
    public boolean release() {
        return false;
    }
}
final class SimpleLeakAwareByteBuf extends WrappedByteBuf {
    @Override
    public boolean release(int decrement) {
        boolean deallocated = super.release(decrement);
        if (deallocated) {
            leak.close();
        }
        return deallocated;
    }
}

10.4 观察者模式

观察者和被观察者、观察者订阅消息,被观察者发布消息、订阅则能收到,取消订阅收不到

writeAndFlush方法就是此模式的实现:

public void write(NioSocketChannel channel, Object object) {
    ChannelFuture channelFuture = channel.writeAndFlush(object);//创建了被观察者
    channelFuture.addListener(future -> {//addListener就是添加观察者
        if (future.isSuccess()) {//writeAndFlush方法结束后,就可以通过isSuccess来判断

        } else {

        }
    });
    channelFuture.addListener(future -> {
        if (future.isSuccess()) {

        } else {

        }
    });
}

writeAndFlush深入过程中,有如下代码,newPromise就是创建被观察者

public ChannelFuture writeAndFlush(Object msg) {
    return writeAndFlush(msg, newPromise());
}

通过数组来保存观察者

10.5 迭代器模式

有一个迭代器接口、对容器里面各个对象进行访问

CompositeByteBuf(零拷贝)的实现:

使用代码:

public static void main(String[] args) {
    ByteBuf header = Unpooled.wrappedBuffer(new byte[]{1, 2, 3});
    ByteBuf body = Unpooled.wrappedBuffer(new byte[]{4, 5, 6});
 
    ByteBuf merge = merge(header, body);
    merge.forEachByte(value -> {
        System.out.println(value);
        return true;
    });
}
 
public static ByteBuf merge(ByteBuf header, ByteBuf body) {
    CompositeByteBuf byteBuf = ByteBufAllocator.DEFAULT.compositeBuffer(2);
    byteBuf.addComponent(true, header);
    byteBuf.addComponent(true, body);
 
    return byteBuf;
}

这段代码是把两个ByteBuf添加到一起,forEachByte就是实现了迭代器模式。那么怎么说它是零拷贝呢?

forEachByte的实现在AbstractByteBuf里面,有这样一段代码:

@Override
public int forEachByte(ByteProcessor processor) {
    ensureAccessible();
    try {
        return forEachByteAsc0(readerIndex, writerIndex, processor);
    } catch (Exception e) {
        PlatformDependent.throwException(e);
        return -1;
    }
}

从readerIndex开始读,读到writeIndex:

private int forEachByteAsc0(int start, int end, ByteProcessor processor) throws Exception {
    for (; start < end; ++start) {
        if (!processor.process(_getByte(start))) {
            return start;
        }
    }
 
    return -1;
}

查看_getByte的实现,找CompositeByteBuf的:

@Override
protected byte _getByte(int index) {
    Component c = findComponent(index);//找到索引对应的组件,即添加进来的header和body
    return c.buf.getByte(index - c.offset);
}

其实就是将要合并的ByteBuf添加到CompositeByteBuf里

别的类迭代的话,会把所有的数据都复制一遍,如ByteBufAllocator.DEFAULT.ioBuffer()

10.6 责任链模式

定义:使多个对象都有机会处理请求,从而避免请求的发送者和接受者之间的耦合关系, 将这个对象连成一条链,并沿着这条链传递该请求,直到有一个对象处理他为止。

java里面的很多filter,是否继续向下传递,都是return true还是return 方法来实现的。

责任链模式的四个要素:

1 责任处理器接口

2 创建链,添加删除责任处理器接口

3 上下文

因为责任处理器接口在处理事件时,需要感知上下文,通过上下文来获取需要的对象

在加入

4 责任链终止机制。

channelHandler和Pipeline构成了责任链模式:

1.ChannelHandler就是责任处理器接口,ChannelInboundHandler、ChannelOuntboundHandler是它的两个增强。

2.ChannelPipeline就是创建链,里面定义了添加删除责任处理器接口的方法,如add、remove

3.ChannelHandlerContext就是上下文,channelHandler添加到链中时,会被封装为ChannelHandlerContext

public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
 	//返回这条链绑定的channel
    Channel channel();
    //返回executor来执行任务,即channel对应的NioEventLoop
    EventExecutor executor();
    ...
}

4.责任链终止机制

自定义一个InBoundHandlerC

public class InBoundHandlerC extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("InBoundHandlerC: " + msg);
        ctx.fireChannelRead(msg);
    }
}

ctx.fiteChannelRead方法就是为了把责任传递下去。如果注释掉了,消息就不会传递。
如果不重写channelRead方法,ChannelInboundHandlerAdapter#channelRead方法默认也会传递:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ctx.fireChannelRead(msg);
}

最后,消息是如何一步步向下传递的呢,看AbstractChannelHandlerContext#fireChannelRead方法:

@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(), msg);
    return this;
}

findContextInbound():

private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}

也就是,这里是不停地指向下一个ChannelHandlerContext对象实现的。

上一篇:Netty实战之使用Netty解析交通部JT808协议


下一篇:震惊!当Python遇到Excel后,将开启你的认知虫洞