架构面试-分布式存储系统HA高可用原理及应用案例实战

文章目录

  • CountDownLatch
    • CountDownLatch:同步等待多个线程完成任务的并发组件
      • CountDownLatch:同步等待多个线程完成任务的并发组件
        • 主要特点:
        • 常用方法:
        • 使用示例:
        • 总结
    • CountDownLatch源码剖析之如何基于AQS实现同步阻塞等待
      • CountDownLatch源码剖析:基于AQS实现同步阻塞等待
        • AQS简介
        • CountDownLatch结构
        • 核心源码分析
        • 总结
  • CyclicBarrier
    • CyclicBarrier:将工作任务给多线程分而治之的并发组件
      • CyclicBarrier:多线程协作的并发组件
        • 主要特性
        • 使用场景
        • 基本使用
        • 总结
    • CyclicBarrier源码剖析 如何基于AQS实现任务分而治之
      • CyclicBarrier源码剖析:基于AQS实现任务分而治之
        • CyclicBarrier 的结构
        • AQS 在 CyclicBarrier 中的角色
        • CyclicBarrier 的关键方法
        • 核心源码解析
        • 总结
  • Semaphore
    • Semaphore 等待指定数量的线程完成任务的并发组件
      • Semaphore 的基本概念
      • 使用 Semaphore 等待线程完成任务
      • 示例代码
      • 总结
    • 源码剖析之如何基于AQS等待指定数量的线程
      • Semaphore源码剖析:基于AQS的线程等待机制
        • Semaphore的基本原理
        • Semaphore的AQS实现
        • Semaphore的构造方法
        • Semaphore的关键方法
        • AQS的作用
        • 总结
    • Exchange如何支持两个线程之间进行数据交换
      • Exchanger的使用
      • Exchanger的高级用法
      • 总结
  • 案例实战
    • CyclicBarrier如何实现API服务中对多个接口并发调用后统一合并数据再返回
    • 分布式存储系统的HA高可用架构原理介绍
      • 1. 数据冗余与复制
      • 2. 分布式一致性协议
      • 3. 故障检测与恢复
      • 4. 负载均衡与资源调度
      • 5. 弹性伸缩
      • 结论
    • slave节点向主备两个master节点注册的机制介绍
      • 1. 注册流程概述
      • 2. 故障切换
      • 3. 数据同步
      • 4. 优化和挑战
    • slave节点注册时同步阻塞等待多个master注册完毕
      • 1. 使用`CountDownLatch`或类似机制
      • 2. 使用`CompletableFuture`
      • 3. 利用数据库或中间件的事务机制
      • 4. 分布式协调服务
      • 注意事项
    • 数据分布式存储场景下的分布式计算架构介绍
      • 1. 分布式存储系统
      • 2. 数据分片与分区
      • 3. 分布式计算框架
      • 4. 任务调度与资源管理
      • 5. 容错机制
      • 6. 数据一致性与事务
      • 7. 安全性与隐私
    • 基于Semaphore实现分布式计算系统的推测执行机制
      • 步骤 1: 设计Semaphore控制器
      • 步骤 2: 实现任务执行和监控
      • 步骤 3: 集成到分布式计算框架
      • 注意事项

CountDownLatch

CountDownLatch:同步等待多个线程完成任务的并发组件

CountDownLatch:同步等待多个线程完成任务的并发组件

CountDownLatch 是 Java 并发库中提供的一种非常有用的工具类,用于使一个或多个线程等待其他线程完成一组操作。它通过一个计数器来实现这一功能,初始计数器被设置为一个特定的值,每当一个线程完成自己的任务后,就将计数器减一,直至计数器到达零,所有等待的线程将被释放,继续执行后续的操作。

主要特点:
  1. 一次性CountDownLatch 只能使用一次,一旦计数器到达零,就不能再次使用。如果需要多次等待,需要创建新的实例。
  2. 不可重置:计数器一旦递减,不能重新设置。这意味着如果需要重复使用,必须创建新的 CountDownLatch 实例。
  3. 非公平性CountDownLatch 不保证等待线程的释放顺序,当计数器到达零时,所有等待线程都会被同时唤醒。
常用方法:
  • CountDownLatch(int count):构造函数,初始化计数器的值。
  • await():使当前线程等待,直到计数器到达零,或者当前线程被中断。
  • await(long timeout, TimeUnit unit):使当前线程等待,直到计数器到达零,或者等待时间超过指定的超时时间,或者当前线程被中断。
  • countDown():将计数器减一,表示一个参与者已完成任务。
使用示例:

假设我们有一个主程序,需要等待一组子线程完成各自的任务后,才能继续执行后续操作。可以使用 CountDownLatch 来实现这一功能。

import java.util.concurrent.CountDownLatch;

public class CountDownLatchExample {

    public static void main(String[] args) throws InterruptedException {
        // 创建 CountDownLatch 实例,计数器初始值设为 3,意味着有 3 个子线程需要完成任务
        CountDownLatch latch = new CountDownLatch(3);

        // 启动 3 个子线程
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                // 模拟子线程执行耗时操作
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("子线程完成任务");
                // 子线程完成任务后,调用 countDown 减少计数器的值
                latch.countDown();
            }).start();
        }

        // 主线程等待所有子线程完成
        latch.await();

        System.out.println("所有子线程已完成任务,主线程继续执行...");
    }
}

在这个示例中,主线程会调用 latch.await() 方法等待,直到所有的子线程都调用了 latch.countDown() 方法,将计数器从初始的 3 减至 0,此时主线程才会继续执行后续的代码。

总结

CountDownLatch 是 Java 并发编程中一个非常实用的工具,它可以帮助我们轻松地实现线程间的同步等待,特别适用于需要等待一组操作全部完成的场景。通过合理运用 CountDownLatch,可以有效地提高多线程程序的健壮性和效率。

CountDownLatch源码剖析之如何基于AQS实现同步阻塞等待

CountDownLatch源码剖析:基于AQS实现同步阻塞等待

CountDownLatch 是Java并发库中的一个重要组件,它主要用于同步多个线程的执行流程,直到所有参与的线程完成特定操作。CountDownLatch 的核心是基于 AbstractQueuedSynchronizer (AQS) 实现的,AQS 是Java并发包中一个抽象框架,用于构建各种同步组件,如 Semaphore, ReentrantLock 等。

AQS简介

AQS 定义了一套多线程访问共享资源的框架,主要包含两部分:同步器状态和等待队列。AQS 中的同步器状态是一个整型的 volatile 变量,用于表示资源的独占状态。等待队列是一个 FIFO 的线程队列,当线程尝试获取资源失败时,会被插入到等待队列中,并阻塞等待。

CountDownLatch结构

CountDownLatch 内部维护了一个 Sync 类,继承自 AQS,用于管理同步状态。CountDownLatch 的核心在于计数器 count 的管理,当 count 降为零时,所有等待的线程将被释放。

核心源码分析
  1. Sync 类定义

    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981921886391L;
        
        Sync(int count) {
            setState(count);
        }
        
        int getCount() {
            return getState();
        }
        
        protected boolean tryAcquireShared(int acquires) {
            return (getState() == 0);
        }
        
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
    
  2. 构造函数

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
    

    构造函数初始化 Sync 对象,设置同步状态为 count

  3. await 方法

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    

    await 方法调用 AQSacquireSharedInterruptibly 方法,尝试获取共享模式下的资源。如果当前 count 大于零,线程将被阻塞。

  4. countDown 方法

    public void countDown() {
        sync.releaseShared(1);
    }
    

    countDown 方法调用 AQSreleaseShared 方法,尝试释放共享模式下的资源,即减少 count 的值。如果 count 降为零,所有等待的线程将被唤醒。

总结

CountDownLatch 通过 AQS 的共享模式实现线程的同步等待。当计数器 count 的值大于零时,任何调用 await 方法的线程都会被阻塞,直到 count 降为零,所有等待的线程才被释放。countDown 方法负责减少 count 的值,当最后一个线程调用 countDown 使 count 降为零时,所有等待的线程将被唤醒,继续执行后续操作。

这种实现方式充分利用了 AQS 的同步机制,提供了简单而强大的线程同步功能。

CyclicBarrier

CyclicBarrier:将工作任务给多线程分而治之的并发组件

CyclicBarrier:多线程协作的并发组件

CyclicBarrier 是 Java 并发工具包 (java.util.concurrent) 中的一个类,用于帮助多个线程在执行过程中同步。它特别适合于“分而治之”的场景,即一个大任务被分解成若干个小任务,分别由多个线程并行处理,当所有小任务完成后,再集中处理这些结果,或者执行下一个阶段的任务。

主要特性
  1. 固定参与者数量:创建 CyclicBarrier 时,需要指定一个固定数量的参与者。当所有参与者都到达了屏障点,所有线程才会被释放,继续执行后续任务。

  2. 可重用性:与 CountDownLatch 不同,CyclicBarrier 在所有参与者通过后可以被重用,即它可以循环使用,直到程序显式地关闭它。

  3. 屏障动作:在创建 CyclicBarrier 时,可以传入一个 Runnable 接口的实例,称为“屏障动作”。当所有参与者都到达屏障时,会先执行这个动作,然后再释放所有线程。

  4. 异常处理:如果在 await 方法中任何一个参与者抛出了异常,那么所有等待的参与者都将被中断,屏障将重置。

使用场景

CyclicBarrier 适用于以下几种场景:

  • 并行计算:多个线程并行处理数据的不同部分,当所有线程完成时,集中处理结果。
  • 多阶段任务:任务分为多个阶段,每个阶段由多个线程并行执行,每个阶段结束后,所有线程在屏障处等待,直到所有线程到达,再一起进入下一个阶段。
  • 数据收集:多个线程收集数据,然后在所有数据收集完毕后,进行统一处理。
基本使用

以下是使用 CyclicBarrier 的一个基本示例:

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierExample {

    public static void main(String[] args) {
        int numberOfThreads = 3;
        CyclicBarrier barrier = new CyclicBarrier(numberOfThreads, () -> {
            System.out.println("所有线程都已经到达屏障,现在执行下一步...");
        });

        for (int i = 0; i < numberOfThreads; i++) {
            new Thread(() -> {
                System.out.println("线程 " + Thread.currentThread().getName() + " 正在执行...");
                try {
                    // 模拟一些耗时操作
                    Thread.sleep(1000);
                    barrier.await(); // 等待所有线程到达屏障
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println("线程 " + Thread.currentThread().getName() + " 已经通过屏障,继续执行...");
            }).start();
        }
    }
}

在这个示例中,我们创建了一个 CyclicBarrier,并指定了有三个线程需要参与。当所有三个线程都到达屏障点时,会执行一个屏障动作,输出一条信息,然后所有线程被释放,继续执行后续操作。

总结

CyclicBarrier 是 Java 并发编程中的一个强大工具,它允许我们设计出高效、优雅的多线程协作模式,特别是在需要多个线程同步执行某些操作的场景下。通过合理使用 CyclicBarrier,可以大大简化多线程编程的复杂度,提高程序的效率和可维护性。

CyclicBarrier源码剖析 如何基于AQS实现任务分而治之

CyclicBarrier源码剖析:基于AQS实现任务分而治之

CyclicBarrier 是 Java 并发库中一个用于多线程协作的重要工具,它允许一组线程相互等待,直到到达某个公共屏障点。CyclicBarrier 的实现基于 AbstractQueuedSynchronizer (AQS),这是 Java 并发框架的核心组件之一,用于构建各种同步工具。

CyclicBarrier 的结构

CyclicBarrier 的核心是由 Sync 类实现的,这是一个内部类,继承自 AbstractQueuedSynchronizerSync 类的主要作用是维护一个状态变量,用于控制线程的等待和释放。

AQS 在 CyclicBarrier 中的角色

AQS 为 CyclicBarrier 提供了以下关键功能:

  1. 状态管理:AQS 通过一个 volatile int state 变量来管理同步状态。在 CyclicBarrier 中,这个状态被用来表示到达屏障的线程数量和当前的屏障生成代数。

  2. 线程等待队列:AQS 维护了一个 FIFO 等待队列,用于存放因未达到屏障条件而被阻塞的线程。

  3. 线程唤醒机制:当所有线程都到达屏障时,AQS 能够唤醒所有等待中的线程,使其继续执行。

CyclicBarrier 的关键方法
  1. CyclicBarrier(int parties, Runnable barrierAction):构造函数,初始化 parties 参数表示参与的线程数量,barrierAction 是所有线程到达屏障后执行的回调动作。

  2. await():线程调用此方法等待其他线程到达屏障。如果所有线程都到达了,那么所有线程都会被释放,并且如果设置了 barrierAction,则会执行这个动作。

核心源码解析
  1. Sync 类的实现

    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981921886391L;
        private final int parties; // 参与线程数
        private transient int generation = 0; // 当前屏障的代数
        private transient int count; // 到达屏障的线程数
    
        Sync(int parties, Runnable barrierCommand) {
            if (parties <= 0) throw new IllegalArgumentException();
            this.parties = parties;
            this.count = parties;
            this.barrierCommand = barrierCommand;
        }
    
        int getGeneration() {
            return generation;
        }
    
        protected int tryAcquireShared(int arg) {
            return (isBroken() ||
                    compareAndSetState(0, 1)) ? 1 : -1;
        }
    
        protected boolean tryReleaseShared(int arg) {
            if (isBroken())
                return false;
            int c = --count;
            if (c > 0)
                return true;
            boolean broken = false;
            do {
                int g = generation;
                if (c == 0) { // 所有线程到达屏障
                    // 执行屏障动作
                    barrierCommand.run();
                    ++generation;
                    firstThread = null;
                    count = parties;
                }
                else if (g != generation) {
                    // 有线程破坏了屏障
                    broken = true;
                    break;
                }
            } while (!compareAndSetState(1, 0));
            if (broken)
                breakBarrier();
            return true;
        }
    
        // ... 其他方法 ...
    }
    
  2. await() 方法的实现

    await() 方法最终会调用 AQSacquireSharedInterruptibly() 方法,尝试获取共享模式下的资源。如果当前线程是最后一个到达屏障的线程,那么它会负责执行 barrierAction,并且重置 countgeneration

总结

CyclicBarrier 通过 AQS 的共享同步模式实现了线程的等待和释放机制。线程在调用 await() 方法时,会尝试获取共享资源,如果所有线程都到达了屏障,那么最后到达的线程会负责唤醒所有等待中的线程,并执行屏障动作。这种机制使得 CyclicBarrier 成为一种高效且灵活的多线程协作工具。

Semaphore

Semaphore 等待指定数量的线程完成任务的并发组件

Semaphore 在 Java 并发编程中主要用于控制对共享资源的访问,通过信号量机制来管理一定数量的许可。虽然 Semaphore 的主要设计目的是用于资源的限流,但它也可以被巧妙地利用来等待指定数量的线程完成任务。这种方式不同于 CountDownLatchCyclicBarrier 的直接等待机制,而是通过许可的发放和回收来间接控制线程的执行。

Semaphore 的基本概念

Semaphore 维护了一系列的许可,线程可以获取这些许可来访问一个共享资源。如果许可的数量为零,则线程将被阻塞,直到其他线程释放许可。Semaphore 提供了 acquire()release() 方法来获取和释放许可。

使用 Semaphore 等待线程完成任务

虽然 Semaphore 的主要用途不是等待线程完成任务,但我们可以通过以下方式将其用于此目的:

  1. 初始化 Semaphore: 创建一个 Semaphore 对象,其初始许可数等于需要等待的线程数量。
  2. 线程获取许可: 每个线程在开始执行任务前调用 acquire() 方法获取一个许可。如果许可数为零,线程将被阻塞,直到有许可可用。
  3. 线程释放许可: 当线程完成任务后,它应该调用 release() 方法释放一个许可。这会增加可用许可的数量,允许其他被阻塞的线程继续执行。

示例代码

下面是一个使用 Semaphore 来等待指定数量的线程完成任务的示例:

import java.util.concurrent.Semaphore;

public class SemaphoreWaitExample {
    public static void main(String[] args) {
        final int threadCount = 5; // 需要等待的线程数量
        Semaphore semaphore = new Semaphore(threadCount);

        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                try {
                    semaphore.acquire(); // 线程获取许可
                    System.out.println(Thread.currentThread().getName() + " is processing...");
                    Thread.sleep(1000); // 模拟任务执行
                    System.out.println(Thread.currentThread().getName() + " has finished.");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    semaphore.release(); // 线程释放许可
                }
            }).start();
        }

        try {
            semaphore.acquire(threadCount); // 主线程等待所有线程释放许可
            System.out.println("All threads have finished their tasks.");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

在这个例子中,Semaphore 被初始化为5个许可,代表需要等待的线程数量。每个线程在开始执行任务前获取一个许可,完成任务后释放许可。主线程通过再次调用 acquire() 方法并传入 threadCount 来等待所有线程释放许可,从而实现等待指定数量的线程完成任务的功能。

总结

虽然使用 Semaphore 来等待线程完成任务不如 CountDownLatchCyclicBarrier 直观,但在某些特定场景下,尤其是需要同时限制线程数量和等待线程完成的情况下,Semaphore 提供了一个灵活的解决方案。通过合理利用许可的获取和释放,我们可以实现对线程执行的精确控制。

源码剖析之如何基于AQS等待指定数量的线程

Semaphore源码剖析:基于AQS的线程等待机制

Semaphore是Java并发工具包中的一个类,用于控制对共享资源的访问次数,即限制同时访问的线程数量。其内部实现基于AbstractQueuedSynchronizer(AQS),AQS是一个用于构建锁和同步器的框架。下面我们将深入分析Semaphore是如何基于AQS实现等待指定数量的线程完成任务的。

Semaphore的基本原理

Semaphore维护一个整型的同步状态,代表可用的许可证数量。当线程尝试获取许可证时,如果当前状态大于0,线程可以直接获取并减少状态值;否则,线程将被加入到AQS的等待队列中,等待其他线程释放许可证。

Semaphore的AQS实现

Semaphore内部实现了一个Sync类,该类继承自AbstractQueuedSynchronizer,并通过构造函数初始化同步状态。

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 7373984972572414691L;

    Sync(int permits) {
        setState(permits);
    }

    int tryAcquireShared(int reduceCount) {
        for (;;) {
            int current = getState();
            int newCount = current - reduceCount;
            if (newCount < 0 || // 不足
                compareAndSetState(current, newCount))
                return newCount;
        }
    }

    boolean tryReleaseShared(int returnCount) {
        for (;;) {
            int current = getState();
            int newCount = current + returnCount;
            if (compareAndSetState(current, newCount))
                return true;
        }
    }
}
Semaphore的构造方法

Semaphore的构造方法接受一个permits参数,表示初始的许可证数量。

public Semaphore(int permits) {
    sync = new Sync(permits);
}
Semaphore的关键方法
  • acquire(int permits):线程尝试获取指定数量的许可证。如果没有足够的许可证,线程将被阻塞,直到有足够的许可证被释放。

  • release(int permits):线程释放指定数量的许可证,可能允许其他等待的线程继续执行。

AQS的作用

AQS提供了tryAcquireSharedtryReleaseShared两个方法,用于获取和释放共享资源。在Semaphore中,这两个方法被重写以实现许可证的获取和释放。

  • tryAcquireShared:尝试获取共享资源。如果当前状态大于或等于请求的许可证数量,减少状态值并返回当前状态,否则将线程放入等待队列。

  • tryReleaseShared:尝试释放共享资源。增加状态值,表示释放了许可证。

总结

Semaphore通过AQS的框架,利用状态值来控制许可证的获取和释放,从而实现了对线程访问共享资源的控制。当线程尝试获取许可证时,如果没有足够的许可证,线程会被放入等待队列,直到其他线程释放许可证。这种机制确保了线程的有序执行,避免了资源的竞争和冲突。

Exchange如何支持两个线程之间进行数据交换

在多线程编程中,Exchange通常指的是Java并发包中的Exchanger类,这是一个用于线程间数据交换的同步工具。Exchanger允许两个线程在一个交汇点上交换数据。当一个线程调用exchange方法时,它会等待另一个线程也在相同Exchanger实例上调用exchange方法。一旦两个线程都到达交汇点,它们就会交换数据并继续执行。

Exchanger的使用

下面是一个使用Exchanger在两个线程之间交换数据的简单示例:

import 
上一篇:通信技术在反无人机中的作用分析


下一篇:【Hive SQL 每日一题】在线峰值人数计算-测试数据