zookeeper 实现分布式锁

目录

背景

1. zk 为什么可以用作分布式锁?

2. zk 如何保证并发场景下顺序节点不会重复? 

Zookeeper 分布式锁实现方式有哪些?

Curator + spring boot 实现分布式锁

maven 依赖

配置

实现 

Zookeeper+Curator实现分布式锁原理

源码实现:

Curator 和zk 原生分布式锁相比解决了哪些问题?

参考


背景

思考两个问题:

1. zk 为什么可以用作分布式锁?

Zookeeper 可以实现分布式锁,主要取决于Zookeeper 的节点是一个天然的顺序发号器,在每一个节点下创建的临时顺序节点类型,在节点下生成的新的子节点,且子节点命名会生成一个次序编号,这个次序编号是上一个次序编号加1;

Zookeeper 所创建的临时节点,当和客户端进行断开连接时,就好删除这个临时节点,这个属性可以帮我们完美避开死锁的问题

2. zk 如何保证并发场景下顺序节点不会重复? 

 通过zk 创建子节点源码方法(addChild)可以看到,其采用了synchronized 关键字修饰,保证创建子节点的方法是同步方法;在集群的环境下,leader 只有一台,fllower 接收到增、删、改 请求后,会将请求转发到leader服务 节点上统一执行,结合synchronized 就能保证节点创建不会重复;

/**
 * Method that inserts a child into the children set
 * 
 * @param child
 *            to be inserted
 * @return true if this set did not already contain the specified element
 */
public synchronized boolean addChild(String child) {
    if (children == null) {
        // let's be conservative on the typical number of children
        children = new HashSet<String>(8);
    }
    return children.add(child);
}

Zookeeper 分布式锁实现方式有哪些?

方案一: 采用zookeeper 原生实现

存在问题:1. 没有解决锁重入问题

                  2. 缺少网络连接失败重试处理

                 3. 没有解决羊群效应问题

方案二: 采用Curator 开源框架实现zk 分布式锁

Curator 

Curator + spring boot 实现分布式锁

环境: spring boot 2.3.2 release + zookeeper 3.6.0 + jdk1.8

maven 依赖

 <!--zk 锁-->
        <!--curator-->
        <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.8.0</version>
        </dependency>

        <!--zookeeper-->
        <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.10</version>
        </dependency>

配置

curator:
  # 重试次数
  retryCount: 5
  # 重试时间间隔
  elapsedTimeMs: 5000
  # zk 地址
  connectString: 127.0.0.1:2181
  # session 超时时间,客户端断开后,60s 服务端删除临时节点
  sessionTimeoutMs: 60000
  # 连接超时
  connectionTimeoutMs: 5000

实现 

package com.springcloud.alibaba.lock;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
* 读取配置类
 * @Author corn
 * @Date 2021/3/9 23:36
 */
@Data
@Component
@ConfigurationProperties(prefix = "curator")
public class CuratorConfig {
    private int retryCount;

    private int elapsedTimeMs;

    private String connectString;

    private int sessionTimeoutMs;

    private int connectionTimeoutMs;
}
package com.springcloud.alibaba.lock;

import lombok.RequiredArgsConstructor;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * 初始化CuratorFramework
 * @Author corn
 * @Date 2021/3/9 23:33
 */
@Service
@RequiredArgsConstructor(onConstructor = @_(@Autowired))
public class CuratorInit {


    private final CuratorConfig config;


    public CuratorFramework curatorInit(CuratorFramework curatorFramework){
        RetryOneTime retryOneTime = new RetryOneTime(config.getRetryCount());
        curatorFramework = CuratorFrameworkFactory.builder()
                .connectString(config.getConnectString())
                .sessionTimeoutMs(config.getSessionTimeoutMs())
                .connectionTimeoutMs(config.getConnectionTimeoutMs())
                .retryPolicy(retryOneTime)
                .build();
        curatorFramework.start();
        return curatorFramework;
    }
}
/**
     *@描述 测试类
     *@参数 
     *@返回值 
     *@创建人  corn
     *@创建时间  2021/3/13
     */
    @Test
    public void zkLockTest() throws Exception {
        curatorFramework = curatorInit.curatorInit(curatorFramework);
        String path = "/produceApple";
        // 获取锁对象
        InterProcessMutex interProcessMutex = new InterProcessMutex(curatorFramework, path);
        // 获取锁
        interProcessMutex.acquire();
        Thread.sleep(100000);
        // 释放锁
        interProcessMutex.release();
    }

 

Zookeeper+Curator实现分布式锁原理

配合流程图了解Curator 实现分布式锁原理:

zookeeper 实现分布式锁

步骤一: 客户端A 请求,根据当前线程id 判断当前线程是否已加锁,如果是,直接在可重入计数器+1 ;返回加锁成功,否则在my_lock 节点下创建一个临时节点,节点后缀为“1”

步骤二:客户端A 获取my_lock 节点下的所有临时节点进行正序排序,判断客户端A 创建的临时节点是否处于第一个。如果是则加锁成功,并初始化可重入计数器为1

步骤三: 客户端A创建的临时节点如果不是第一个,加锁失败,并对客户端A 生成的的临时节点的上一个临时节点添加一个监视器,监控上一个节点的变化

步骤五:上一临时节点删除后zk 通知负责监听这个节点的监听器,上一个临时节点已删除,此时客户端A 可以尝试获取锁了,并会在从新执行步骤二和步骤三的流程。

源码实现:

参考博客:Apache Curator之InterProcessMutex源码分析

 

Curator 和zk 原生分布式锁相比解决了哪些问题?

用Zk 原生分布式锁,会带来一个问题就是羊群效应,因为zk 原生分布式锁实现时,是所有服务都会去监听一个节点,当一个节点释放时,这些服务得到就会一起来抢占资源,当服务很大时,会导致网络带宽,服务资源不可用情况;

而采用Curator 的方式实现的分布式锁只是对上一个节点进行添加监听器,节点释放时,也只会通知一个,从而不会导致羊群效应,并且也能实现公平锁;

 

参考

石衫的架构

笔记敖丙博客

zookeeper 官网文档

Apache Curator之InterProcessMutex源码分析

上一篇:Zookeeper学习


下一篇:【大数据Zookeeper系列】 Zookeeper Java 客户端 ——Apache Curator