/**
 * Synchronizes this learner's data with the leader.
 *
 * <p>NOTE(review): the body shown here appears to be an abridged excerpt; the only visible
 * step is starting the local server via {@code zk.startup()}. Any actual data-transfer logic
 * is not part of this snippet.
 *
 * @param newLeaderZxid zxid associated with the new leader (not referenced in the visible body)
 * @throws IOException as declared by the signature
 */
void syncWithLeader(long newLeaderZxid) throws IOException {
    // Start the follower's local ZooKeeper server
    zk.startup();
}
/**
 * Starts this server.
 *
 * <p>Creates the session tracker if none exists yet, starts it, builds the request-processor
 * pipeline via {@code setupRequestProcessors()}, calls {@code registerJMX()}, switches the
 * state to {@code State.RUNNING} and finally wakes every thread waiting on this instance's
 * monitor. The whole sequence runs while holding this instance's lock.
 */
public synchronized void startup() {
    final boolean trackerMissing = sessionTracker == null;
    if (trackerMissing) {
        createSessionTracker();
    }
    startSessionTracker();

    // Wire the processors before the server is advertised as RUNNING
    setupRequestProcessors();
    registerJMX();

    setState(State.RUNNING);
    // Release threads blocked in wait() on this server
    notifyAll();
}
/**
 * Builds and starts the follower's request-processor pipelines (FollowerZooKeeperServer
 * override).
 *
 * <p>Client chain: FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor.
 * Sync chain: SyncRequestProcessor -> SendAckRequestProcessor (built with {@code getFollower()}).
 * The commit processor is started before the head of the client chain that feeds it.
 */
@Override
protected void setupRequestProcessors() {
    // Tail of the client chain
    final RequestProcessor tail = new FinalRequestProcessor(this);
    commitProcessor = new CommitProcessor(tail, Long.toString(getServerId()), true,
            getZooKeeperServerListener());
    commitProcessor.start();

    // Head of the client chain
    final FollowerRequestProcessor head = new FollowerRequestProcessor(this, commitProcessor);
    firstProcessor = head;
    head.start();

    // Independent chain: SyncRequestProcessor hands off to SendAckRequestProcessor
    final SendAckRequestProcessor ackSender = new SendAckRequestProcessor(getFollower());
    syncProcessor = new SyncRequestProcessor(this, ackSender);
    syncProcessor.start();
}
/** * FollowerRequestProcessor.start() */ @Override public void run() { try { while (!finished) { Request request = queuedRequests.take(); if (request == Request.requestOfDeath) { break; } // 进入下一级的处理链CommitProcessor nextProcessor.processRequest(request); // 根据请求类型,如果是事务请求,则需要转发给Leader switch (request.type) { case OpCode.sync: zks.pendingSyncs.add(request); zks.getFollower().request(request); break; case OpCode.create: case OpCode.delete: case OpCode.setData: case OpCode.setACL: case OpCode.createSession: case OpCode.closeSession: case OpCode.multi: zks.getFollower().request(request); break; } } } catch (Exception e) { handleException(this.getName(), e); } LOG.info("FollowerRequestProcessor exited loop!"); }
/**
 * Forwards a client request to the leader as a {@code Leader.REQUEST} packet. Called from
 * {@code FollowerRequestProcessor#run()}.
 *
 * <p>Payload layout: sessionId (long), cxid (int), type (int), then every byte of
 * {@code request.request} (read from position 0) when that buffer is non-null. The request's
 * {@code authInfo} is attached to the packet, which is written with {@code writePacket(qp, true)}.
 *
 * <p>Side effect: a non-null {@code request.request} is left rewound to position 0.
 *
 * @param request the request to forward
 * @throws IOException if serializing or writing the packet fails
 */
void request(Request request) throws IOException {
    final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    final DataOutputStream out = new DataOutputStream(bytes);
    out.writeLong(request.sessionId);
    out.writeInt(request.cxid);
    out.writeInt(request.type);

    if (request.request != null) {
        // Read the whole buffer from the start, then rewind again for later readers
        request.request.rewind();
        final byte[] raw = new byte[request.request.remaining()];
        request.request.get(raw);
        request.request.rewind();
        out.write(raw);
    }
    out.close();

    writePacket(new QuorumPacket(Leader.REQUEST, -1, bytes.toByteArray(), request.authInfo), true);
}
/**
 * Main loop of CommitProcessor (the thread body started by {@code CommitProcessor.start()}).
 *
 * <p>Requests ready to go downstream are collected in {@code toProcess} and passed to
 * {@code nextProcessor} outside the lock. Transactional requests (create, delete, setData,
 * multi, setACL, createSession, closeSession, and sync when {@code matchSyncs} is true) become
 * {@code nextPending} and block the queue until a matching commit (same sessionId and cxid)
 * arrives in {@code committedRequests}. Other requests are released immediately, but only while
 * nothing is pending.
 *
 * <p>NOTE(review): {@code InterruptedException} is logged and the thread exits without
 * restoring the interrupt status.
 */
@Override
public void run() {
    try {
        Request nextPending = null;
        // Loop until the processor is marked finished
        while (!finished) {
            // Hand everything that is ready to the next processor, outside the lock
            for (Request toProcess1 : toProcess) {
                nextProcessor.processRequest(toProcess1);
            }
            toProcess.clear();
            synchronized (this) {
                // Nothing actionable: either the queue is empty or we are blocked on a pending
                // transaction, and no commits have arrived. Presumably woken by the methods that
                // enqueue requests/commits (not visible here).
                if ((queuedRequests.size() == 0 || nextPending != null)
                        && committedRequests.size() == 0) {
                    wait();
                    continue;
                }
                // First check and see if the commit came in for the pending
                // request
                if ((queuedRequests.size() == 0 || nextPending != null)
                        && committedRequests.size() > 0) {
                    Request r = committedRequests.remove();
                    // The commit matches the transaction request that was issued through this
                    // server (same sessionId and cxid): copy the committed header, txn and zxid
                    // onto the locally queued request and release that object downstream.
                    if (nextPending != null
                            && nextPending.sessionId == r.sessionId
                            && nextPending.cxid == r.cxid) {
                        nextPending.hdr = r.hdr;
                        nextPending.txn = r.txn;
                        nextPending.zxid = r.zxid;
                        toProcess.add(nextPending);
                        nextPending = null;
                    } else {
                        // No local pending match: release the committed request as received
                        toProcess.add(r);
                    }
                }
            }

            // Ordering guarantee: while a transaction is pending, the loop keeps going back to
            // wait, so requests queued behind it (including non-transactional reads) are not
            // processed either.
            // We haven't matched the pending requests, so go back to waiting
            if (nextPending != null) {
                continue;
            }

            synchronized (this) {
                // Drain the queue until a request that must wait for its commit is found
                while (nextPending == null && queuedRequests.size() > 0) {
                    Request request = queuedRequests.remove();
                    switch (request.type) {
                        case OpCode.create:
                        case OpCode.delete:
                        case OpCode.setData:
                        case OpCode.multi:
                        case OpCode.setACL:
                        case OpCode.createSession:
                        case OpCode.closeSession:
                            nextPending = request;
                            break;
                        case OpCode.sync:
                            if (matchSyncs) {
                                nextPending = request;
                            } else {
                                toProcess.add(request);
                            }
                            break;
                        default:
                            toProcess.add(request);
                    }
                }
            }
        }
    } catch (InterruptedException e) {
        LOG.warn("Interrupted exception while waiting", e);
    } catch (Throwable e) {
        LOG.error("Unexpected exception causing CommitProcessor to exit", e);
    }
    LOG.info("CommitProcessor exited loop!");
}
/**
 * Main loop of SyncRequestProcessor (the thread body started by
 * {@code SyncRequestProcessor.start()}).
 *
 * <p>Each dequeued request is offered to {@code zks.getZKDatabase().append(si)}. Appended
 * requests are batched in {@code toFlush}. The batch is flushed when the queue is momentarily
 * empty or when it exceeds 1000 entries. After more than {@code snapCount / 2 + randRoll}
 * appended records, the log is rolled and a background "Snapshot Thread" is started, unless the
 * previous one is still alive. A request for which {@code append} returns false (a read, per
 * the inline comment) is passed straight to {@code nextProcessor} when no flush is pending, and
 * is otherwise added to the batch. Any Throwable is handed to {@code handleException} and
 * clears {@code running}.
 *
 * <p>NOTE(review): if {@code r} is a {@code java.util.Random}, {@code r.nextInt(snapCount / 2)}
 * throws unless {@code snapCount >= 2}. Confirm this is enforced where snapCount is set.
 */
@Override
public void run() {
    try {
        int logCount = 0;

        // we do this in an attempt to ensure that not all of the servers
        // in the ensemble take a snapshot at the same time
        setRandRoll(r.nextInt(snapCount / 2));
        while (true) {
            Request si = null;
            // Block for work only when nothing awaits flushing. Otherwise poll, and flush as
            // soon as the queue is momentarily empty.
            if (toFlush.isEmpty()) {
                si = queuedRequests.take();
            } else {
                si = queuedRequests.poll();
                if (si == null) {
                    flush(toFlush);
                    continue;
                }
            }
            // Shutdown sentinel
            if (si == requestOfDeath) {
                break;
            }
            if (si != null) {
                // track the number of records written to the log
                if (zks.getZKDatabase().append(si)) {
                    logCount++;
                    if (logCount > (snapCount / 2 + randRoll)) {
                        setRandRoll(r.nextInt(snapCount / 2));
                        // roll the log
                        zks.getZKDatabase().rollLog();
                        // take a snapshot
                        if (snapInProcess != null && snapInProcess.isAlive()) {
                            LOG.warn("Too busy to snap, skipping");
                        } else {
                            snapInProcess = new ZooKeeperThread("Snapshot Thread") {
                                public void run() {
                                    try {
                                        zks.takeSnapshot();
                                    } catch (Exception e) {
                                        LOG.warn("Unexpected exception", e);
                                    }
                                }
                            };
                            snapInProcess.start();
                        }
                        logCount = 0;
                    }
                } else if (toFlush.isEmpty()) {
                    // optimization for read heavy workloads
                    // iff this is a read, and there are no pending
                    // flushes (writes), then just pass this to the next
                    // processor
                    if (nextProcessor != null) {
                        nextProcessor.processRequest(si);
                        if (nextProcessor instanceof Flushable) {
                            ((Flushable) nextProcessor).flush();
                        }
                    }
                    continue;
                }
                toFlush.add(si);
                // Cap the batch so a continuously busy queue still gets flushed
                if (toFlush.size() > 1000) {
                    flush(toFlush);
                }
            }
        }
    } catch (Throwable t) {
        handleException(this.getName(), t);
        running = false;
    }
    LOG.info("SyncRequestProcessor exited!");
}
/**
 * Examines a packet received from the leader (Follower.processPacket) and dispatches it
 * based on its type.
 *
 * <ul>
 *   <li>PING: answered via {@code ping(qp)}.</li>
 *   <li>PROPOSAL: the transaction is deserialized and handed to {@code fzk.logRequest}. A
 *       non-consecutive zxid is only logged as a warning, not rejected.</li>
 *   <li>COMMIT: {@code fzk.commit(zxid)}.</li>
 *   <li>UPTODATE: unexpected once the follower is running, so it is logged as an error.</li>
 *   <li>REVALIDATE: {@code revalidate(qp)}.</li>
 *   <li>SYNC: {@code fzk.sync()}.</li>
 * </ul>
 * Unknown types are logged and ignored.
 *
 * @param qp the packet read from the leader
 * @throws IOException as declared; may be raised by the called handlers or by deserialization
 */
private void processPacket(QuorumPacket qp) throws IOException {
    switch (qp.getType()) {
        case Leader.PING:
            ping(qp);
            break;
        case Leader.PROPOSAL: {
            // Braced so hdr/txn are scoped to this case only
            TxnHeader hdr = new TxnHeader();
            Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
            if (hdr.getZxid() != lastQueued + 1) {
                LOG.warn("Got zxid 0x" + Long.toHexString(hdr.getZxid())
                        + " expected 0x" + Long.toHexString(lastQueued + 1));
            }
            lastQueued = hdr.getZxid();
            fzk.logRequest(hdr, txn);
            break;
        }
        case Leader.COMMIT:
            fzk.commit(qp.getZxid());
            break;
        case Leader.UPTODATE:
            LOG.error("Received an UPTODATE message after Follower started");
            break;
        case Leader.REVALIDATE:
            revalidate(qp);
            break;
        case Leader.SYNC:
            fzk.sync();
            break;
        default:
            // This method belongs to the Follower; the message previously said "Observer"
            LOG.error("Invalid packet type: {} received by Follower", qp.getType());
            break;
    }
}
/** * FollowerZooKeeperServer.commit() * @param zxid */ public void commit(long zxid) { if (pendingTxns.size() == 0) { LOG.warn("Committing " + Long.toHexString(zxid) + " without seeing txn"); return; } long firstElementZxid = pendingTxns.element().zxid; // 顺序一致 if (firstElementZxid != zxid) { LOG.error("Committing zxid 0x" + Long.toHexString(zxid) + " but next pending txn 0x" + Long.toHexString(firstElementZxid)); System.exit(12); } Request request = pendingTxns.remove(); commitProcessor.commit(request); }