zookeeper有本身的回调和监听机制,使用redis作分布式锁,如果分布式机器多的话,同步会造成一定的时间差,zookeeper的回调和监听,由zk主动进行业务代码调用,数据间同步迅速快捷。且结合临时节点,可以在client网络断开的时候,清除节点,避免死锁,临时节点也可以设置存活的时长。
使用zookeeper的临时、序列节点
原理:
在同一个目录下,由最早的节点获取锁,如果监听整个目录,每次释放锁都,排查文件号最小的,占用的资源会比较多。
所以这里采用的方案是,每个节点给前面的节点添加监听,当前面的节点释放锁被删除之后,则,通知后续节点。即,拿到锁的是最小的节点,当它释放锁,删除节点,会根据删除事件通知后续的一个节点。
注:如果是中间节点被手动删除了,也不影响。
以下撰写的分布式锁,满足可重入的要求:
//maven lombok此处不写 <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.6.0</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13.1</version> </dependency>
//zookeeper连接工具类 public class ZkUUtiles { private static ZooKeeper zk; private static CountDownLatch cc = new CountDownLatch(1); public static ZooKeeper getZk(){ LDefaultWatcher defaultWatcher = new LDefaultWatcher(); defaultWatcher.setCc(cc); try { zk = new ZooKeeper("127.0.0.1:2181/testLock2", 3000, defaultWatcher); cc.await(); } catch (Exception e) { e.printStackTrace(); } return zk; } }
//默认监听 @Data public class LDefaultWatcher implements Watcher { private CountDownLatch cc; @Override public void process(WatchedEvent event) { switch (event.getState()) { case Unknown: break; case Disconnected: break; case NoSyncConnected: break; case SyncConnected: cc.countDown(); break; case AuthFailed: break; case ConnectedReadOnly: break; case SaslAuthenticated: break; case Expired: break; case Closed: break; } } }
//分布式锁,回调机制阻塞 @Data public class LDLockWatchBack implements AsyncCallback.StringCallback, AsyncCallback.ChildrenCallback, AsyncCallback.StatCallback , Watcher { private ZooKeeper zk; private String threadName; private String nodeName; private CountDownLatch cc = new CountDownLatch(1); private Integer counInTimes = 1; public void tryLock(){ try { //重入锁 if (isTwiceIn()) return; zk.create("/lock", threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this,""); cc.await(); } catch (Exception e) { e.printStackTrace(); } } private boolean isTwiceIn() { byte[] data = new byte[0]; try { data = zk.getData("/", false, new Stat()); } catch (Exception e) { e.printStackTrace(); } if(threadName.equals(new String(data))){ counInTimes++; return true; } return false; } public void unLock(){ try { if(--counInTimes==0){ zk.delete(nodeName,-1); zk.setData("/","no_thread_lock".getBytes(),-1); } } catch (Exception e) { e.printStackTrace(); } } @Override //StringCallback public void processResult(int rc, String path, Object ctx, String name) { if(name !=null){ System.out.println(threadName+" create "+name); nodeName = name; zk.getChildren("/",false, this,"dfs"); } } @Override //childCallback public void processResult(int rc, String path, Object ctx, List<String> children) { //判断当前节点是不是第一个 //如果是,则拿到锁,可以继续运行下去 //如果不是,则给前面节点注册监听事件 //先排序 Collections.sort(children); int i = children.indexOf(nodeName.substring(1)); if(i ==0 ){ try { zk.setData("/",threadName.getBytes(),-1); Thread.sleep(10); //业务代码,肯定至少运行10ms,此处如果不sleep,会造成,第一个节点很快已经走完了, // 而后续节点,已经拿到了children,还没有给前节点注册上监听,但已经能判断自己不是第一个 //这种情况,第一个节点挂了,但没有监听,所以不会进行后续节点的通知。即,第一条线程执行完后,不再走后续线程 } catch (Exception e) { e.printStackTrace(); } cc.countDown(); }else{ zk.exists("/"+children.get(i-1),this, this,""); } } @Override //StatCallback public void processResult(int rc, String path, Object ctx, Stat stat) { //待实现 } @Override public void process(WatchedEvent event) { switch (event.getType()) { case None: break; case NodeCreated: break; case NodeDeleted: zk.getChildren("/",false, this,"dfs"); break; case NodeDataChanged: break; case NodeChildrenChanged: break; case DataWatchRemoved: break; case ChildWatchRemoved: break; case PersistentWatchRemoved: break; } } }
//测试代码 public class PDLockTest { private ZooKeeper zk; @Before public void getZK(){ zk = ZkUUtiles.getZk(); } @After public void closeZ(){ try { zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } } @Test public void dLockTest(){ for (int i = 0; i < 10; i++) { new Thread(()->{ LDLockWatchBack ldLockWatchBack = new LDLockWatchBack(); ldLockWatchBack.setZk(zk); ldLockWatchBack.setThreadName(Thread.currentThread().getName()); //抢锁 ldLockWatchBack.tryLock(); ldLockWatchBack.tryLock(); ldLockWatchBack.tryLock(); ldLockWatchBack.tryLock(); ldLockWatchBack.tryLock(); ldLockWatchBack.tryLock(); //工作 System.out.println(Thread.currentThread().getName()+":工作中。。。"); // try { // TimeUnit.SECONDS.sleep(1); // } catch (InterruptedException e) { // e.printStackTrace(); // } //释放锁 ldLockWatchBack.unLock(); ldLockWatchBack.unLock(); ldLockWatchBack.unLock(); ldLockWatchBack.unLock(); ldLockWatchBack.unLock(); ldLockWatchBack.unLock(); },"thread-"+i).start(); } while (true){ } } }