基于Zookeeper监听和回调-分布式锁

zookeeper有本身的回调和监听机制,使用redis作分布式锁,如果分布式机器多的话,同步会造成一定的时间差,zookeeper的回调和监听,由zk主动进行业务代码调用,数据间同步迅速快捷。且结合临时节点,可以在client网络断开的时候,清除节点,避免死锁,临时节点也可以设置存活的时长。

使用zookeeper的临时、序列节点

原理:

在同一个目录下,由最早的节点获取锁,如果监听整个目录,每次释放锁都,排查文件号最小的,占用的资源会比较多。

所以这里采用的方案是,每个节点给前面的节点添加监听,当前面的节点释放锁被删除之后,则,通知后续节点。即,拿到锁的是最小的节点,当它释放锁,删除节点,会根据删除事件通知后续的一个节点。

注:如果是中间节点被手动删除了,也不影响。

 

以下撰写的分布式锁,满足可重入的要求:

//maven    lombok此处不写

        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.6.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.1</version>
        </dependency>

 

//zookeeper连接工具类
public class ZkUUtiles {

    private static ZooKeeper zk;

    private static CountDownLatch cc = new CountDownLatch(1);

    public static ZooKeeper getZk(){
        LDefaultWatcher defaultWatcher = new LDefaultWatcher();
        defaultWatcher.setCc(cc);
        try {
            zk = new ZooKeeper("127.0.0.1:2181/testLock2", 3000, defaultWatcher);
            cc.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return zk;
    }
}
//默认监听
@Data
public class LDefaultWatcher implements Watcher {

    private  CountDownLatch cc;
    @Override
    public void process(WatchedEvent event) {
        switch (event.getState()) {
            case Unknown:
                break;
            case Disconnected:
                break;
            case NoSyncConnected:
                break;
            case SyncConnected:
                cc.countDown();
                break;
            case AuthFailed:
                break;
            case ConnectedReadOnly:
                break;
            case SaslAuthenticated:
                break;
            case Expired:
                break;
            case Closed:
                break;
        }
    }
}
//分布式锁,回调机制阻塞
@Data
public class LDLockWatchBack implements AsyncCallback.StringCallback, AsyncCallback.ChildrenCallback, AsyncCallback.StatCallback , Watcher {

    private ZooKeeper zk;

    private String threadName;

    private String nodeName;

    private CountDownLatch cc = new CountDownLatch(1);

    private Integer counInTimes = 1;

    public void tryLock(){
        try {
            //重入锁
            if (isTwiceIn()) return;


            zk.create("/lock", threadName.getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this,"");
            cc.await();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    private boolean isTwiceIn()  {
        byte[] data = new byte[0];
        try {
            data = zk.getData("/", false, new Stat());
        }  catch (Exception e) {
            e.printStackTrace();
        }
        if(threadName.equals(new String(data))){
            counInTimes++;
            return true;
        }
        return false;
    }

    public void unLock(){
        try {
            if(--counInTimes==0){
                zk.delete(nodeName,-1);
                zk.setData("/","no_thread_lock".getBytes(),-1);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    //StringCallback
    public void processResult(int rc, String path, Object ctx, String name) {
        if(name !=null){
            System.out.println(threadName+" create "+name);
            nodeName = name;
            zk.getChildren("/",false, this,"dfs");
        }


    }

    @Override
    //childCallback
    public void processResult(int rc, String path, Object ctx, List<String> children) {

        //判断当前节点是不是第一个
        //如果是,则拿到锁,可以继续运行下去
        //如果不是,则给前面节点注册监听事件

        //先排序
        Collections.sort(children);
        int i = children.indexOf(nodeName.substring(1));
        if(i ==0 ){
            try {
                zk.setData("/",threadName.getBytes(),-1);
                Thread.sleep(10);
                //业务代码,肯定至少运行10ms,此处如果不sleep,会造成,第一个节点很快已经走完了,
                // 而后续节点,已经拿到了children,还没有给前节点注册上监听,但已经能判断自己不是第一个
                //这种情况,第一个节点挂了,但没有监听,所以不会进行后续节点的通知。即,第一条线程执行完后,不再走后续线程

            } catch (Exception e) {
                e.printStackTrace();
            }

            cc.countDown();
        }else{
            zk.exists("/"+children.get(i-1),this, this,"");
        }

    }

    @Override
    //StatCallback
    public void processResult(int rc, String path, Object ctx, Stat stat) {
            //待实现
    }

    @Override
    public void process(WatchedEvent event) {
        switch (event.getType()) {
            case None:
                break;
            case NodeCreated:
                break;
            case NodeDeleted:
                zk.getChildren("/",false, this,"dfs");
                break;
            case NodeDataChanged:
                break;
            case NodeChildrenChanged:
                break;
            case DataWatchRemoved:
                break;
            case ChildWatchRemoved:
                break;
            case PersistentWatchRemoved:
                break;
        }
    }
}
//测试代码
public class PDLockTest {

    private ZooKeeper zk;

    @Before
    public void getZK(){
        zk = ZkUUtiles.getZk();
    }

    @After
    public void closeZ(){
        try {
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void dLockTest(){

        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                LDLockWatchBack ldLockWatchBack = new LDLockWatchBack();
                ldLockWatchBack.setZk(zk);
                ldLockWatchBack.setThreadName(Thread.currentThread().getName());

                //抢锁
                ldLockWatchBack.tryLock();
                ldLockWatchBack.tryLock();
                ldLockWatchBack.tryLock();
                ldLockWatchBack.tryLock();
                ldLockWatchBack.tryLock();
                ldLockWatchBack.tryLock();
                //工作
                System.out.println(Thread.currentThread().getName()+":工作中。。。");
//                try {
//                    TimeUnit.SECONDS.sleep(1);
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
                //释放锁
                ldLockWatchBack.unLock();
                ldLockWatchBack.unLock();
                ldLockWatchBack.unLock();
                ldLockWatchBack.unLock();
                ldLockWatchBack.unLock();
                ldLockWatchBack.unLock();
            },"thread-"+i).start();
        }

        while (true){

        }
    }

}

 

上一篇:K8S单节点跑zk集群


下一篇:JavaGuide学习记录