ZooKeeper : Curator框架之分布式屏障DistributedBarrier

DistributedBarrier

DistributedBarrier类的源码注释:

Distributed systems use barriers to block processing of a set of nodes until a condition is met at which time all the nodes are allowed to proceed.

分布式系统使用屏障来阻止一组节点的处理,直到满足允许所有节点继续的条件为止。

类比单体应用的屏障CyclicBarrier

DistributedBarrier源码:

public class DistributedBarrier
{
    // CuratorFramework实例,用于与Zookeeper进行交互
    private final CuratorFramework client;
    // 分布式屏障的路径
    private final String barrierPath;
    // 监听器
    private final Watcher watcher = new Watcher()
    {
        @Override
        public void process(WatchedEvent event)
        {
            client.postSafeNotify(DistributedBarrier.this);
        }
    };

    /**
     * 构造方法
     */
    public DistributedBarrier(CuratorFramework client, String barrierPath)
    {
        this.client = client;
        this.barrierPath = PathUtils.validatePath(barrierPath);
    }

    /**
     * 设置屏障(创建屏障节点)
     */
    public synchronized void         setBarrier() throws Exception
    {
        try
        {
            client.create().creatingParentContainersIfNeeded().forPath(barrierPath);
        }
        catch ( KeeperException.NodeExistsException ignore )
        {
            // ignore
        }
    }

    /**
     * 移除屏障(删除屏障节点)
     */
    public synchronized void      removeBarrier() throws Exception
    {
        try
        {
            client.delete().forPath(barrierPath);
        }
        catch ( KeeperException.NoNodeException ignore )
        {
            // ignore
        }
    }

    /**
     * 阻塞直到屏障不再存在
     */
    public synchronized void      waitOnBarrier() throws Exception
    {
        waitOnBarrier(-1, null);
    }

    /**
     * 阻塞直到屏障不再存在或超时
     */
    public synchronized boolean      waitOnBarrier(long maxWait, TimeUnit unit) throws Exception
    {
        long            startMs = System.currentTimeMillis();
        boolean         hasMaxWait = (unit != null);
        long            maxWaitMs = hasMaxWait ? TimeUnit.MILLISECONDS.convert(maxWait, unit) : Long.MAX_VALUE;

        boolean         result;
        for(;;)
        {
            // 屏障节点是否不存在,true为不存在,false为存在
            result = (client.checkExists().usingWatcher(watcher).forPath(barrierPath) == null);
            // 屏障节点不存在,直接退出
            if ( result )
            {
                break;
            }
            // 屏障节点存在,进行等待
            if ( hasMaxWait )
            {
                long        elapsed = System.currentTimeMillis() - startMs;
                long        thisWaitMs = maxWaitMs - elapsed;
                // 等待超时,直接退出
                if ( thisWaitMs <= 0 )
                {
                    break;
                }
                // 继续等待
                wait(thisWaitMs);
            }
            else
            {
                wait();
            }
        }
        return result;
    }
}

DistributedBarrier的源码还是比较简单的,就是通过一个Zookeeper节点的创建与删除来实现分布式屏障。

测试

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.kaven</groupId>
    <artifactId>zookeeper</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
        </dependency>
    </dependencies>
</project>

CuratorFrameworkProperties类(提供CuratorFramework需要的一些配置信息,以及创建CuratorFramework实例的方法):

package com.kaven.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorFrameworkProperties {
    // 连接地址
    public static final String CONNECT_ADDRESS = "192.168.1.3:9000";
    // 连接超时时间
    public static final int CONNECTION_TIMEOUT_MS = 40000;
    // Session超时时间
    public static final int SESSION_TIMEOUT_MS = 10000;
    // 命名空间
    public static final String NAMESPACE = "MyNamespace";
    // 重试策略
    public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 3);

    public static CuratorFramework getCuratorFramework() {
        // 创建CuratorFramework实例
        CuratorFramework curator = CuratorFrameworkFactory.builder()
                .connectString(CuratorFrameworkProperties.CONNECT_ADDRESS)
                .retryPolicy(CuratorFrameworkProperties.RETRY_POLICY)
                .connectionTimeoutMs(CuratorFrameworkProperties.CONNECTION_TIMEOUT_MS)
                .sessionTimeoutMs(CuratorFrameworkProperties.SESSION_TIMEOUT_MS)
                .namespace(CuratorFrameworkProperties.NAMESPACE)
                .build();
        curator.start();
        assert curator.getState().equals(CuratorFrameworkState.STARTED);
        return curator;
    }
}

DistributedBarrierRunnable类(实现了Runnable接口,模拟分布式节点等待分布式屏障):

package com.kaven.zookeeper;

import lombok.SneakyThrows;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;

import java.util.Random;

public class DistributedBarrierRunnable implements Runnable{
    @SneakyThrows
    @Override
    public void run() {
        // 使用不同的CuratorFramework实例,表示不同的分布式节点
        CuratorFramework curator = CuratorFrameworkProperties.getCuratorFramework();

        // 模拟随机加入的分布式节点
        int randomSleep = new Random().nextInt(1000);
        Thread.sleep(randomSleep);
        
        // 分布式屏障的路径
        String barrierPath = "/kaven";

        // 创建DistributedBarrier实例,用于提供分布式屏障功能
        DistributedBarrier barrier = new DistributedBarrier(curator, barrierPath);

        System.out.println(Thread.currentThread().getName() + " 等待屏障被移除");
        long start = System.currentTimeMillis();
        // 等待屏障被移除
        barrier.waitOnBarrier();
        System.out.println(Thread.currentThread().getName() + " 等待了 "
                + (System.currentTimeMillis() - start) / 1000 + " s");
        System.out.println(Thread.currentThread().getName() + " 继续执行");
    }
}

启动类:

package com.kaven.zookeeper;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Application {
    private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();

    public static void main(String[] args) throws Exception {
        // 创建CuratorFramework实例
        CuratorFramework curator = CuratorFrameworkProperties.getCuratorFramework();

        // 分布式屏障的路径
        String barrierPath = "/kaven";

        // 创建DistributedBarrier实例,用于设置和在适当时机删除屏障
        DistributedBarrier barrier = new DistributedBarrier(curator, barrierPath);

        // 设置屏障
        barrier.setBarrier();

        // 分布式节点处理业务
        for (int i = 0; i < 5; i++) {
            EXECUTOR_SERVICE.execute(new DistributedBarrierRunnable());
        }

        // 模拟移除屏障需要处理的业务
        Thread.sleep(20000);

        // 移除屏障
        barrier.removeBarrier();
    }
}

模拟5个分布式节点等待分布式屏障,输出如下所示:

pool-1-thread-5 等待屏障被移除
pool-1-thread-3 等待屏障被移除
pool-1-thread-1 等待屏障被移除
pool-1-thread-4 等待屏障被移除
pool-1-thread-2 等待屏障被移除
pool-1-thread-2 等待了 19 s
pool-1-thread-2 继续执行
pool-1-thread-5 等待了 19 s
pool-1-thread-5 继续执行
pool-1-thread-3 等待了 19 s
pool-1-thread-3 继续执行
pool-1-thread-4 等待了 19 s
pool-1-thread-4 继续执行
pool-1-thread-1 等待了 19 s
pool-1-thread-1 继续执行

使用提供超时机制的等待屏障方法:

package com.kaven.zookeeper;

import lombok.SneakyThrows;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;

import java.util.Random;
import java.util.concurrent.TimeUnit;

public class DistributedBarrierRunnable implements Runnable{
    @SneakyThrows
    @Override
    public void run() {
        // 使用不同的CuratorFramework实例,表示不同的分布式节点
        CuratorFramework curator = CuratorFrameworkProperties.getCuratorFramework();

        // 模拟随机加入的分布式节点
        int randomSleep = new Random().nextInt(1000);
        Thread.sleep(randomSleep);

        // 分布式屏障的路径
        String barrierPath = "/kaven";

        // 创建DistributedBarrier实例,用于提供分布式屏障功能
        DistributedBarrier barrier = new DistributedBarrier(curator, barrierPath);

        System.out.println(Thread.currentThread().getName() + " 等待屏障被移除");
        long start = System.currentTimeMillis();
        
        // 等待屏障被移除
        boolean result = barrier.waitOnBarrier(10, TimeUnit.SECONDS);
        
        System.out.println(Thread.currentThread().getName() + " 等待了 "
                + (System.currentTimeMillis() - start) / 1000 + " s");
        if(result) {
            System.out.println(Thread.currentThread().getName() + " 继续执行");
        }
        else {
            // 等待屏障超时
            System.out.println(Thread.currentThread().getName() + " 等待屏障超时");
        }
    }
}

输出如下所示:

pool-1-thread-1 等待屏障被移除
pool-1-thread-2 等待屏障被移除
pool-1-thread-3 等待屏障被移除
pool-1-thread-4 等待屏障被移除
pool-1-thread-5 等待屏障被移除
pool-1-thread-1 等待了 10 s
pool-1-thread-1 等待屏障超时
pool-1-thread-2 等待了 10 s
pool-1-thread-2 等待屏障超时
pool-1-thread-3 等待了 10 s
pool-1-thread-3 等待屏障超时
pool-1-thread-4 等待了 10 s
pool-1-thread-4 等待屏障超时
pool-1-thread-5 等待了 10 s
pool-1-thread-5 等待屏障超时

符合预期。

Curator框架的分布式屏障DistributedBarrier就介绍到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。

上一篇:Android修炼系列:一个由 AsyncTask 引起的线上问题


下一篇:JUC之线程池基础与简单源码分析