之所以要测试这个场景,是因为最近开发还有个缺陷未解决,leader很忙,客户端a断开了,但是断开信息未同步到follower,follower选举了新的leader,新的leader不知道客户端a断开了,所以客户端a重新连接到新的leader后,信息还在,连接没有断开。事实上没有错,逻辑上要知道的是同步机制的问题,比如kafka支持at least one follower接收到了。
会话由SessionTracker管理,它是一个线程,里面采用ExpireQueue实现,定时轮训,参见https://blog.csdn.net/jpf254/article/details/80804458。和服务器日志是一样的,如下:
根据服务器角色不同,ZooKeeperServer,LeaderZooKeeperServer,FollowerZooKeeperServer,ObserverZooKeeperServer分别代表单机服务器,集群leader服务器,集群Follower服务器,集群observer服务器,它们的sessionTracker实现是不同的。ZookeeperServer的对应sessionTracker实现是SessionTrackerImpl;LeaderZooKeeperServer的对应sessionTracker实现是LeaderSessionTracker,FollowerZooKeeperServer,ObserverZooKeeperServer的对应sessionTracker实现是LearnerSessionTracker。
在集群模式下,leader发送一个ping消息给它的learner,learner返回自从last PING之后的一个session列表。leader每隔半个tick就会发送一个ping给learner。所以,如果一个tick被设置成2秒,那么leader每秒就会发送一个ping。
3.5开始进一步可以分为globalSessionTracker和LocalSessionTracker(只有部分功能,本地自我管理,不支持临时节点,不能重连),其中单机只有一个标准的SessionTrackerImpl,集群leader开启globalSessionTracker和LocalSessionTracker,follower和observer只开启LocalSessionTracker。globalSessionTracker由SessionTrackerImpl实现,LocalSessionTracker继承并扩展了SessionTrackerImpl。
https://blog.csdn.net/damacheng/article/details/42393771 会话
https://www.cnblogs.com/leixiaobai/p/10245991.html jmeter插件
https://www.cnblogs.com/farmersun/p/12813442.html zk程序员指南
package com.hundsun.datacompare; import org.apache.commons.lang3.StringUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryNTimes; import org.apache.zookeeper.KeeperException; import org.springframework.beans.factory.annotation.Autowired; import java.util.concurrent.*; /** * @author zjhua */ public class DataGenerateForZookeeperServiceImpl implements DataGenerateForZookeeperService { @Autowired private DataGenerateForZookeeperConfig config; @Override public void gen() { if (config == null) { config = new DataGenerateForZookeeperConfig(); } int threads = config.getParallel() == 0 ? Runtime.getRuntime().availableProcessors()/2 : config.getParallel(); ExecutorService executorService = new ThreadPoolExecutor(threads,threads,60, TimeUnit.SECONDS,new LinkedBlockingQueue<>()); for (int c=1;c<=threads;c++) { int finalC = c; executorService.submit(new Runnable() { @Override public void run() { // 1.Connect to zk CuratorFramework client = CuratorFrameworkFactory.newClient( config.getZkUrl(), new RetryNTimes(10, 5000) ); client.start(); System.out.println("zk client start successfully!"); try { for (int i = 1; i <= config.getCount(); i++) { try { String keyName = config.getPrefix() + "-" + finalC + i; // 2.Client API test // 2.1 Create node String data1 = StringUtils.rightPad("value", config.getValueSize(), "1"); // print("create", config.getZkPath() + keyName, data1); client.create(). creatingParentsIfNeeded(). forPath(config.getZkPath() + keyName, data1.getBytes()); // // 2.2 Get node and data // print("ls", "/"); // print(client.getChildren().forPath("/")); // print("get", config.getZkPath() + keyName); // print(client.getData().forPath(config.getZkPath() + keyName)); // // // 2.3 Modify data // String data2 = "world"; // print("set", config.getZkPath() + keyName, data2); // client.setData().forPath(config.getZkPath() + keyName, data2.getBytes()); // print("get", config.getZkPath() + keyName); // print(client.getData().forPath(config.getZkPath() + keyName)); // // // 2.4 Remove node // if (i >= 2000) { // print("delete", config.getZkPath() + keyName); // client.delete().forPath(config.getZkPath() + keyName); // Thread.sleep(3000); // print("ls", "/"); // print(client.getChildren().forPath("/")); // } } catch (KeeperException.NodeExistsException e) { // NOP } } System.out.println(Thread.currentThread().getName() + " created " + config.getCount() + " " + config.getValueSize() + "byte nodes under " + config.getZkPath() + ""); client.close(); } catch (Exception e) { e.printStackTrace(); } } }); } try { boolean finish = false; while (!finish) { executorService.awaitTermination(3600, TimeUnit.SECONDS); System.out.println("尚未完成,继续等待"); } } catch (InterruptedException e) { e.printStackTrace(); } } private static void print(String... cmds) { StringBuilder text = new StringBuilder("$ "); for (String cmd : cmds) { text.append(cmd).append(" "); } System.out.println(text.toString()); } private static void print(Object result) { System.out.println( result instanceof byte[] ? new String((byte[]) result) : result); } public static void main(String[] args) { DataGenerateForZookeeperServiceImpl dataGenerateForZookeeperService = new DataGenerateForZookeeperServiceImpl(); dataGenerateForZookeeperService.gen(); } }
注:zk存在一个问题,即使节点删除了,内存也没有释放,需要重启后内存才会释放。
也有些人问,不过好像没特别好的方法,主要是:https://*.com/questions/50437481/confusing-zookeeper-memory-usage、http://zookeeper-user.578899.n2.nabble.com/ZooKeeper-Memory-Usage-td7267902.html
后来dump出来堆栈进行分析,及查找相关资料,可知zk为了同步修改到follower更快,会在leader节点缓存最近的500个提交日志(如果follower超过500个跟不上,就要拿leader的快照全同步了,但它有个问题,如果zk很大,会导致同步一直失败,所以3.5版本增加了一个特性,判断txnlog是不是比snapshot小很多,目前是1/3,如果是,则使用事务日志而不是快照),即ZKDatabase.committedLog。这不是问题,问题在于zk里面保留了事务数据的3份拷贝,为:
- The first is in committedLog[i].request.request.hb - a heap-allocated ByteBuffer.
- The second is in committedLog[i].request.txn.data - a jute-serialised record of the transaction
- The third is in committedLog[i].packet.data - also jute-serialised, seemingly uninitialised data.
和dump分析结果完全一致,也可以参见https://issues.apache.org/jira/browse/ZOOKEEPER-1473,它建议把事务日志份数调整为可配置,但是提交的PR有bug,该bug尚未修复。
默认情况下,zk采用WAL机制,为了保证宕机后数据不丢失,WAL会采用同步写磁盘的方式,由参数forceSync控制,默认为yes,所以一定要保证dataLogDir目录在非常快的磁盘上,最好是SSD,如果为false,则只是写到了OS的pagecache,操作系统宕机后可能会丢数据。 为了检测写磁盘的性能,可以配置系统属性fsync.warningthresholdms(3.3.4引入)=20,如果数据固化到磁盘的操作fsync超过20ms的时候,将会在zookeeper.out中输出一条warn日志,默认是1秒。