zookeeper分布式锁

首先用 docker-compose 搭建 zookeeper 集群,docker-compose.yml 内容如下:

# Three-node ZooKeeper ensemble (zk1, zk2, zk3) on a shared network named "zk".
# Each node's client port 2181 is published on a distinct host port (21811-21813).
version: '2'
networks:
  zk:
services:
  zk1:
    image: zookeeper:3.4
    container_name: zk1
    networks:
      - zk
    ports:
      - "21811:2181"
    environment:
      ZOO_MY_ID: 1
      # The node's own entry uses 0.0.0.0; peers are addressed by container name.
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888 server.2=zk2:2888:3888 server.3=zk3:2888:3888
  zk2:
    image: zookeeper:3.4
    container_name: zk2
    networks:
      - zk
    ports:
      - "21812:2181"
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zk1:2888:3888 server.2=0.0.0.0:2888:3888 server.3=zk3:2888:3888
  zk3:
    image: zookeeper:3.4
    container_name: zk3
    networks:
      - zk
    ports:
      - "21813:2181"
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zk1:2888:3888 server.2=zk2:2888:3888 server.3=0.0.0.0:2888:3888

执行 docker-compose up -d 创建并启动集群。
检查状态

[root@localhost test]# docker exec -it zk1 bash ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Mode: follower
[root@localhost test]# docker exec -it zk2 bash ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Mode: follower
[root@localhost test]# docker exec -it zk3 bash ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Mode: leader

 

下面我们用程序来验证:

我们用现成的curator来操作zk分布式锁

import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.framework.api.GetDataBuilder;import org.apache.curator.framework.recipes.locks.InterProcessMutex;import org.apache.curator.retry.ExponentialBackoffRetry;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.sql.Time;import java.util.Date;import java.util.concurrent.TimeUnit;public class DistributedLock {        public static Logger log = LoggerFactory.getLogger(DistributedLock.class);        private InterProcessMutex interProcessMutex;  //可重入排它锁
        private String lockName;  //竞争资源标志
        private String root = "/distributed/lock/";//根节点
        private static CuratorFramework curatorFramework;        private static String ZK_URL = "127.0.0.1:21811,127.0.0.1:21812,127.0.0.1:21813";        static{
            curatorFramework= CuratorFrameworkFactory.newClient(ZK_URL,new ExponentialBackoffRetry(1000,3));
            curatorFramework.start();
        }        /**
         * 实例化
         * @param lockName         */
        public DistributedLock(String lockName){            try {                this.lockName = lockName;
                interProcessMutex = new InterProcessMutex(curatorFramework, root + lockName);
            }catch (Exception e){
                log.error("initial InterProcessMutex exception="+e);
            }
        }        /**
         * 获取锁         */
        public void acquireLock(){            int flag = 0;            try {                //重试N次,每次最大等待1s
                while (!interProcessMutex.acquire(1, TimeUnit.SECONDS)){
                    flag++;                    if(flag>5){  //重试次
                        break;
                    }
                }
            } catch (Exception e) {
                log.error("distributed lock acquire exception="+e);
            }            if(flag>5){
                log.info("Thread:"+Thread.currentThread().getId()+" acquire distributed lock  busy"+ new Date().getTime());
            }else{
                log.info("Thread:"+Thread.currentThread().getId()+" acquire distributed lock  success"+ new Date().getTime());
            }
        }        /**
         * 释放锁         */
        public void releaseLock(){            try {                if(interProcessMutex != null && interProcessMutex.isAcquiredInThisProcess()){
                    interProcessMutex.release();
                    curatorFramework.delete().inBackground().forPath(root+lockName);                    //byte[] data = curatorFramework.getData().forPath(root + lockName);
                    log.info("Thread:"+Thread.currentThread().getId()+" release distributed lock  success"+ new Date().getTime());
                }
            }catch (Exception e){
                log.info("Thread:"+Thread.currentThread().getId()+" release distributed lock  exception="+e);
            }
        }
    }

 

接下来我们开2个线程来竞争分布式锁:

public class TestLock {    public static void main(String[] args) throws InterruptedException {
        String lockName = "lock1";
        DistributedLock lockFoo = new DistributedLock(lockName);        //lockFoo.acquireLock();        //lockFoo.releaseLock();        //        System.out.println("主线程ID是:" + Thread.currentThread().getId());
        Thread thread1 = new MyThread("thread1",lockFoo);
        Thread thread2 = new MyThread("thread2",lockFoo);
        thread1.start();
        Thread.sleep(1000);
        thread2.start();

    }


}/**
 * 自定义线程 */class MyThread extends Thread {    /*线程名称*/
    private String name;    private DistributedLock lockFoo;    public MyThread(String name,DistributedLock lockFoo) {        this.name = name;        this.lockFoo = lockFoo;
    }

    @Override    public void run() {        if(this.name.equals("thread1")) {            this.lockFoo.acquireLock();
        }

        System.out.println("名称" + name + "的线程ID是:" + Thread.currentThread().getId());        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }        if(this.name.equals("thread2")) {            this.lockFoo.releaseLock();
        }
    }
}

程序中thread1获取了锁后,thread2解锁失败。这是因为 InterProcessMutex 的持有者是线程级别的:只有获取锁的那个线程才能释放它。

上一篇:学习笔记|Generator 函数的异步应用


下一篇:使用Powershell脚本实现微信多开