Namesrv配置类
一个是mqnamesrv的基本配置,一个是提供服务的netty的配置
对应的类:NamesrvConfig 和 NettyServerConfig
// namesrv配置类
public class NamesrvConfig {
// 日志 归类 名称:RocketmqNamesrv
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
public static void main(String[] args) {
System.out.println(System.getenv(MixAll.ROCKETMQ_HOME_ENV));
System.out.println(System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)));
System.out.println(System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json");
System.out.println(System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties");
}
// rocketmq 主目录 可以通过 -Drocketmq.home.dir=path 或通过设置环境变量 ROCKETMQ_HOME 来配置RocketMQ的主目录
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
// NameServer 存储KV配置属性的持久化路径
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
// NameServer 默认配置文件路经
private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
// 环境
private String productEnvName = "center";
// 集群测试
private boolean clusterTest = false;
// 是否支持顺序消息,默认不支持
private boolean orderMessageEnable = false;
// 是否启动顺序消息
public boolean isOrderMessageEnable() {
return orderMessageEnable;
}
public void setOrderMessageEnable(boolean orderMessageEnable) {
this.orderMessageEnable = orderMessageEnable;
}
// 获取 mq 主文件夹
public String getRocketmqHome() {
return rocketmqHome;
}
public void setRocketmqHome(String rocketmqHome) {
this.rocketmqHome = rocketmqHome;
}
public String getKvConfigPath() {
return kvConfigPath;
}
public void setKvConfigPath(String kvConfigPath) {
this.kvConfigPath = kvConfigPath;
}
public String getProductEnvName() {
return productEnvName;
}
public void setProductEnvName(String productEnvName) {
this.productEnvName = productEnvName;
}
public boolean isClusterTest() {
return clusterTest;
}
public void setClusterTest(boolean clusterTest) {
this.clusterTest = clusterTest;
}
public String getConfigStorePath() {
return configStorePath;
}
public void setConfigStorePath(final String configStorePath) {
this.configStorePath = configStorePath;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.netty;
public class NettyServerConfig implements Cloneable {
// NameServer 监听端口
private int listenPort = 8888;
// Netty 业务线程池线程个数
private int serverWorkerThreads = 8;
// Netty public 任务线程池线程个数,Netty 网络设计,根据业务类型会创建不同的线程池,比如处理消息发送、消息消费、心跳检测等,
// 如果该业务类型未注册线程池,则由 public 线程池执行
private int serverCallbackExecutorThreads = 0;
// IO线程池线程个数,主要是 NameServer 、Broker 端解析请求、返回相应的线程个数,这类线程主要是处理网络请求,
// 解析请求包,然后转发到各个业务线程池完成具体的业务操作,然后将结果再返回给调用方
private int serverSelectorThreads = 3;
// send oneway 消息请求并发度(Broker端参数)
private int serverOnewaySemaphoreValue = 256;
// 异步消息发送最大并行度(Broker端参数)
private int serverAsyncSemaphoreValue = 64;
// 网络连接最大空闲时间,默认120s,如果连接空闲时间超过该参数设置的值,连接将被关闭
private int serverChannelMaxIdleTimeSeconds = 120;
// 网络socket发送缓冲区大小,默认64k
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
// 网络socket接收缓冲区大小,默认64k
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
// ByteBuffer 是否开启缓存,建议开启
private boolean serverPooledByteBufAllocatorEnable = true;
/**
* make make install
*
*
* ../glibc-2.10.1/configure \ --prefix=/usr \ --with-headers=/usr/include \
* --host=x86_64-linux-gnu \ --build=x86_64-pc-linux-gnu \ --without-gd
*/
// 是否启用 Epoll IO模型,Linux环境建议开启
private boolean useEpollNativeSelector = false;
public int getListenPort() {
return listenPort;
}
public void setListenPort(int listenPort) {
this.listenPort = listenPort;
}
public int getServerWorkerThreads() {
return serverWorkerThreads;
}
public void setServerWorkerThreads(int serverWorkerThreads) {
this.serverWorkerThreads = serverWorkerThreads;
}
public int getServerSelectorThreads() {
return serverSelectorThreads;
}
public void setServerSelectorThreads(int serverSelectorThreads) {
this.serverSelectorThreads = serverSelectorThreads;
}
public int getServerOnewaySemaphoreValue() {
return serverOnewaySemaphoreValue;
}
public void setServerOnewaySemaphoreValue(int serverOnewaySemaphoreValue) {
this.serverOnewaySemaphoreValue = serverOnewaySemaphoreValue;
}
public int getServerCallbackExecutorThreads() {
return serverCallbackExecutorThreads;
}
public void setServerCallbackExecutorThreads(int serverCallbackExecutorThreads) {
this.serverCallbackExecutorThreads = serverCallbackExecutorThreads;
}
public int getServerAsyncSemaphoreValue() {
return serverAsyncSemaphoreValue;
}
public void setServerAsyncSemaphoreValue(int serverAsyncSemaphoreValue) {
this.serverAsyncSemaphoreValue = serverAsyncSemaphoreValue;
}
public int getServerChannelMaxIdleTimeSeconds() {
return serverChannelMaxIdleTimeSeconds;
}
public void setServerChannelMaxIdleTimeSeconds(int serverChannelMaxIdleTimeSeconds) {
this.serverChannelMaxIdleTimeSeconds = serverChannelMaxIdleTimeSeconds;
}
public int getServerSocketSndBufSize() {
return serverSocketSndBufSize;
}
public void setServerSocketSndBufSize(int serverSocketSndBufSize) {
this.serverSocketSndBufSize = serverSocketSndBufSize;
}
public int getServerSocketRcvBufSize() {
return serverSocketRcvBufSize;
}
public void setServerSocketRcvBufSize(int serverSocketRcvBufSize) {
this.serverSocketRcvBufSize = serverSocketRcvBufSize;
}
public boolean isServerPooledByteBufAllocatorEnable() {
return serverPooledByteBufAllocatorEnable;
}
public void setServerPooledByteBufAllocatorEnable(boolean serverPooledByteBufAllocatorEnable) {
this.serverPooledByteBufAllocatorEnable = serverPooledByteBufAllocatorEnable;
}
public boolean isUseEpollNativeSelector() {
return useEpollNativeSelector;
}
public void setUseEpollNativeSelector(boolean useEpollNativeSelector) {
this.useEpollNativeSelector = useEpollNativeSelector;
}
@Override
public Object clone() throws CloneNotSupportedException {
return (NettyServerConfig) super.clone();
}
}