Zookeeper 源码(四)Zookeeper 服务端源码

Zookeeper 源码(四)Zookeeper 服务端源码

Zookeeper 源码(四)Zookeeper 服务端源码

Zookeeper 服务端的启动入口为 QuorumPeerMain

/**
 * Command-line entry point of the ZooKeeper server.
 *
 * <p>{@link #initializeAndRun(String[])} declares checked exceptions, so they are handled
 * here at the process boundary: the failure is logged and the JVM exits with status 2 for
 * a configuration error and status 1 for any other failure.
 *
 * @param args command-line arguments; when exactly one is given it is parsed as the
 *             configuration
 */
public static void main(String[] args) {
    QuorumPeerMain main = new QuorumPeerMain();
    try {
        main.initializeAndRun(args);
    } catch (ConfigException e) {
        LOG.error("Invalid config, exiting abnormally", e);
        System.exit(2);
    } catch (Exception e) {
        LOG.error("Unexpected exception, exiting abnormally", e);
        System.exit(1);
    }
}

/**
 * Reads the configuration, starts the data-directory cleanup manager and then starts the
 * server either in cluster mode or in standalone mode.
 *
 * @param args command-line arguments; the configuration is only parsed when exactly one
 *             argument is present
 * @throws ConfigException      declared by this method; presumably raised while parsing
 *                              the configuration
 * @throws IOException          declared by this method and propagated from the startup calls
 * @throws AdminServerException declared by this method and propagated from the startup calls
 */
protected void initializeAndRun(String[] args)
        throws ConfigException, IOException, AdminServerException {
    // 1. Read the configuration file.
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
        config.parse(args[0]);
    }

    // 2. Create and start the cleaner for historical files.
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
            config.getDataDir(), config.getDataLogDir(),
            config.getSnapRetainCount(), config.getPurgeInterval());
    purgeMgr.start();

    if (args.length == 1 && config.isDistributed()) {
        // 3. Cluster startup.
        runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running "
                + " in standalone mode");
        // 4. Standalone startup: delegate to the standalone entry point with the same args.
        ZooKeeperServerMain.main(args);
    }
}

一、单机启动

Zookeeper 源码(四)Zookeeper 服务端源码

(1) 启动入口【ZooKeeperServerMain】

/**
 * Entry point of the standalone server.
 *
 * <p>{@link #initializeAndRun(String[])} declares checked exceptions, so they are handled
 * here at the process boundary: the failure is logged and the JVM exits with status 2 for
 * a configuration error and status 1 for any other failure.
 *
 * @param args command-line arguments forwarded to {@link #initializeAndRun(String[])}
 */
public static void main(String[] args) {
    ZooKeeperServerMain main = new ZooKeeperServerMain();
    try {
        main.initializeAndRun(args);
    } catch (ConfigException e) {
        LOG.error("Invalid config, exiting abnormally", e);
        System.exit(2);
    } catch (Exception e) {
        LOG.error("Unexpected exception, exiting abnormally", e);
        System.exit(1);
    }
}

/**
 * Registers the log4j MBeans (best effort), builds a {@code ServerConfig} from the
 * arguments and starts the server from it.
 *
 * @param args a single argument is handed to {@code ServerConfig.parse(String)}; any other
 *             argument count is handed to {@code ServerConfig.parse(String[])}
 * @throws ConfigException      declared by this method; presumably raised while parsing
 *                              the configuration
 * @throws IOException          propagated from {@code runFromConfig}
 * @throws AdminServerException propagated from {@code runFromConfig}
 */
protected void initializeAndRun(String[] args)
        throws ConfigException, IOException, AdminServerException {
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        // JMX registration is optional: log and continue with startup.
        LOG.warn("Unable to register log4j JMX control", e);
    }

    ServerConfig config = new ServerConfig();
    if (args.length == 1) {
        config.parse(args[0]);
    } else {
        config.parse(args);
    }

    runFromConfig(config);
}

(2) 核心启动方法【ZooKeeperServerMain】

/**
 * Starts a standalone server from {@code config} and blocks on the connection factory
 * until it terminates; the transaction/snapshot log is always closed on the way out.
 *
 * @param config the standalone server configuration
 * @throws IOException          declared by this method and propagated from the calls below
 * @throws AdminServerException declared by this method
 */
public void runFromConfig(ServerConfig config) throws IOException, AdminServerException {
    LOG.info("Starting server");
    FileTxnSnapLog txnLog = null;
    try {
        // 1. Handler for the transaction-log files and the snapshot data files.
        txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir);

        // 2. Create the server instance (no ZKDatabase is passed in at this point).
        ZooKeeperServer zkServer = new ZooKeeperServer(
                txnLog,
                config.tickTime,
                config.minSessionTimeout,
                config.maxSessionTimeout,
                null);

        // ... (omitted)
        boolean needStartZKServer = true;
        if (config.getClientPortAddress() != null) {
            // 3. Create the low-level transport; NIOServerCnxnFactory by default.
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(
                    config.getClientPortAddress(),
                    config.getMaxClientCnxns(),
                    false);

            // 4. Start the service (the core step).
            cnxnFactory.startup(zkServer);
            // zkServer has been started. So we don't need to start it again in secureCnxnFactory.
            needStartZKServer = false;
        }

        // ... (omitted)
        if (cnxnFactory != null) {
            cnxnFactory.join();
        }
        if (zkServer.isRunning()) {
            zkServer.shutdown();
        }
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Server interrupted", e);
    } finally {
        if (txnLog != null) {
            txnLog.close();
        }
    }
}

(3) ZooKeeperServer【ZooKeeperServer】

/**
 * Creates a server instance and records its collaborators and timing settings.
 *
 * @param txnLogFactory     transaction-log / snapshot handler stored on this server
 * @param tickTime          tick time stored on this server
 * @param minSessionTimeout value forwarded to {@code setMinSessionTimeout}
 * @param maxSessionTimeout value forwarded to {@code setMaxSessionTimeout}
 * @param zkDb              database stored on this server; may be {@code null}
 *                          (the standalone startup path passes {@code null})
 */
public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime,
        int minSessionTimeout, int maxSessionTimeout, ZKDatabase zkDb) {
    this.serverStats = new ServerStats(this);
    this.txnLogFactory = txnLogFactory;
    this.zkDb = zkDb;
    this.tickTime = tickTime;
    // The timeouts go through their setters rather than direct field assignment.
    this.setMinSessionTimeout(minSessionTimeout);
    this.setMaxSessionTimeout(maxSessionTimeout);
}

ServerStats 记录了服务端的以下信息:

属性 说明
packetsSent Zookeeper 启动后响应的次数
packetsReceived Zookeeper 启动后接收请求的次数
maxLatency、minLatency、totalLatency Zookeeper 启动后最大、最小、总延迟时间
count Zookeeper 启动后处理客户端请求的总次数

(4) createFactory【ServerCnxnFactory】

/** Name of the system property that selects the {@code ServerCnxnFactory} implementation class. */
public static final String ZOOKEEPER_SERVER_CNXN_FACTORY = "zookeeper.serverCnxnFactory";

/**
 * Creates the {@code ServerCnxnFactory} used for low-level communication.
 *
 * <p>The implementation class name is read from the system property
 * {@link #ZOOKEEPER_SERVER_CNXN_FACTORY}; when the property is not set,
 * {@code NIOServerCnxnFactory} is used. The class is instantiated reflectively through its
 * no-argument constructor.
 *
 * @return a new factory instance
 * @throws IOException if the class cannot be loaded, instantiated or cast; the underlying
 *                     exception is attached as the cause
 */
public static ServerCnxnFactory createFactory() throws IOException {
    String serverCnxnFactoryName = System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
    if (serverCnxnFactoryName == null) {
        serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
    }
    try {
        // Class.newInstance() is deprecated; go through the no-arg constructor explicitly.
        return (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
                .getDeclaredConstructor()
                .newInstance();
    } catch (Exception e) {
        // Any failure (reflection, cast, constructor) is reported uniformly as IOException.
        throw new IOException("Couldn't instantiate " + serverCnxnFactoryName, e);
    }
}

(5) startup【NettyServerCnxnFactory】(这里以 Netty 实现为例;默认实现为 NIOServerCnxnFactory)

/**
 * Starts the transport, attaches the server to this factory and, when requested, starts
 * the server itself.
 *
 * @param zks         the server to attach to this connection factory
 * @param startServer when {@code true}, the server's data is restored and the server is
 *                    started; when {@code false}, only the transport is started
 * @throws IOException          declared by this method
 * @throws InterruptedException declared by this method
 */
@Override
public void startup(ZooKeeperServer zks, boolean startServer)
        throws IOException, InterruptedException {
    // 1. Start netty.
    start();
    setZooKeeperServer(zks);
    if (!startServer) {
        return;
    }
    // 2. Restore the local data.
    zks.startdata();
    // 3. Start the session manager, the request-processor chain, etc.
    zks.startup();
}

/** Starts netty: logs the local address and binds the bootstrap to it. */
@Override
public void start() {
    LOG.info("binding to port " + localAddress);
    parentChannel = bootstrap.bind(localAddress);
}

(6) startdata【ZooKeeperServer】

/**
 * Restores the local data: creates the database on first use and loads it unless it
 * already reports itself as initialized.
 *
 * @throws IOException          declared by this method
 * @throws InterruptedException declared by this method
 */
public void startdata() throws IOException, InterruptedException {
    // Create the database lazily when none was supplied.
    if (zkDb == null) {
        zkDb = new ZKDatabase(this.txnLogFactory);
    }
    if (zkDb.isInitialized()) {
        return;
    }
    loadData();
}

(7) startup【ZooKeeperServer】

/**
 * Starts the session manager, sets up the request-processor chain, registers JMX, marks
 * the server as {@code RUNNING} and wakes up every thread waiting on this object.
 */
public synchronized void startup() {
    if (sessionTracker == null) {
        createSessionTracker();
    }
    startSessionTracker();
    setupRequestProcessors();

    registerJMX();

    state = State.RUNNING;
    notifyAll();
}

/** Creates the session manager from the database's sessions-with-timeouts and the tick time. */
protected void createSessionTracker() {
    sessionTracker = new SessionTrackerImpl(
            this,
            zkDb.getSessionWithTimeOuts(),
            tickTime,
            1,
            getZooKeeperServerListener());
}

/** Starts the session manager created by {@link #createSessionTracker()}. */
protected void startSessionTracker() {
    SessionTrackerImpl tracker = (SessionTrackerImpl) sessionTracker;
    tracker.start();
}

/**
 * Sets up the request-processor chain (the core of client request handling):
 * {@code PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor}.
 */
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);

    SyncRequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
    syncProcessor.start();

    PrepRequestProcessor prepProcessor = new PrepRequestProcessor(this, syncProcessor);
    // Publish the head of the chain before starting it, in the same order as before.
    firstProcessor = prepProcessor;
    prepProcessor.start();
}

二、集群启动

Zookeeper 源码(四)Zookeeper 服务端源码

集群启动相比单机启动多了一个 Leader 选举的过程。Quorum 指法定人数(即多数派),Peer 指对等节点,QuorumPeer 合起来表示集群中参与多数派投票的一个节点。

(1) 核心启动方法【QuorumPeerMain】

/**
 * Starts this server as a member of a cluster: configures the client-facing connection
 * factories, builds and configures the {@code QuorumPeer}, starts it and waits for it to
 * finish.
 *
 * @param config the cluster configuration
 * @throws IOException          declared by this method
 * @throws AdminServerException declared by this method
 */
public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        // JMX registration is optional: log and continue with startup.
        LOG.warn("Unable to register log4j JMX control", e);
    }

    LOG.info("Starting quorum peer");
    try {
        ServerCnxnFactory cnxnFactory = null;
        ServerCnxnFactory secureCnxnFactory = null;

        // Plain client port, only when one is configured.
        if (config.getClientPortAddress() != null) {
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(
                    config.getClientPortAddress(), config.getMaxClientCnxns(), false);
        }

        // Secure client port, only when one is configured.
        if (config.getSecureClientPortAddress() != null) {
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(
                    config.getSecureClientPortAddress(), config.getMaxClientCnxns(), true);
        }

        // 1. Create the QuorumPeer and copy the configuration parameters onto it.
        quorumPeer = new QuorumPeer();
        FileTxnSnapLog snapLog =
                new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir());
        quorumPeer.setTxnFactory(snapLog);
        quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
        quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());
        quorumPeer.setElectionType(config.getElectionAlg());
        quorumPeer.setMyid(config.getServerId());
        quorumPeer.setTickTime(config.getTickTime());
        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
        quorumPeer.setInitLimit(config.getInitLimit());
        quorumPeer.setSyncLimit(config.getSyncLimit());
        quorumPeer.setConfigFileName(config.getConfigFilename());

        // 2. Set up the in-memory database.
        ZKDatabase database = new ZKDatabase(quorumPeer.getTxnFactory());
        quorumPeer.setZKDatabase(database);
        quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
        if (config.getLastSeenQuorumVerifier() != null) {
            quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
        }
        quorumPeer.initConfigInZKDatabase();

        // 3. Hand over the low-level connection factories and remaining settings.
        quorumPeer.setCnxnFactory(cnxnFactory);
        quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
        quorumPeer.setLearnerType(config.getPeerType());
        quorumPeer.setSyncEnabled(config.getSyncEnabled());
        quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());

        // 4. Start the peer and block until it terminates.
        quorumPeer.start();
        quorumPeer.join();
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Quorum Peer interrupted", e);
    }
}

(2) start【QuorumPeer】

/**
 * Starts this peer: restores the local data, starts the client-facing connection
 * factories and the admin server, prepares leader election and finally starts the peer's
 * own thread.
 *
 * @throws RuntimeException if this peer's id is not a key of {@code getView()}
 */
public synchronized void start() {
    boolean knownPeer = getView().containsKey(myid);
    if (!knownPeer) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }

    // 1. Restore the local data.
    loadDataBase();

    // 2. Start the server side.
    startServerCnxnFactory();
    try {
        adminServer.start();
    } catch (AdminServerException e) {
        // A failing admin server does not abort startup; it is only reported.
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }

    // 3. Set up the election algorithm.
    startLeaderElection();

    // 4. Start the thread (QuorumPeer extends Thread).
    super.start();
}

/** Binds the ports and starts whichever of the plain and secure connection factories are set. */
private void startServerCnxnFactory() {
    if (cnxnFactory != null) {
        cnxnFactory.start();
    }
    if (secureCnxnFactory != null) {
        secureCnxnFactory.start();
    }
}

(3) startLeaderElection【QuorumPeer】

electionType 的值来自配置文件中的 electionAlg 配置项,在 QuorumPeerConfig 中默认为 3,也就是说默认采用 FastLeaderElection 算法进行 Leader 选举。

/**
 * Prepares leader election: records this peer's initial vote when it is in the
 * {@code LOOKING} state and creates the election algorithm selected by {@code electionType}.
 *
 * <p>For election type 0 a UDP socket is additionally opened on the port of
 * {@code myQuorumAddr} and a {@code ResponderThread} is started.
 *
 * @throws RuntimeException if building the initial vote fails with an {@link IOException}
 *                          (attached as the cause), or if the UDP socket cannot be opened
 */
public synchronized void startLeaderElection() {
    try {
        if (getPeerState() == ServerState.LOOKING) {
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        // Chain the original exception instead of copying only its message and stack
        // trace, so that its type and own cause chain stay visible to whoever handles it.
        throw new RuntimeException(e.getMessage(), e);
    }

    if (electionType == 0) {
        try {
            udpSocket = new DatagramSocket(myQuorumAddr.getPort());
            responder = new ResponderThread();
            responder.start();
        } catch (SocketException e) {
            throw new RuntimeException(e);
        }
    }
    this.electionAlg = createElectionAlgorithm(electionType);
}

/**
 * Creates the election implementation for the given algorithm id.
 *
 * <ul>
 *   <li>0: {@code LeaderElection}</li>
 *   <li>1: {@code AuthFastLeaderElection}</li>
 *   <li>2: {@code AuthFastLeaderElection} constructed with the extra {@code true} flag</li>
 *   <li>3: {@code FastLeaderElection} on top of a new {@code QuorumCnxManager}</li>
 * </ul>
 *
 * @param electionAlgorithm the algorithm id
 * @return the election implementation, or {@code null} when the connection manager has no
 *         listener (id 3) or when the id is unknown and assertions are disabled
 */
protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election le = null;

    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 0:
        le = new LeaderElection(this);
        break;
    case 1:
        le = new AuthFastLeaderElection(this);
        break;
    case 2:
        le = new AuthFastLeaderElection(this, true);
        break;
    case 3:
        qcm = new QuorumCnxManager(this);
        QuorumCnxManager.Listener listener = qcm.listener;
        if (listener != null) {
            // The listener is started before the election itself.
            listener.start();
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        // Only trips when assertions are enabled (-ea); otherwise null is returned.
        assert false;
    }
    return le;
}

(4) 启动 QuorumPeer 线程【QuorumPeer】

/**
 * Main loop of the peer thread: while {@code running} is set, acts according to the
 * current peer state — electing a leader, observing, following or leading — and records
 * the elapsed time in {@code start_fle} after every round.
 *
 * <p>The role objects are shut down in {@code finally} blocks. Each shutdown is guarded by
 * a null check so that a failure while creating the role object cannot raise a
 * {@link NullPointerException} from the {@code finally} block and escape the loop.
 */
@Override
public void run() {
    // ... (omitted)
    try {
        while (running) {
            switch (getPeerState()) {
            // 1. Leader election
            case LOOKING:
                // ... (omitted)
                try {
                    reconfigFlagClear();
                    if (shuttingDownLE) {
                        shuttingDownLE = false;
                        startLeaderElection();
                    }
                    setCurrentVote(makeLEStrategy().lookForLeader());
                } catch (Exception e) {
                    LOG.warn("Unexpected exception", e);
                    setPeerState(ServerState.LOOKING);
                }
                break;
            // 2. Observer
            case OBSERVING:
                try {
                    setObserver(makeObserver(logFactory));
                    observer.observeLeader();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e );
                } finally {
                    // observer may still be unset if makeObserver(...) threw.
                    if (observer != null) {
                        observer.shutdown();
                    }
                    setObserver(null);
                    updateServerState();
                }
                break;
            // 3. Follower
            case FOLLOWING:
                try {
                    setFollower(makeFollower(logFactory));
                    follower.followLeader();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                } finally {
                    // follower may still be unset if makeFollower(...) threw.
                    if (follower != null) {
                        follower.shutdown();
                    }
                    setFollower(null);
                    updateServerState();
                }
                break;
            // 4. Leader
            case LEADING:
                try {
                    setLeader(makeLeader(logFactory));
                    leader.lead();
                    setLeader(null);
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                } finally {
                    // Only reached with a non-null leader when lead() did not return normally.
                    if (leader != null) {
                        leader.shutdown("Forcing shutdown");
                        setLeader(null);
                    }
                    updateServerState();
                }
                break;
            }
            start_fle = Time.currentElapsedTime();
        }
    } finally {
        // ... (omitted)
    }
}

下面两节会重点关注 Leader 选举和请求处理。

参考:

  1. 从 Paxos 到 Zookeeper : 分布式一致性原理与实践

每天用心记录一点点。内容也许不重要,但习惯很重要!

上一篇:将Linux 标准输出,错误输出重定向到文件


下一篇:MyBatis学习总结(二)——使用MyBatis对表执行CRUD操作(转载)