zookeeper分布式锁

首先搭建zookeeper集群docker-compose.yml

# Three-node ZooKeeper ensemble for local testing.
# Each container exposes client port 2181 on a distinct host port (21811-21813).
version: '2'
networks:
  zk:
services:
  # Node 1: ZOO_MY_ID must match the server.N entry that points at itself (0.0.0.0).
  zk1:
    image: zookeeper:3.4
    container_name: zk1
    networks:
        - zk
    ports:
        - "21811:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888 server.2=zk2:2888:3888 server.3=zk3:2888:3888
  # Node 2: same ensemble list, its own entry bound to 0.0.0.0.
  zk2:
    image: zookeeper:3.4
    container_name: zk2
    networks:
        - zk
    ports:
        - "21812:2181"
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zk1:2888:3888 server.2=0.0.0.0:2888:3888 server.3=zk3:2888:3888
  # Node 3: same ensemble list, its own entry bound to 0.0.0.0.
  zk3:
    image: zookeeper:3.4
    container_name: zk3
    networks:
        - zk
    ports:
        - "21813:2181"
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zk1:2888:3888 server.2=zk2:2888:3888 server.3=0.0.0.0:2888:3888

docker-compose up -d 创建并启动
检查状态

[root@localhost test]# docker exec -it zk1 bash ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Mode: follower
[root@localhost test]# docker exec -it zk2 bash ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Mode: follower
[root@localhost test]# docker exec -it zk3 bash ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Mode: leader

 

下面我们来程序验证:

我们用现成的curator来操作zk分布式锁

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.GetDataBuilder;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Time;
import java.util.Date;
import java.util.concurrent.TimeUnit;

/**
 * Distributed lock wrapper around Curator's {@link InterProcessMutex}.
 *
 * <p>Note: InterProcessMutex is re-entrant and thread-owned — {@code release()}
 * must be invoked by the same thread that successfully acquired the lock,
 * otherwise Curator throws an exception.
 */
public class DistributedLock {

        public static Logger log = LoggerFactory.getLogger(DistributedLock.class);
        private InterProcessMutex interProcessMutex;  // re-entrant exclusive lock
        private String lockName;                      // identifier of the contended resource
        private String root = "/distributed/lock/";   // ZK base path for all locks
        private static CuratorFramework curatorFramework;
        private static String ZK_URL = "127.0.0.1:21811,127.0.0.1:21812,127.0.0.1:21813";
        static{
            // One shared client per JVM; retries connection with exponential backoff.
            curatorFramework= CuratorFrameworkFactory.newClient(ZK_URL,new ExponentialBackoffRetry(1000,3));
            curatorFramework.start();
        }

        /**
         * Creates a lock handle for the given resource name.
         *
         * @param lockName identifier appended to the root path to form the lock node
         */
        public DistributedLock(String lockName){
            try {
                this.lockName = lockName;
                interProcessMutex = new InterProcessMutex(curatorFramework, root + lockName);
            }catch (Exception e){
                // Keep the log call style but preserve the stack trace.
                log.error("initial InterProcessMutex exception", e);
            }
        }

        /**
         * Tries to acquire the lock, waiting at most 1s per attempt for up to
         * 6 attempts. Logs whether the lock was obtained.
         */
        public void acquireLock(){
            // BUG FIX: the original tracked only a retry counter, so after an
            // exception (or exhausted retries) it could still log "success".
            // Track the actual acquisition result instead.
            boolean acquired = false;
            int attempts = 0;
            try {
                // Retry: each acquire() waits up to 1 second.
                while (!(acquired = interProcessMutex.acquire(1, TimeUnit.SECONDS))){
                    attempts++;
                    if(attempts > 5){  // give up after the retry budget is spent
                        break;
                    }
                }
            } catch (Exception e) {
                log.error("distributed lock acquire exception", e);
            }
            if(acquired){
                log.info("Thread:{} acquire distributed lock success {}",
                        Thread.currentThread().getId(), System.currentTimeMillis());
            }else{
                log.info("Thread:{} acquire distributed lock busy {}",
                        Thread.currentThread().getId(), System.currentTimeMillis());
            }
        }

        /**
         * Releases the lock if this process holds it.
         *
         * <p>Must be called from the thread that acquired the lock; Curator
         * rejects a release from any other thread.
         */
        public void releaseLock(){
            try {
                if(interProcessMutex != null && interProcessMutex.isAcquiredInThisProcess()){
                    interProcessMutex.release();
                    // BUG FIX: do NOT delete the lock path here. The original called
                    // curatorFramework.delete().inBackground().forPath(root+lockName),
                    // but InterProcessMutex manages its own ephemeral sequential nodes;
                    // deleting the base path can break other clients waiting on the
                    // same lock.
                    log.info("Thread:{} release distributed lock success {}",
                            Thread.currentThread().getId(), System.currentTimeMillis());
                }
            }catch (Exception e){
                // Use error level with the throwable so the failure is visible.
                log.error("Thread:{} release distributed lock exception",
                        Thread.currentThread().getId(), e);
            }
        }
    }

 

接下来我们开2个线程来竞争分布式锁:

/**
 * Demo driver: two threads contend for the same distributed lock instance.
 */
public class TestLock {

    public static void main(String[] args) throws InterruptedException {
        final String lockName = "lock1";
        final DistributedLock sharedLock = new DistributedLock(lockName);

        System.out.println("主线程ID是:" + Thread.currentThread().getId());

        // Stagger the two workers so thread1 grabs the lock first.
        Thread first = new MyThread("thread1", sharedLock);
        Thread second = new MyThread("thread2", sharedLock);

        first.start();
        Thread.sleep(1000);
        second.start();
    }
}

/**
 * 自定义线程
 */
/**
 * Demo worker thread: "thread1" acquires the shared distributed lock,
 * while "thread2" deliberately attempts to release it from a different
 * thread (which is expected to fail — the lock is thread-owned).
 */
class MyThread extends Thread {
    /** Logical name used to select this thread's role ("thread1" or "thread2"). */
    private final String name;
    /** Lock instance shared between the competing threads. */
    private final DistributedLock lockFoo;

    public MyThread(String name, DistributedLock lockFoo) {
        this.name = name;
        this.lockFoo = lockFoo;
    }

    @Override
    public void run() {
        // Constant-first equals avoids an NPE if name was constructed null.
        if ("thread1".equals(this.name)) {
            this.lockFoo.acquireLock();
        }

        System.out.println("名称" + name + "的线程ID是:" + Thread.currentThread().getId());
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            // BUG FIX: restore the interrupt flag instead of swallowing it with
            // only printStackTrace(), so callers can observe the interruption.
            Thread.currentThread().interrupt();
        }

        if ("thread2".equals(this.name)) {
            this.lockFoo.releaseLock();
        }
    }
}

程序中thread1获取了锁后,thread2解锁失败。原因是InterProcessMutex是线程持有的可重入锁:release()必须由成功acquire()的那个线程调用,由其他线程调用释放时Curator会抛出异常,因此thread2无法替thread1解锁。

上一篇:利用redis 分布式锁 解决集群环境下多次定时任务执行


下一篇:为什么使用中间件?如何实现Redis分布式锁以及缓存和消息队列常见问题