【ES源码分析】Transport模块的初始化

文章目录

Transport模块的初始化

传输模块的初始化主要是在节点启动时的构造函数中完成的。

节点启动时,主要在Node的构造函数中进行通信模块的初始化。

    /**
     * Node constructor (excerpt; each "..." line marks code that the article elided).
     *
     * The transport-related wiring visible in this excerpt happens in this order:
     * ActionModule, RestController, NetworkModule, Transport, TransportService,
     * SearchTransportService, the optional HttpServerTransport binding, and finally
     * the REST handler registration.
     */
    protected Node(
            final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins, boolean forbidPrivateIndexSettings) {
        logger = LogManager.getLogger(Node.class);
        final List<Closeable> resourcesToClose = new ArrayList<>(); // register everything we need to release in the case of an error
        boolean success = false;
        try {
            ...
            ...
            ...
            // Filter out the ActionPlugin instances and pass them to the ActionModule constructor; ActionModule
            // registers and binds the request/handler classes used for both TCP and HTTP.
            ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(),    settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(), threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService);
            ...

            final RestController restController = actionModule.getRestController();
            // Filter out the NetworkPlugin instances and pass them to the NetworkModule constructor.
            final NetworkModule networkModule = new NetworkModule(settings, false, pluginsService.filterPlugins(NetworkPlugin.class),
                threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry,
                networkService, restController);
            ...
            ...
            ...  
            // Obtain the Transport that the network module has already set up.
            final Transport transport = networkModule.getTransportSupplier().get();
            // Task headers: everything contributed by the ActionPlugins plus Task.X_OPAQUE_ID.
            Set<String> taskHeaders = Stream.concat(
                pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()),
                Stream.of(Task.X_OPAQUE_ID)
            ).collect(Collectors.toSet());
            // Build the TransportService on top of the network module's Transport.
            final TransportService transportService = newTransportService(settings, transport, threadPool,
                networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders);
            final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);
            // Build the SearchTransportService on top of the TransportService.
            final SearchTransportService searchTransportService =  new SearchTransportService(transportService,
                SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
            final Consumer<Binder> httpBind;
            final HttpServerTransport httpServerTransport;
            // Obtain the HttpServerTransport from the network module. HTTP can be switched off, in which
            // case a null provider is bound and httpServerTransport stays null.
            if (networkModule.isHttpEnabled()) {
                httpServerTransport = networkModule.getHttpServerTransportSupplier().get();
                httpBind = b -> {
                    b.bind(HttpServerTransport.class).toInstance(httpServerTransport);
                };
            } else {
                httpBind = b -> {
                    b.bind(HttpServerTransport.class).toProvider(Providers.of(null));
                };
                httpServerTransport = null;
            }
            ...
            ...
            ...    
            if (NetworkModule.HTTP_ENABLED.get(settings)) {
                logger.debug("initializing HTTP handlers ...");
                // Initialize the mapping between REST requests and their handler classes.
                actionModule.initRestHandlers(() -> clusterService.state().nodes());
            }
            logger.info("initialized");

            success = true;
            ...
            ...
            ...  
    }

ActionModule的初始化

ActionModule的内部初始化是通过插件的方式加载的,主要完成注册Action与处理类的映射和RestController的创建。

    /**
     * Wires up the action layer from the supplied plugins: registers actions and action filters,
     * collects the REST headers, resolves the (at most one) REST handler wrapper, builds the
     * mapping request validators and, unless running as a transport client, creates the
     * {@link RestController}.
     *
     * @param transportClient whether this module is built for a transport client; when {@code true}
     *                        neither the auto-create-index helper nor the REST controller is created
     * @param actionPlugins   plugins contributing actions, filters, REST headers, REST wrappers and
     *                        mapping request validators
     * @param threadPool      supplies the thread context handed to each plugin's REST wrapper factory
     * @param nodeClient      passed through to the REST controller
     * @throws IllegalArgumentException if more than one plugin provides a REST handler wrapper
     */
    public ActionModule(boolean transportClient, Settings settings, IndexNameExpressionResolver indexNameExpressionResolver,
                        IndexScopedSettings indexScopedSettings, ClusterSettings clusterSettings, SettingsFilter settingsFilter,
                        ThreadPool threadPool, List<ActionPlugin> actionPlugins, NodeClient nodeClient,
                        CircuitBreakerService circuitBreakerService, UsageService usageService) {
        this.transportClient = transportClient;
        this.settings = settings;
        this.indexNameExpressionResolver = indexNameExpressionResolver;
        this.indexScopedSettings = indexScopedSettings;
        this.clusterSettings = clusterSettings;
        this.settingsFilter = settingsFilter;
        this.actionPlugins = actionPlugins;

        // Register the actions and bind each of them to its handler.
        actions = setupActions(actionPlugins);
        // Configure the action filters.
        actionFilters = setupActionFilters(actionPlugins);

        // A transport client never auto-creates indices.
        if (transportClient) {
            autoCreateIndex = null;
        } else {
            autoCreateIndex = new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver);
        }
        destructiveOperations = new DestructiveOperations(settings, clusterSettings);

        // REST headers: everything the plugins contribute plus Task.X_OPAQUE_ID.
        Set<String> headers = Stream
            .concat(actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()), Stream.of(Task.X_OPAQUE_ID))
            .collect(Collectors.toSet());

        // REST wrapper: at most one plugin may provide one.
        UnaryOperator<RestHandler> restWrapper = null;
        for (ActionPlugin plugin : actionPlugins) {
            UnaryOperator<RestHandler> candidate = plugin.getRestHandlerWrapper(threadPool.getThreadContext());
            if (candidate == null) {
                continue;
            }
            logger.debug("Using REST wrapper from plugin " + plugin.getClass().getName());
            if (restWrapper != null) {
                throw new IllegalArgumentException("Cannot have more than one plugin implementing a REST wrapper");
            }
            restWrapper = candidate;
        }

        mappingRequestValidators = new TransportPutMappingAction.RequestValidators(
            actionPlugins.stream().flatMap(p -> p.mappingRequestValidators().stream()).collect(Collectors.toList())
        );

        // Build the RestController; a transport client has none.
        restController = transportClient
            ? null
            : new RestController(headers, restWrapper, nodeClient, circuitBreakerService, usageService);
    }

NetworkModule的初始化

构造NetworkModule对象时,会在构造函数中通过插件的方式加载下面列出的3个成员对象。

主要数据成员对象

Map<String, Supplier<Transport>> transportFactories
Map<String, Supplier<HttpServerTransport>> transportHttpFactories
List<TransportInterceptor> transportIntercetors

Transport :负责内部节点的RPC请求

HttpServerTransport:负责客户端的REST服务

TransportInterceptor:传输层拦截器

    /**
     * Collects the transport building blocks contributed by the given network plugins.
     * For every plugin this registers, in order: its HTTP server transports (skipped for a
     * transport client or when HTTP is disabled), its transports, and its transport interceptors.
     *
     * @param settings        node settings; also consulted for {@code HTTP_ENABLED}
     * @param transportClient whether this module is built for a transport client
     * @param plugins         the network plugins to query
     * @param dispatcher      HTTP dispatcher handed to the plugins' HTTP transport factories
     */
    public NetworkModule(Settings settings, boolean transportClient, List<NetworkPlugin> plugins, ThreadPool threadPool,
                         BigArrays bigArrays,
                         PageCacheRecycler pageCacheRecycler,
                         CircuitBreakerService circuitBreakerService,
                         NamedWriteableRegistry namedWriteableRegistry,
                         NamedXContentRegistry xContentRegistry,
                         NetworkService networkService, HttpServerTransport.Dispatcher dispatcher) {
        this.settings = settings;
        this.transportClient = transportClient;
        // Walk the plugins and register their HttpServerTransports, Transports and TransportInterceptors.
        for (NetworkPlugin plugin : plugins) {
            if (transportClient == false && HTTP_ENABLED.get(settings)) {
                // Per the original note, registration checks that a transport client is not creating an
                // HTTP transport and that no two HttpServerTransports share a name.
                plugin.getHttpTransports(settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry,
                    xContentRegistry, networkService, dispatcher)
                    .forEach(this::registerHttpTransport);
            }
            // Per the original note, registration checks that no two Transports share a name.
            plugin.getTransports(settings, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry,
                networkService)
                .forEach(this::registerTransport);
            // Register the transport-layer interceptors.
            plugin.getTransportInterceptors(namedWriteableRegistry, threadPool.getThreadContext())
                .forEach(this::registerTransportInterceptor);
        }
    }

在构造完成之后,通过 getTransportSupplier、getHttpServerTransportSupplier、getTransportInterceptor 这三个方法对外提供服务。

NetworkPlugin

NetworkPlugin是一个接口,Netty4Plugin实现了它,同时继承了Plugin。

在Netty4Plugin中实现了NetworkPlugin的getTransports和getHttpTransports方法,分别构建了Netty4Transport和Netty4HttpServerTransport用于Transport(TCP)传输和HTTP传输。

Netty4Transport

【ES源码分析】Transport模块的初始化

通过类图可以发现,Netty4Transport继承了TcpTransport,TcpTransport实现了Transport接口,这一层应该是在实现传输层,控制数据在传输层的交互。Netty4Transport中实现了doStart抽象方法,用来启动TCP服务。在启动的时候,默认情况下会同时构建Client端和Server端,主要借助netty4框架来实现这些功能。

    /**
     * Starts the TCP transport: creates the worker event loop group and the client bootstrap,
     * then, when {@code NetworkService.NETWORK_SERVER} is enabled, creates and binds one server
     * bootstrap per profile before delegating to {@code super.doStart()}.
     * If any step fails, {@code doStop()} is invoked to undo whatever was already started.
     */
    @Override
    protected void doStart() {
        boolean started = false;
        try {
            ThreadFactory workerThreadFactory = daemonThreadFactory(settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX);
            eventLoopGroup = new NioEventLoopGroup(workerCount, workerThreadFactory);
            // Client side.
            clientBootstrap = createClientBootstrap(eventLoopGroup);
            // Server side (enabled by default, according to the original note).
            if (NetworkService.NETWORK_SERVER.get(settings)) {
                for (ProfileSettings profile : profileSettings) {
                    createServerBootstrap(profile, eventLoopGroup);
                    bindServer(profile);
                }
            }
            super.doStart();
            started = true;
        } finally {
            if (!started) {
                doStop();
            }
        }
    }

Netty4HttpServerTransport

【ES源码分析】Transport模块的初始化

根据类图发现,Netty4HttpServerTransport 继承了AbstractLifecycleComponent,同时实现了HttpServerTransport接口。

同样实现了doStart的抽象方法,用来启动HTTP Server服务。在HTTP Server服务中配置了监听端口和处理器。这里其实应该是通过Netty4来完成HTTP协议下的传输层的部分。

// Netty4HttpServerTransport.doStart (excerpt; each "..." line marks code that the article elided).
// The two steps shown are installing the child channel handler and binding the HTTP address.
@Override
    protected void doStart() {
       ...
            // Install the request handler class (HttpChannelHandler) on the server bootstrap.
            serverBootstrap.childHandler(configureServerChannelHandler());
       ...
           // Bind the port that serves as the HTTP listening port.
            this.boundAddress = createBoundHttpAddress();
       ...
    }

configureServerChannelHandler方法中构建了一个HttpChannelHandler对象,HttpChannelHandler的构造函数中初始化了两个成员变量:Netty4HttpServerTransport 和 Netty4HttpRequestHandler。当收到请求的时候,会调用dispatchRequest对不同的请求执行相应的处理。dispatchRequest是HttpServerTransport接口的内部接口Dispatcher中的方法,主要用来转发请求,它的主要实现类是RestController。

TransportService

在Node的启动过程中会在初始化Transport之后,基于Transport构建TransportService。newTransportService方法中其实是调用TransportService的构造函数。

    /**
     * Convenience constructor: delegates to the overload that takes an explicit
     * ConnectionManager, supplying a new one built from the given settings and transport.
     */
    public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor transportInterceptor, Function<BoundTransportAddress, DiscoveryNode> localNodeFactory, @Nullable ClusterSettings clusterSettings, Set<String> taskHeaders) {
        this(settings, transport, threadPool, transportInterceptor, localNodeFactory, clusterSettings, taskHeaders,
             // Create the connection manager.
            new ConnectionManager(settings, transport));
    }

ConnectionManager是Node传输连接的管理类,通过这个类的connectionManager.connectToNode(node, connectionProfile, connectionValidator(node));根据连接配置(ConnectionProfile)创建内部连接。在ConnectionManager的构造函数中调用ConnectionProfile.buildDefaultConnectionProfile(settings),这个方法会根据settings中的配置信息构建默认的连接配置。在TransportRequestOptions.Type中可以看到连接的分类。

// TransportRequestOptions (excerpt; each "..." line marks code that the article elided).
public class TransportRequestOptions {
  ...
  ...
  ...
  // Categories of transport connections.
  public enum Type {
        RECOVERY, // used for recovery
        BULK,     // used for bulk writes
        REG,      // unclear to the original author (guess: cluster registration); presumably "regular" requests - TODO confirm
        STATE,    // transfers the cluster state
        PING      // ping requests
    }
    
}

在TransportSettings类中可以看到各类TCP连接的默认连接数,合计13个(2 + 3 + 6 + 1 + 1)。

// TransportSettings (excerpt; each "..." line marks code that the article elided).
// Per-node connection counts for each connection type. The first numeric argument of each
// setting below is the default; together they add up to 2 + 3 + 6 + 1 + 1 = 13 connections.
// NOTE(review): the second numeric argument (1) looks like a minimum value - confirm against intSetting.
public final class TransportSettings {
        ...
        ...
        ...        
    // Recovery connections (default 2).
    public static final Setting<Integer> CONNECTIONS_PER_NODE_RECOVERY =
        intSetting("transport.connections_per_node.recovery", 2, 1, Setting.Property.NodeScope);
    // Bulk connections (default 3).
    public static final Setting<Integer> CONNECTIONS_PER_NODE_BULK =
        intSetting("transport.connections_per_node.bulk", 3, 1, Setting.Property.NodeScope);
    // REG connections (default 6).
    public static final Setting<Integer> CONNECTIONS_PER_NODE_REG =
        intSetting("transport.connections_per_node.reg", 6, 1, Setting.Property.NodeScope);
    // Cluster-state connections (default 1).
    public static final Setting<Integer> CONNECTIONS_PER_NODE_STATE =
        intSetting("transport.connections_per_node.state", 1, 1, Setting.Property.NodeScope);
    // Ping connections (default 1).
    public static final Setting<Integer> CONNECTIONS_PER_NODE_PING =
        intSetting("transport.connections_per_node.ping", 1, 1, Setting.Property.NodeScope);
        ...
        ...
        ...
    }
/**
 * Full TransportService constructor. It stores the collaborators, sets up tracer logging, creates the
 * task manager, wraps the internal sender with the interceptor, creates the remote cluster service,
 * subscribes to dynamic setting updates when cluster settings are supplied, and registers the
 * handshake request handler.
 *
 * @param clusterSettings   may be null; when present, tracer-log include/exclude updates (and, if
 *                          enabled, remote-cluster updates) are subscribed to
 * @param connectionManager manages the connections to other nodes
 */
public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor transportInterceptor,
                        Function<BoundTransportAddress, DiscoveryNode> localNodeFactory, @Nullable ClusterSettings clusterSettings,
                        Set<String> taskHeaders, ConnectionManager connectionManager) {
    // The only time we do not want to validate node connections is when this is a transport client using the simple node sampler
    this.validateConnections = TransportClient.CLIENT_TYPE.equals(settings.get(Client.CLIENT_TYPE_SETTING_S.getKey())) == false ||
        TransportClient.CLIENT_TRANSPORT_SNIFF.get(settings);
    this.transport = transport;
    this.threadPool = threadPool;
    this.localNodeFactory = localNodeFactory;
    this.connectionManager = connectionManager;
    this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
    setTracerLogInclude(TransportSettings.TRACE_LOG_INCLUDE_SETTING.get(settings));
    setTracerLogExclude(TransportSettings.TRACE_LOG_EXCLUDE_SETTING.get(settings));
    tracerLog = Loggers.getLogger(logger, ".tracer");
    // Task manager service, used to track the tasks currently running on this node.
    taskManager = createTaskManager(settings, threadPool, taskHeaders);
    // The interceptor must be assigned before it is used to wrap the sender on the next statement.
    this.interceptor = transportInterceptor;
    // Async sender: sends the requests exchanged between cluster nodes, wrapped by the interceptor.
    this.asyncSender = interceptor.interceptSender(this::sendRequestInternal);
    // Remote cluster support (the original note associates this with CCR - TODO confirm).
    this.connectToRemoteCluster = RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings);
    remoteClusterService = new RemoteClusterService(settings, this);
    responseHandlers = transport.getResponseHandlers();
    if (clusterSettings != null) {
        clusterSettings.addSettingsUpdateConsumer(TransportSettings.TRACE_LOG_INCLUDE_SETTING, this::setTracerLogInclude);
        clusterSettings.addSettingsUpdateConsumer(TransportSettings.TRACE_LOG_EXCLUDE_SETTING, this::setTracerLogExclude);
        if (connectToRemoteCluster) {
            remoteClusterService.listenForUpdates(clusterSettings);
        }
    }
    // Register the mapping between the handshake action and its handler; the handler replies with
    // the local node, the cluster name and the local node's version.
    registerRequestHandler(
        HANDSHAKE_ACTION_NAME,
        () -> HandshakeRequest.INSTANCE,
        ThreadPool.Names.SAME,
        false, false,
        (request, channel) -> channel.sendResponse(
            new HandshakeResponse(localNode, clusterName, localNode.getVersion())));
}

在Node初始化的最后,会调用actionModule.initRestHandlers进行RestHandler的注册映射。至此,通信模块的初始化部分就结束了。

上一篇:Comparable和Comparator的使用


下一篇:ES 去重/聚合查询