dolphinscheduler源码解析-NettyServerHandler

DolphinScheduler源码解析-NettyServerHandler

文章目录

类定义

@ChannelHandler.Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter 

注解说明了这个handler可以被多个channel共享,然后该类又继承了ChannelInboundHandlerAdapter,他就可以处理各种网络事件,如register,unregistred等事件。

类属性

private final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);

/**
  * netty remote server
  */
private final NettyRemotingServer nettyRemotingServer;

/**
  * server processors queue
  */
private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>> processors = new ConcurrentHashMap<>();

可以看到它持有一个NettyRemotingServer类的引用。 它还定义了一个并发的map结构,用来存储命令类型和处理类以及executor。

我们看它的构造方法, 给NettyRemotingServer类的引用赋值。

public NettyServerHandler(NettyRemotingServer nettyRemotingServer) {
    this.nettyRemotingServer = nettyRemotingServer;
}

再看它的方法

public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
    this.registerProcessor(commandType, processor, null);
}

public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
    ExecutorService executorRef = executor;
    if (executorRef == null) {
        executorRef = nettyRemotingServer.getDefaultExecutor();
    }
    this.processors.putIfAbsent(commandType, new Pair<>(processor, executorRef));
}

可以看出它的registerProcessor方法就是保存了命令类型和处理该命令的处理器以及执行的executor的到一个并发map结构中。

再看它的处理

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    processReceived(ctx.channel(), (Command) msg);
}


private void processReceived(final Channel channel, final Command msg) {
    final CommandType commandType = msg.getType();
    if (CommandType.HEART_BEAT.equals(commandType)) {
        if (logger.isDebugEnabled()) {
            logger.debug("server receive heart beat from: host: {}", ChannelUtils.getRemoteAddress(channel));
        }
        return;
    }
    final Pair<NettyRequestProcessor, ExecutorService> pair = processors.get(commandType);
    if (pair != null) {
        Runnable r = () -> {
            try {
                pair.getLeft().process(channel, msg);
            } catch (Exception ex) {
                logger.error("process msg {} error", msg, ex);
            }
        };
        try {
            pair.getRight().submit(r);
        } catch (RejectedExecutionException e) {
            logger.warn("thread pool is full, discard msg {} from {}", msg, ChannelUtils.getRemoteAddress(channel));
        }
    } else {
        logger.warn("commandType {} not support", commandType);
    }
}

可以看出它是先从消息中拿出消息的CommandType,也就是命令模式的命令类型,如果是心跳命令就不处理,如果不是就从保存的map结构中拿出已经注册好的处理类和执行线程池。让线程池去执行该处理即可。

上一篇:NET Core 实战 Dapper 扩展数据访问


下一篇:C#连接SQL Server数据库(二)