DolphinScheduler源码解析-NettyServerHandler
文章目录
类定义
@ChannelHandler.Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter
注解说明了这个handler可以被多个channel共享,然后该类又继承了ChannelInboundHandlerAdapter,他就可以处理各种网络事件,如register,unregistred等事件。
类属性
private final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
/**
* netty remote server
*/
private final NettyRemotingServer nettyRemotingServer;
/**
* server processors queue
*/
private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>> processors = new ConcurrentHashMap<>();
可以看到它持有一个NettyRemotingServer类的引用。 它还定义了一个并发的map结构,用来存储命令类型和处理类以及executor。
我们看它的构造方法, 给NettyRemotingServer类的引用赋值。
public NettyServerHandler(NettyRemotingServer nettyRemotingServer) {
this.nettyRemotingServer = nettyRemotingServer;
}
再看它的方法
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
this.registerProcessor(commandType, processor, null);
}
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
ExecutorService executorRef = executor;
if (executorRef == null) {
executorRef = nettyRemotingServer.getDefaultExecutor();
}
this.processors.putIfAbsent(commandType, new Pair<>(processor, executorRef));
}
可以看出它的registerProcessor方法就是保存了命令类型和处理该命令的处理器以及执行的executor的到一个并发map结构中。
再看它的处理
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
processReceived(ctx.channel(), (Command) msg);
}
private void processReceived(final Channel channel, final Command msg) {
final CommandType commandType = msg.getType();
if (CommandType.HEART_BEAT.equals(commandType)) {
if (logger.isDebugEnabled()) {
logger.debug("server receive heart beat from: host: {}", ChannelUtils.getRemoteAddress(channel));
}
return;
}
final Pair<NettyRequestProcessor, ExecutorService> pair = processors.get(commandType);
if (pair != null) {
Runnable r = () -> {
try {
pair.getLeft().process(channel, msg);
} catch (Exception ex) {
logger.error("process msg {} error", msg, ex);
}
};
try {
pair.getRight().submit(r);
} catch (RejectedExecutionException e) {
logger.warn("thread pool is full, discard msg {} from {}", msg, ChannelUtils.getRemoteAddress(channel));
}
} else {
logger.warn("commandType {} not support", commandType);
}
}
可以看出它是先从消息中拿出消息的CommandType,也就是命令模式的命令类型,如果是心跳命令就不处理,如果不是就从保存的map结构中拿出已经注册好的处理类和执行线程池。让线程池去执行该处理即可。