文章目录
Transport模块的初始化
传输模块的初始化主要是在节点启动时的构造函数中完成的。
节点启动时,主要在构造函数中进行通信模块的初始化。
// Excerpt of the Node constructor ("..." marks code the article elided).
// Shows how the action, network and transport layers are wired together at node start-up.
protected Node(
final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins, boolean forbidPrivateIndexSettings) {
logger = LogManager.getLogger(Node.class);
final List<Closeable> resourcesToClose = new ArrayList<>(); // register everything we need to release in the case of an error
boolean success = false;
try {
...
...
...
// Pass the ActionPlugin instances filtered out of the plugin service to the ActionModule constructor;
// per the article, ActionModule registers and binds the request/handler classes for both TCP and HTTP.
ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(), settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(), threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService);
...
final RestController restController = actionModule.getRestController();
// Pass the NetworkPlugin instances filtered out of the plugin service to the NetworkModule constructor.
final NetworkModule networkModule = new NetworkModule(settings, false, pluginsService.filterPlugins(NetworkPlugin.class),
threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry,
networkService, restController);
...
...
...
// Obtain the Transport from the supplier exposed by the network module.
final Transport transport = networkModule.getTransportSupplier().get();
// Task headers: those contributed by every ActionPlugin plus Task.X_OPAQUE_ID.
Set<String> taskHeaders = Stream.concat(
pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()),
Stream.of(Task.X_OPAQUE_ID)
).collect(Collectors.toSet());
// Build the TransportService on top of the network module's Transport.
final TransportService transportService = newTransportService(settings, transport, threadPool,
networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders);
final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);
// Build the SearchTransportService on top of the TransportService.
final SearchTransportService searchTransportService = new SearchTransportService(transportService,
SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
final Consumer<Binder> httpBind;
final HttpServerTransport httpServerTransport;
// When HTTP is enabled, fetch the HttpServerTransport from the network module and bind that instance;
// otherwise bind a null provider (i.e. HTTP can be switched off).
if (networkModule.isHttpEnabled()) {
httpServerTransport = networkModule.getHttpServerTransportSupplier().get();
httpBind = b -> {
b.bind(HttpServerTransport.class).toInstance(httpServerTransport);
};
} else {
httpBind = b -> {
b.bind(HttpServerTransport.class).toProvider(Providers.of(null));
};
httpServerTransport = null;
}
...
...
...
if (NetworkModule.HTTP_ENABLED.get(settings)) {
logger.debug("initializing HTTP handlers ...");
// Initialize the mapping between REST requests and their handlers.
actionModule.initRestHandlers(() -> clusterService.state().nodes());
}
logger.info("initialized");
success = true;
...
...
...
}
ActionModule的初始化
ActionModule的内部初始化是通过插件的方式加载的,主要完成注册Action与处理类的映射和RestController的创建。
/**
 * Creates the action module: stores the supplied collaborators, sets up the actions and
 * action filters contributed by the plugins, collects the REST headers, picks up the single
 * optional REST handler wrapper and, unless running as a transport client, creates the
 * {@link RestController}.
 *
 * @throws IllegalArgumentException if more than one plugin supplies a REST handler wrapper
 */
public ActionModule(boolean transportClient, Settings settings, IndexNameExpressionResolver indexNameExpressionResolver,
                    IndexScopedSettings indexScopedSettings, ClusterSettings clusterSettings, SettingsFilter settingsFilter,
                    ThreadPool threadPool, List<ActionPlugin> actionPlugins, NodeClient nodeClient,
                    CircuitBreakerService circuitBreakerService, UsageService usageService) {
    this.transportClient = transportClient;
    this.settings = settings;
    this.indexNameExpressionResolver = indexNameExpressionResolver;
    this.indexScopedSettings = indexScopedSettings;
    this.clusterSettings = clusterSettings;
    this.settingsFilter = settingsFilter;
    this.actionPlugins = actionPlugins;

    // Register the actions (and their handlers) and the action filters supplied by the plugins.
    actions = setupActions(actionPlugins);
    actionFilters = setupActionFilters(actionPlugins);

    // A transport client gets no auto-create-index support.
    if (transportClient) {
        autoCreateIndex = null;
    } else {
        autoCreateIndex = new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver);
    }
    destructiveOperations = new DestructiveOperations(settings, clusterSettings);

    // REST headers: whatever the plugins contribute plus the opaque-id task header.
    final Stream<String> pluginRestHeaders = actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream());
    final Set<String> restHeaders = Stream.concat(pluginRestHeaders, Stream.of(Task.X_OPAQUE_ID))
        .collect(Collectors.toSet());

    // At most one plugin may provide a REST handler wrapper.
    UnaryOperator<RestHandler> chosenWrapper = null;
    for (ActionPlugin candidate : actionPlugins) {
        final UnaryOperator<RestHandler> offered = candidate.getRestHandlerWrapper(threadPool.getThreadContext());
        if (offered == null) {
            continue;
        }
        logger.debug("Using REST wrapper from plugin " + candidate.getClass().getName());
        if (chosenWrapper != null) {
            throw new IllegalArgumentException("Cannot have more than one plugin implementing a REST wrapper");
        }
        chosenWrapper = offered;
    }

    mappingRequestValidators = new TransportPutMappingAction.RequestValidators(
        actionPlugins.stream()
            .flatMap(p -> p.mappingRequestValidators().stream())
            .collect(Collectors.toList()));

    // Only a real node (not a transport client) gets a RestController.
    restController = transportClient
        ? null
        : new RestController(restHeaders, chosenWrapper, nodeClient, circuitBreakerService, usageService);
}
NetworkModule的初始化
构造NetworkModule对象,在执行构造函数的时候,通过插件的方式加载下面3个成员对象。
主要数据成员对象
Map<String, Supplier<Transport>> transportFactories
Map<String, Supplier<HttpServerTransport>> transportHttpFactories
List<TransportInterceptor> transportIntercetors
Transport :负责内部节点的RPC请求
HttpServerTransport:负责客户端的REST服务
TransportInterceptor:传输层拦截器
/**
 * Creates the network module and lets every {@link NetworkPlugin} register its HTTP
 * transports (only when this is not a transport client and HTTP is enabled), its
 * transports and its transport interceptors.
 */
public NetworkModule(Settings settings, boolean transportClient, List<NetworkPlugin> plugins, ThreadPool threadPool,
                     BigArrays bigArrays,
                     PageCacheRecycler pageCacheRecycler,
                     CircuitBreakerService circuitBreakerService,
                     NamedWriteableRegistry namedWriteableRegistry,
                     NamedXContentRegistry xContentRegistry,
                     NetworkService networkService, HttpServerTransport.Dispatcher dispatcher) {
    this.settings = settings;
    this.transportClient = transportClient;
    for (NetworkPlugin networkPlugin : plugins) {
        // HTTP transports are only registered for real nodes with HTTP switched on.
        if (!transportClient && HTTP_ENABLED.get(settings)) {
            networkPlugin
                .getHttpTransports(settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry,
                    xContentRegistry, networkService, dispatcher)
                .forEach(this::registerHttpTransport);
        }
        // Register each named transport supplied by the plugin.
        networkPlugin
            .getTransports(settings, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry,
                networkService)
            .forEach(this::registerTransport);
        // Register the plugin's transport-layer interceptors.
        networkPlugin
            .getTransportInterceptors(namedWriteableRegistry, threadPool.getThreadContext())
            .forEach(this::registerTransportInterceptor);
    }
}
构造完成之后,通过getTransportSupplier、getHttpServerTransportSupplier、getTransportInterceptor这三个方法对外提供服务。
NetworkPlugin
NetworkPlugin是一个接口,Netty4Plugin实现了它,同时继承了Plugin。
在Netty4Plugin中实现了NetworkPlugin的getTransports和getHttpTransports方法,分别构建了Netty4Transport和Netty4HttpServerTransport用于Transport(TCP)传输和HTTP传输。
Netty4Transport
通过类图,可以发现Netty4Transport继承了TcpTransport,TcpTransport实现了Transport接口,这应该是在实现传输层,控制数据在传输层的交互。Netty4Transport中实现了doStart抽象方法,用来启动TCP服务。在启动的时候,默认情况下会同时构建Client端和Server端,主要借助netty4框架来实现这些功能。
/**
 * Starts the transport: creates the event loop group and the client bootstrap and, when
 * the network server setting is on, creates and binds a server bootstrap per profile.
 * If anything fails before start-up completes, {@code doStop()} is invoked to roll back.
 */
@Override
protected void doStart() {
    boolean started = false;
    try {
        final ThreadFactory workerThreads = daemonThreadFactory(settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX);
        eventLoopGroup = new NioEventLoopGroup(workerCount, workerThreads);
        // Client side is always set up.
        clientBootstrap = createClientBootstrap(eventLoopGroup);
        // Server side is set up only when the network server setting is enabled.
        final boolean serverEnabled = NetworkService.NETWORK_SERVER.get(settings);
        if (serverEnabled) {
            for (ProfileSettings profile : profileSettings) {
                createServerBootstrap(profile, eventLoopGroup);
                bindServer(profile);
            }
        }
        super.doStart();
        started = true;
    } finally {
        if (!started) {
            doStop();
        }
    }
}
Netty4HttpServerTransport
根据类图发现Netty4HttpServerTransport继承了AbstractLifecycleComponent,并实现了HttpServerTransport接口。
同样实现了doStart抽象方法,用来启动HTTP Server服务。在HTTP Server服务中配置了监听端口和处理器。这里其实应该是通过Netty4来完成HTTP协议下传输层的部分。
// Excerpt of Netty4HttpServerTransport.doStart ("..." marks code the article elided).
@Override
protected void doStart() {
...
// Install the handler returned by configureServerChannelHandler() (an HttpChannelHandler, per the article)
// as the child handler of the server bootstrap.
serverBootstrap.childHandler(configureServerChannelHandler());
...
// Store the address returned by createBoundHttpAddress(); per the article this binds the HTTP listening port.
this.boundAddress = createBoundHttpAddress();
...
}
configureServerChannelHandler方法中构建了一个HttpChannelHandler对象,HttpChannelHandler的构造函数中初始化了两个成员变量:Netty4HttpServerTransport和Netty4HttpRequestHandler。当收到请求的时候,会调用dispatchRequest对不同的请求执行相应的处理。dispatchRequest是HttpServerTransport接口的内部接口Dispatcher中的方法,主要用来转发请求,它的主要实现类是RestController。
TransportService
在Node的启动过程中会在初始化Transport之后,基于Transport构建TransportService。newTransportService方法中其实是调用TransportService的构造函数。
/**
 * Convenience constructor: delegates to the full constructor, supplying a new
 * {@link ConnectionManager} built from the given settings and transport.
 */
public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor transportInterceptor, Function<BoundTransportAddress, DiscoveryNode> localNodeFactory, @Nullable ClusterSettings clusterSettings, Set<String> taskHeaders) {
this(settings, transport, threadPool, transportInterceptor, localNodeFactory, clusterSettings, taskHeaders,
// Create the connection manager.
new ConnectionManager(settings, transport));
}
ConnectionManager
是Node传输连接的管理类,通过这个类的connectionManager.connectToNode(node, connectionProfile, connectionValidator(node));根据连接配置(ConnectionProfile)创建内部连接。在ConnectionManager的构造函数中调用ConnectionProfile.buildDefaultConnectionProfile(settings),这个方法会根据配置信息构建默认的连接配置。在TransportRequestOptions.Type中可以看到连接的分类。
// Excerpt of TransportRequestOptions ("..." marks code the article elided).
public class TransportRequestOptions {
...
...
...
// Connection categories used for transport requests.
public enum Type {
RECOVERY, // used for recovery
BULK, // used for bulk writes
REG, // NOTE(review): the article's author was unsure; presumably "regular" requests — confirm against the Elasticsearch docs
STATE, // transfers cluster state
PING // ping requests
}
}
在TransportSettings类中可以看到各类TCP连接的连接数,默认合计13个(2+3+6+1+1)。
// Excerpt of TransportSettings ("..." marks code the article elided): node-scoped settings for the
// number of connections per node for each connection type.
// NOTE(review): the two integers passed to intSetting are presumably (default, minimum) — confirm;
// under that reading the defaults are 2 + 3 + 6 + 1 + 1 = 13 connections.
public final class TransportSettings {
...
...
...
// Connections used for recovery.
public static final Setting<Integer> CONNECTIONS_PER_NODE_RECOVERY =
intSetting("transport.connections_per_node.recovery", 2, 1, Setting.Property.NodeScope);
// Connections used for bulk requests.
public static final Setting<Integer> CONNECTIONS_PER_NODE_BULK =
intSetting("transport.connections_per_node.bulk", 3, 1, Setting.Property.NodeScope);
// Connections of type "reg".
public static final Setting<Integer> CONNECTIONS_PER_NODE_REG =
intSetting("transport.connections_per_node.reg", 6, 1, Setting.Property.NodeScope);
// Connections used for cluster state.
public static final Setting<Integer> CONNECTIONS_PER_NODE_STATE =
intSetting("transport.connections_per_node.state", 1, 1, Setting.Property.NodeScope);
// Connections used for pings.
public static final Setting<Integer> CONNECTIONS_PER_NODE_PING =
intSetting("transport.connections_per_node.ping", 1, 1, Setting.Property.NodeScope);
...
...
...
}
/**
 * Full constructor: stores the collaborators, configures trace logging, creates the task manager,
 * wraps the internal send path with the transport interceptor, creates the remote cluster service
 * and finally registers the handshake request handler.
 *
 * @param clusterSettings may be null; when present, update consumers for the tracer include/exclude
 *                        settings are registered (and remote cluster updates, if enabled)
 */
public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor transportInterceptor,
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory, @Nullable ClusterSettings clusterSettings,
Set<String> taskHeaders, ConnectionManager connectionManager) {
// The only time we do not want to validate node connections is when this is a transport client using the simple node sampler
this.validateConnections = TransportClient.CLIENT_TYPE.equals(settings.get(Client.CLIENT_TYPE_SETTING_S.getKey())) == false ||
TransportClient.CLIENT_TRANSPORT_SNIFF.get(settings);
this.transport = transport;
this.threadPool = threadPool;
this.localNodeFactory = localNodeFactory;
this.connectionManager = connectionManager;
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
setTracerLogInclude(TransportSettings.TRACE_LOG_INCLUDE_SETTING.get(settings));
setTracerLogExclude(TransportSettings.TRACE_LOG_EXCLUDE_SETTING.get(settings));
tracerLog = Loggers.getLogger(logger, ".tracer");
// Task manager; per the article it tracks the tasks currently running on this node.
taskManager = createTaskManager(settings, threadPool, taskHeaders);
this.interceptor = transportInterceptor;
// Async sender: sendRequestInternal wrapped by the interceptor; per the article it sends the
// requests exchanged between cluster nodes.
this.asyncSender = interceptor.interceptSender(this::sendRequestInternal);
// Remote cluster support (the article associates this with CCR — NOTE(review): confirm).
this.connectToRemoteCluster = RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings);
remoteClusterService = new RemoteClusterService(settings, this);
responseHandlers = transport.getResponseHandlers();
if (clusterSettings != null) {
// Keep the tracer include/exclude patterns in sync with cluster settings updates.
clusterSettings.addSettingsUpdateConsumer(TransportSettings.TRACE_LOG_INCLUDE_SETTING, this::setTracerLogInclude);
clusterSettings.addSettingsUpdateConsumer(TransportSettings.TRACE_LOG_EXCLUDE_SETTING, this::setTracerLogExclude);
if (connectToRemoteCluster) {
remoteClusterService.listenForUpdates(clusterSettings);
}
}
// Register the handshake action and its handler (the article calls this a heartbeat, but the action
// is HANDSHAKE_ACTION_NAME); the handler replies with the local node, the cluster name and the
// local node's version.
registerRequestHandler(
HANDSHAKE_ACTION_NAME,
() -> HandshakeRequest.INSTANCE,
ThreadPool.Names.SAME,
false, false,
(request, channel) -> channel.sendResponse(
new HandshakeResponse(localNode, clusterName, localNode.getVersion())));
}
在Node初始化的最后,会调用actionModule.initRestHandlers进行RestHandler的注册映射。至此,通信模块的初始化部分就结束了。