1、Zookeeper简介
1.1、概述
ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。
ZooKeeper的目标就是封装好复杂易出错的关键服务,将简单易用的接口和性能高效、功能稳定的系统提供给用户。
1.2、数据结构
在Zookeeper中包括了四种节点类型:临时节点、持久节点、临时有序节点(分布式锁基于该类的数据节点实现)、持久有序节点。
1.3、作为分布式锁依据的特性
基于zookeeper的临时有序节点实现分布式锁方案,主要是因为Zookeeper具体如下特性:
- 临时有序节点在客户端与Zookeeper断开连接后,后自动删除,这样可以避免死锁。
- 临时有序节点是生成的节点序号是有序的,我们可以选择最小序号的节点作为获取到锁的判断依据
- zookeeper的节点上可以注册上用户事件,这样我们可以实现阻塞的分布式锁,即Zookeeper的观察器,可以监控节点变化。
- Zookeeper可以做高可用集群,保证可用性。
2、实现分布式锁的步骤
&esmp;大概流程如下:
- 开始时,并发线程(A、B、C……)创建临时有序节点,得到有序的节点集合
- 选取创建节点的序号最小的线程获取锁,其他失败线程,根据序号顺序,监听自己前面一个序号的节点
- 获取到锁的线程,执行业务,完成后释放锁
- 当释放锁的时候,会删除当前有序临时节点,这个时候会触发监听器,唤醒下一个线程继续执行
- 依次执行,知道所有线程执行完成。
3、基于Zookeeper实现分布式锁(排它锁)
3.1、环境搭建
首先,添加Zookeeper需要的依赖,这里需要注意:排除Zookeeper依赖中的日志依赖,否则会和SpringBoot默认的日志有冲突(当然也可以根据项目情况自行选取)。
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.8</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
然后,添加zk的连接配置,并编写java类读取。application.properties配置文件中的配置:
server.port=8080
#zookeeper配置
zk.connectString=127.0.0.1:2181
zk.sessionTimeout=10000
#日志配置
logging.level.com.qriver.distributedlock=debug
读取配置的java类:
@Configuration
public class ZookeeperConfig {
@Value("${zk.connectString:127.0.0.1:2181}")
private String connectString;
@Value("${zk.sessionTimeout:10000}")
private int sessionTimeout;
public String getConnectString() {
return connectString;
}
public void setConnectString(String connectString) {
this.connectString = connectString;
}
public int getSessionTimeout() {
return sessionTimeout;
}
public void setSessionTimeout(int sessionTimeout) {
this.sessionTimeout = sessionTimeout;
}
}
再然后,编写基于Zookeeper的分布式锁,这个是核心实现,如下:
/**
* 基于Zookeeper实现分布式锁
*/
public class ZookeeperLock implements AutoCloseable, Watcher {
private ZooKeeper zk;
//创建的临时节点
private String zkNode;
public ZookeeperLock(ZookeeperConfig config) throws IOException {
this.zk = new ZooKeeper(config.getConnectString(),config.getSessionTimeout(),this);
}
public boolean getLock(String key){
try {
//首先,判断业务节点(持久节点)是否存在,不存在就创建
Stat stat = zk.exists("/" + key, false);
if(stat == null){
zk.create("/" + key,key.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
}
//创建临时有序节点
zkNode = zk.create("/" + key + "/" + key + "_",key.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
//排序,选取最小节点,即查看业务节点下所有节点,排序,选第一个
List<String> childrenNodes = zk.getChildren("/" + key, false);
Collections.sort(childrenNodes);
//获取序号最小的(第一个)子节点
String firstNode = childrenNodes.get(0);
//判断当前节点是否是第一个,即是否应该获取到锁
if (zkNode.endsWith(firstNode)){
return true;
}
//如果当前节点不能获取到锁,则需要添加监听器。
//lastNode,遍历过程中,保存当前节点的前一个节点,并添加监听器
String lastNode = firstNode;
for (String node : childrenNodes){
if (zkNode.endsWith(node)){
zk.exists("/" + key + "/" + lastNode,true);
break;
}else {
lastNode = node;
}
}
//锁定当前对象,
synchronized (this){
wait();
}
return true;
} catch (Exception e) {
e.printStackTrace();
}
return true;
}
@Override
public void close() throws Exception {
zk.delete(zkNode,-1);
zk.close();
}
/**
* 监听器方法,监听到删除节点操作(即释放锁),唤醒下一个节点(线程)
* @param event
*/
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeDeleted){
synchronized (this){
notify();
}
}
}
}
最后,编写应用分布式锁的测试类DemoController,如下:
@RestController
public class DemoController {
private Logger logger = LoggerFactory.getLogger(DemoController.class);
@Autowired
private ZookeeperConfig zookeeperConfig;
@RequestMapping("zkLock")
public String testLock() {
logger.debug("进入testLock()方法;");
//“order"表示业务模块
try(ZookeeperLock lock = new ZookeeperLock(zookeeperConfig)){
if(lock.getLock("order")){
logger.debug("获取到锁;");
Thread.sleep(20 * 1000);
}
}catch (Exception e){
e.printStackTrace();
}
logger.debug("方法执行完成;");
return "方法执行完成";
}
}
完成了上述编码工作后,开始进行分布式锁验证,和前面类似,通过启动两个服务,分别访问,后访问的会等到前面访问的释放锁后,才能够获取到锁(即支持阻塞的锁),这里不再粘贴具体的测试结果和打印的日志。