10 设计模式
10.1 单例模式
三个原则:全局只有一个实例(private 构造方法)、延迟加载(static)、避免多线程同步创建(static、synchronized)
例子:ReadTimeoutException、MqttEncoder
ReadTimeoutException:
public final class ReadTimeoutException extends TimeoutException {
private static final long serialVersionUID = 169287984113283421L;
public static final ReadTimeoutException INSTANCE = new ReadTimeoutException();
private ReadTimeoutException() { }
}
MqttEncoder也是一样的实现;
因为这是单例,所以要加@Sharable
@Sharable:https://blog.csdn.net/zhailuxu/article/details/83472632
10.2 策略模式
封装一系列可替换的算法家族,支持动态选择某一个策略
DefaultEventExecutorChooserFactory#newChooser 实现了策略模式,根据NioEventLoop数组长度来选择不同的策略
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTowEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
10.3 装饰者模式
装饰者和被装饰者继承同一个接口(因为是要加强同一个方法)、装饰者给被装饰者动态修改行为(通过组合实现)
WrappedByteBuf、UnReleaseableByteBuf、SimpleLeakAwareByteBuf
WrappedByteBuf继承自ByteBuf,可以看作是同一个接口,内部有一个被装饰的ByteBuf,大部分方法都是委托给被装饰的ByteBuf;
WrappedByteBuf的两个子类UnReleaseableByteBuf、SimpleLeakAwareByteBuf,如重写了release方法(前者是不用释放,后者是加上了内存泄漏的检查)
class WrappedByteBuf extends ByteBuf {
protected final ByteBuf buf;
}
final class UnreleasableByteBuf extends WrappedByteBuf {
public boolean release() {
return false;
}
}
final class SimpleLeakAwareByteBuf extends WrappedByteBuf {
@Override
public boolean release(int decrement) {
boolean deallocated = super.release(decrement);
if (deallocated) {
leak.close();
}
return deallocated;
}
}
10.4 观察者模式
观察者和被观察者、观察者订阅消息,被观察者发布消息、订阅则能收到,取消订阅收不到
writeAndFlush方法就是此模式的实现:
public void write(NioSocketChannel channel, Object object) {
ChannelFuture channelFuture = channel.writeAndFlush(object);//创建了被观察者
channelFuture.addListener(future -> {//addListener就是添加观察者
if (future.isSuccess()) {//writeAndFlush方法结束后,就可以通过isSuccess来判断
} else {
}
});
channelFuture.addListener(future -> {
if (future.isSuccess()) {
} else {
}
});
}
writeAndFlush深入过程中,有如下代码,newPromise就是创建被观察者
public ChannelFuture writeAndFlush(Object msg) {
return writeAndFlush(msg, newPromise());
}
通过数组来保存观察者
10.5 迭代器模式
有一个迭代器接口、对容器里面各个对象进行访问
CompositeByteBuf(零拷贝)的实现:
使用代码:
public static void main(String[] args) {
ByteBuf header = Unpooled.wrappedBuffer(new byte[]{1, 2, 3});
ByteBuf body = Unpooled.wrappedBuffer(new byte[]{4, 5, 6});
ByteBuf merge = merge(header, body);
merge.forEachByte(value -> {
System.out.println(value);
return true;
});
}
public static ByteBuf merge(ByteBuf header, ByteBuf body) {
CompositeByteBuf byteBuf = ByteBufAllocator.DEFAULT.compositeBuffer(2);
byteBuf.addComponent(true, header);
byteBuf.addComponent(true, body);
return byteBuf;
}
这段代码是把两个ByteBuf添加到一起,forEachByte就是实现了迭代器模式。那么怎么说它是零拷贝呢?
forEachByte的实现在AbstractByteBuf里面,有这样一段代码:
@Override
public int forEachByte(ByteProcessor processor) {
ensureAccessible();
try {
return forEachByteAsc0(readerIndex, writerIndex, processor);
} catch (Exception e) {
PlatformDependent.throwException(e);
return -1;
}
}
从readerIndex开始读,读到writeIndex:
private int forEachByteAsc0(int start, int end, ByteProcessor processor) throws Exception {
for (; start < end; ++start) {
if (!processor.process(_getByte(start))) {
return start;
}
}
return -1;
}
查看_getByte的实现,找CompositeByteBuf的:
@Override
protected byte _getByte(int index) {
Component c = findComponent(index);//找到索引对应的组件,即添加进来的header和body
return c.buf.getByte(index - c.offset);
}
其实就是将要合并的ByteBuf添加到CompositeByteBuf里
别的类迭代的话,会把所有的数据都复制一遍,如ByteBufAllocator.DEFAULT.ioBuffer()
10.6 责任链模式
定义:使多个对象都有机会处理请求,从而避免请求的发送者和接受者之间的耦合关系, 将这个对象连成一条链,并沿着这条链传递该请求,直到有一个对象处理他为止。
java里面的很多filter,是否继续向下传递,都是return true还是return 方法来实现的。
责任链模式的四个要素:
1 责任处理器接口
2 创建链,添加删除责任处理器接口
3 上下文
因为责任处理器接口在处理事件时,需要感知上下文,通过上下文来获取需要的对象
在加入
4 责任链终止机制。
channelHandler和Pipeline构成了责任链模式:
1.ChannelHandler就是责任处理器接口,ChannelInboundHandler、ChannelOuntboundHandler是它的两个增强。
2.ChannelPipeline就是创建链,里面定义了添加删除责任处理器接口的方法,如add、remove
3.ChannelHandlerContext就是上下文,channelHandler添加到链中时,会被封装为ChannelHandlerContext
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
//返回这条链绑定的channel
Channel channel();
//返回executor来执行任务,即channel对应的NioEventLoop
EventExecutor executor();
...
}
4.责任链终止机制
自定义一个InBoundHandlerC
public class InBoundHandlerC extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InBoundHandlerC: " + msg);
ctx.fireChannelRead(msg);
}
}
ctx.fiteChannelRead方法就是为了把责任传递下去。如果注释掉了,消息就不会传递。
如果不重写channelRead方法,ChannelInboundHandlerAdapter#channelRead方法默认也会传递:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
最后,消息是如何一步步向下传递的呢,看AbstractChannelHandlerContext#fireChannelRead方法:
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
return this;
}
findContextInbound():
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
也就是,这里是不停地指向下一个ChannelHandlerContext对象实现的。