DistributedBarrier
DistributedBarrier
类的源码注释:
Distributed systems use barriers to block processing of a set of nodes until a condition is met at which time all the nodes are allowed to proceed.
分布式系统使用屏障来阻止一组节点的处理,直到满足允许所有节点继续的条件为止。
类比单体应用的屏障CyclicBarrier
:
DistributedBarrier
源码:
public class DistributedBarrier
{
// CuratorFramework实例,用于与Zookeeper进行交互
private final CuratorFramework client;
// 分布式屏障的路径
private final String barrierPath;
// 监听器
private final Watcher watcher = new Watcher()
{
@Override
public void process(WatchedEvent event)
{
client.postSafeNotify(DistributedBarrier.this);
}
};
/**
* 构造方法
*/
public DistributedBarrier(CuratorFramework client, String barrierPath)
{
this.client = client;
this.barrierPath = PathUtils.validatePath(barrierPath);
}
/**
* 设置屏障(创建屏障节点)
*/
public synchronized void setBarrier() throws Exception
{
try
{
client.create().creatingParentContainersIfNeeded().forPath(barrierPath);
}
catch ( KeeperException.NodeExistsException ignore )
{
// ignore
}
}
/**
* 移除屏障(删除屏障节点)
*/
public synchronized void removeBarrier() throws Exception
{
try
{
client.delete().forPath(barrierPath);
}
catch ( KeeperException.NoNodeException ignore )
{
// ignore
}
}
/**
* 阻塞直到屏障不再存在
*/
public synchronized void waitOnBarrier() throws Exception
{
waitOnBarrier(-1, null);
}
/**
* 阻塞直到屏障不再存在或超时
*/
public synchronized boolean waitOnBarrier(long maxWait, TimeUnit unit) throws Exception
{
long startMs = System.currentTimeMillis();
boolean hasMaxWait = (unit != null);
long maxWaitMs = hasMaxWait ? TimeUnit.MILLISECONDS.convert(maxWait, unit) : Long.MAX_VALUE;
boolean result;
for(;;)
{
// 屏障节点是否不存在,true为不存在,false为存在
result = (client.checkExists().usingWatcher(watcher).forPath(barrierPath) == null);
// 屏障节点不存在,直接退出
if ( result )
{
break;
}
// 屏障节点存在,进行等待
if ( hasMaxWait )
{
long elapsed = System.currentTimeMillis() - startMs;
long thisWaitMs = maxWaitMs - elapsed;
// 等待超时,直接退出
if ( thisWaitMs <= 0 )
{
break;
}
// 继续等待
wait(thisWaitMs);
}
else
{
wait();
}
}
return result;
}
}
DistributedBarrier
的源码还是比较简单的,就是通过一个Zookeeper
节点的创建与删除来实现分布式屏障。
测试
pom.xml
:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.kaven</groupId>
<artifactId>zookeeper</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>
</dependencies>
</project>
CuratorFrameworkProperties
类(提供CuratorFramework
需要的一些配置信息,以及创建CuratorFramework
实例的方法):
package com.kaven.zookeeper;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorFrameworkProperties {
// 连接地址
public static final String CONNECT_ADDRESS = "192.168.1.3:9000";
// 连接超时时间
public static final int CONNECTION_TIMEOUT_MS = 40000;
// Session超时时间
public static final int SESSION_TIMEOUT_MS = 10000;
// 命名空间
public static final String NAMESPACE = "MyNamespace";
// 重试策略
public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 3);
public static CuratorFramework getCuratorFramework() {
// 创建CuratorFramework实例
CuratorFramework curator = CuratorFrameworkFactory.builder()
.connectString(CuratorFrameworkProperties.CONNECT_ADDRESS)
.retryPolicy(CuratorFrameworkProperties.RETRY_POLICY)
.connectionTimeoutMs(CuratorFrameworkProperties.CONNECTION_TIMEOUT_MS)
.sessionTimeoutMs(CuratorFrameworkProperties.SESSION_TIMEOUT_MS)
.namespace(CuratorFrameworkProperties.NAMESPACE)
.build();
curator.start();
assert curator.getState().equals(CuratorFrameworkState.STARTED);
return curator;
}
}
DistributedBarrierRunnable
类(实现了Runnable
接口,模拟分布式节点等待分布式屏障):
package com.kaven.zookeeper;
import lombok.SneakyThrows;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
import java.util.Random;
public class DistributedBarrierRunnable implements Runnable{
@SneakyThrows
@Override
public void run() {
// 使用不同的CuratorFramework实例,表示不同的分布式节点
CuratorFramework curator = CuratorFrameworkProperties.getCuratorFramework();
// 模拟随机加入的分布式节点
int randomSleep = new Random().nextInt(1000);
Thread.sleep(randomSleep);
// 分布式屏障的路径
String barrierPath = "/kaven";
// 创建DistributedBarrier实例,用于提供分布式屏障功能
DistributedBarrier barrier = new DistributedBarrier(curator, barrierPath);
System.out.println(Thread.currentThread().getName() + " 等待屏障被移除");
long start = System.currentTimeMillis();
// 等待屏障被移除
barrier.waitOnBarrier();
System.out.println(Thread.currentThread().getName() + " 等待了 "
+ (System.currentTimeMillis() - start) / 1000 + " s");
System.out.println(Thread.currentThread().getName() + " 继续执行");
}
}
启动类:
package com.kaven.zookeeper;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Application {
private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();
public static void main(String[] args) throws Exception {
// 创建CuratorFramework实例
CuratorFramework curator = CuratorFrameworkProperties.getCuratorFramework();
// 分布式屏障的路径
String barrierPath = "/kaven";
// 创建DistributedBarrier实例,用于设置和在适当时机删除屏障
DistributedBarrier barrier = new DistributedBarrier(curator, barrierPath);
// 设置屏障
barrier.setBarrier();
// 分布式节点处理业务
for (int i = 0; i < 5; i++) {
EXECUTOR_SERVICE.execute(new DistributedBarrierRunnable());
}
// 模拟移除屏障需要处理的业务
Thread.sleep(20000);
// 移除屏障
barrier.removeBarrier();
}
}
模拟5
个分布式节点等待分布式屏障,输出如下所示:
pool-1-thread-5 等待屏障被移除
pool-1-thread-3 等待屏障被移除
pool-1-thread-1 等待屏障被移除
pool-1-thread-4 等待屏障被移除
pool-1-thread-2 等待屏障被移除
pool-1-thread-2 等待了 19 s
pool-1-thread-2 继续执行
pool-1-thread-5 等待了 19 s
pool-1-thread-5 继续执行
pool-1-thread-3 等待了 19 s
pool-1-thread-3 继续执行
pool-1-thread-4 等待了 19 s
pool-1-thread-4 继续执行
pool-1-thread-1 等待了 19 s
pool-1-thread-1 继续执行
使用提供超时机制的等待屏障方法:
package com.kaven.zookeeper;
import lombok.SneakyThrows;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
import java.util.Random;
import java.util.concurrent.TimeUnit;
public class DistributedBarrierRunnable implements Runnable{
@SneakyThrows
@Override
public void run() {
// 使用不同的CuratorFramework实例,表示不同的分布式节点
CuratorFramework curator = CuratorFrameworkProperties.getCuratorFramework();
// 模拟随机加入的分布式节点
int randomSleep = new Random().nextInt(1000);
Thread.sleep(randomSleep);
// 分布式屏障的路径
String barrierPath = "/kaven";
// 创建DistributedBarrier实例,用于提供分布式屏障功能
DistributedBarrier barrier = new DistributedBarrier(curator, barrierPath);
System.out.println(Thread.currentThread().getName() + " 等待屏障被移除");
long start = System.currentTimeMillis();
// 等待屏障被移除
boolean result = barrier.waitOnBarrier(10, TimeUnit.SECONDS);
System.out.println(Thread.currentThread().getName() + " 等待了 "
+ (System.currentTimeMillis() - start) / 1000 + " s");
if(result) {
System.out.println(Thread.currentThread().getName() + " 继续执行");
}
else {
// 等待屏障超时
System.out.println(Thread.currentThread().getName() + " 等待屏障超时");
}
}
}
输出如下所示:
pool-1-thread-1 等待屏障被移除
pool-1-thread-2 等待屏障被移除
pool-1-thread-3 等待屏障被移除
pool-1-thread-4 等待屏障被移除
pool-1-thread-5 等待屏障被移除
pool-1-thread-1 等待了 10 s
pool-1-thread-1 等待屏障超时
pool-1-thread-2 等待了 10 s
pool-1-thread-2 等待屏障超时
pool-1-thread-3 等待了 10 s
pool-1-thread-3 等待屏障超时
pool-1-thread-4 等待了 10 s
pool-1-thread-4 等待屏障超时
pool-1-thread-5 等待了 10 s
pool-1-thread-5 等待屏障超时
符合预期。
Curator
框架的分布式屏障DistributedBarrier
就介绍到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。