基于Zookeeper实现的分布式锁

1、Zookeeper简介

1.1、概述

  ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。

  ZooKeeper的目标就是封装好复杂易出错的关键服务,将简单易用的接口和性能高效、功能稳定的系统提供给用户。

1.2、数据结构

  在Zookeeper中包括了四种节点类型:临时节点、持久节点、临时有序节点(分布式锁基于该类的数据节点实现)、持久有序节点。

1.3、作为分布式锁依据的特性

  基于zookeeper的临时有序节点实现分布式锁方案,主要是因为Zookeeper具体如下特性:

  1. 临时有序节点在客户端与Zookeeper断开连接后,后自动删除,这样可以避免死锁。
  2. 临时有序节点是生成的节点序号是有序的,我们可以选择最小序号的节点作为获取到锁的判断依据
  3. zookeeper的节点上可以注册上用户事件,这样我们可以实现阻塞的分布式锁,即Zookeeper的观察器,可以监控节点变化。
  4. Zookeeper可以做高可用集群,保证可用性。

2、实现分布式锁的步骤

基于Zookeeper实现的分布式锁
 &esmp;大概流程如下:

  1. 开始时,并发线程(A、B、C……)创建临时有序节点,得到有序的节点集合
  2. 选取创建节点的序号最小的线程获取锁,其他失败线程,根据序号顺序,监听自己前面一个序号的节点
  3. 获取到锁的线程,执行业务,完成后释放锁
  4. 当释放锁的时候,会删除当前有序临时节点,这个时候会触发监听器,唤醒下一个线程继续执行
  5. 依次执行,知道所有线程执行完成。

3、基于Zookeeper实现分布式锁(排它锁)

3.1、环境搭建

  首先,添加Zookeeper需要的依赖,这里需要注意:排除Zookeeper依赖中的日志依赖,否则会和SpringBoot默认的日志有冲突(当然也可以根据项目情况自行选取)。

<dependency>
     <groupId>org.apache.zookeeper</groupId>
     <artifactId>zookeeper</artifactId>
     <version>3.5.8</version>
     <exclusions>
         <exclusion>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-log4j12</artifactId>
         </exclusion>
     </exclusions>
 </dependency>

  然后,添加zk的连接配置,并编写java类读取。application.properties配置文件中的配置:

server.port=8080

#zookeeper配置
zk.connectString=127.0.0.1:2181
zk.sessionTimeout=10000

#日志配置
logging.level.com.qriver.distributedlock=debug

  读取配置的java类:

@Configuration
public class ZookeeperConfig {

    @Value("${zk.connectString:127.0.0.1:2181}")
    private String connectString;
    @Value("${zk.sessionTimeout:10000}")
    private int sessionTimeout;

    public String getConnectString() {
        return connectString;
    }

    public void setConnectString(String connectString) {
        this.connectString = connectString;
    }

    public int getSessionTimeout() {
        return sessionTimeout;
    }

    public void setSessionTimeout(int sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
    }
}

  再然后,编写基于Zookeeper的分布式锁,这个是核心实现,如下:

/**
 * 基于Zookeeper实现分布式锁
 */
public class ZookeeperLock implements AutoCloseable, Watcher {


    private ZooKeeper zk;

    //创建的临时节点
    private String zkNode;

    public ZookeeperLock(ZookeeperConfig config) throws IOException {
        this.zk = new ZooKeeper(config.getConnectString(),config.getSessionTimeout(),this);
    }
    public boolean getLock(String key){

        try {
            //首先,判断业务节点(持久节点)是否存在,不存在就创建
           Stat stat =  zk.exists("/" + key, false);
           if(stat == null){
                zk.create("/" + key,key.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
           }
            //创建临时有序节点
            zkNode = zk.create("/" + key + "/" + key + "_",key.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
            //排序,选取最小节点,即查看业务节点下所有节点,排序,选第一个
            List<String> childrenNodes = zk.getChildren("/" + key, false);
            Collections.sort(childrenNodes);
            //获取序号最小的(第一个)子节点
            String firstNode = childrenNodes.get(0);
            //判断当前节点是否是第一个,即是否应该获取到锁
            if (zkNode.endsWith(firstNode)){
                return true;
            }
            //如果当前节点不能获取到锁,则需要添加监听器。
            //lastNode,遍历过程中,保存当前节点的前一个节点,并添加监听器
            String lastNode = firstNode;
            for (String node : childrenNodes){
                if (zkNode.endsWith(node)){
                    zk.exists("/" + key + "/" + lastNode,true);
                    break;
                }else {
                    lastNode = node;
                }
            }
            //锁定当前对象,
            synchronized (this){
                wait();
            }
            return true;
        }  catch (Exception e) {
            e.printStackTrace();
        }
        return true;
    }
    @Override
    public void close() throws Exception {
        zk.delete(zkNode,-1);
        zk.close();
    }

    /**
     * 监听器方法,监听到删除节点操作(即释放锁),唤醒下一个节点(线程)
     * @param event
     */
    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.NodeDeleted){
            synchronized (this){
                notify();
            }
        }
    }

}

  最后,编写应用分布式锁的测试类DemoController,如下:

@RestController
public class DemoController {

    private Logger logger = LoggerFactory.getLogger(DemoController.class);

    @Autowired
    private ZookeeperConfig zookeeperConfig;

    @RequestMapping("zkLock")
    public String testLock() {
        logger.debug("进入testLock()方法;");
        //“order"表示业务模块
        try(ZookeeperLock lock = new ZookeeperLock(zookeeperConfig)){
            if(lock.getLock("order")){
                logger.debug("获取到锁;");
                Thread.sleep(20 * 1000);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
        logger.debug("方法执行完成;");
        return "方法执行完成";
    }

}

  完成了上述编码工作后,开始进行分布式锁验证,和前面类似,通过启动两个服务,分别访问,后访问的会等到前面访问的释放锁后,才能够获取到锁(即支持阻塞的锁),这里不再粘贴具体的测试结果和打印的日志。

上一篇:一个架构师的自白 | 那些追源码的平凡之路


下一篇:zookeeper学习相关基础知识