更多Spring与微服务相关的教程请戳这里 Spring与微服务教程合集
本文主要讲的是Hystrix底层原理的基础
- 首先Hystrix底层运用了大量的RxJava,当然对于RxJava我们没有必要全部学习一遍,了解原理即可。
- 而RxJava又是观察者模式的实现,所以观察者模式也可以了解一下。
- Hystrix可以使用基于信号量的隔离,可以了解一下信号量是干什么的
观察者模式的用途:解耦
观察者模式具体的逻辑是这样的:
- 观察者将自己注册到被观察者上
- 被观察者如果有变化,通知所有注册过的观察者
- 观察者收到通知,对自己进行更新
发布订阅模式:
观察者模式和发布订阅模式最大的区别就是发布订阅模式有个事件调度中心
观察者模式类图如下:
- Subject:是主体的抽象。主体(被观察者),内部持有对观察者集合的引用,attach方法表示注册一个观察者,detach方法表示注销一个观察者,notify方法表示通知所有注册了的观察者
- Observer:是观察者的抽象,update方法用于更新自己
- ConcreteSubject:主体的具体实现。内部维护了state变量,如果state有变化,则表示主体发生变化,则通知观察者
- ConcreteObserver:观察者的具体实现
2、RxJava
参考文章:https://www.jianshu.com/p/a406b94f3188
github源码链接:https://github.com/ReactiveX/RxJava/
2.1、概述
Rx是Reactive Extensions的缩写,定义为一个异步和基于事件的函数库。
RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM(译文:RxJava 是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)
- 关于版本:
Rxjava 2.0只是在Rxjava 1.0上增加了一些新特性,本质原理 & 使用基本相同
- 特点:
由于 RxJava的使用方式是:基于事件流的链式调用,所以使得 RxJava:
逻辑简洁
实现优雅
使用简单
更重要的是,随着程序逻辑的复杂性提高,它依然能够保持简洁 & 优雅
2.2、原理
2.2.1、生活例子引入
顾客到饭店吃饭
示意图
流程图
2.2.2、RxJava原理介绍
Rxjava
原理基于 一种扩展的观察者模式Rxjava
的扩展观察者模式中有4个角色:
角色 | 作用 | 类比 |
---|---|---|
被观察者(Observable) | 产生事件 | 顾客 |
观察者(Observer) | 接收事件,并给出响应动作 | 厨房 |
订阅(Subscribe) | 连接 被观察者 & 观察者 | 服务员 |
事件(Event) | 被观察者 & 观察者 沟通的载体 | 菜式 |
- 具体原理
示意图
流程图
即RxJava
原理可总结为:被观察者 (Observable)
通过 订阅(Subscribe)
按顺序发送事件 给观察者 (Observer)
, 观察者(Observer)
按顺序接收事件 & 作出对应的响应动作。具体如下图:
2.3、基本使用
Rxjava
的使用方式有两种:
- 分步骤实现:该方法主要为了深入说明
Rxjava
的原理 & 使用,主要用于演示说明 - 基于事件流的链式调用:主要用于实际使用
分步骤实现
3、信号量类Semaphore
Semaphore是juc包中一个用于线程同步的工具类
信号量-控制同时访问的线程个数
通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可
Semaphore类中比较重要的几个方法:
- public void acquire(): 用来获取一个许可,若无许可能够获得,则会一直等待,直到获得许可
- public void acquire(int permits):获取permits个许可
- public void release() { } :释放许可。注意,在释放许可之前,必须先获获得许可
- public void release(int permits) { }:释放permits个许可
上面4个方法都会被阻塞,如果想立即得到执行结果,可以使用下面几个方法:
- public boolean tryAcquire():尝试获取一个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
- public boolean tryAcquire(long timeout, TimeUnit unit):尝试获取一个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
- public boolean tryAcquire(int permits):尝试获取permits个许可,若获取成功,则立即返回true,若获取失败,则立即返回false 4. public boolean tryAcquire(int permits, long timeout, TimeUnit unit): 尝试获取permits个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
- 还可以通过availablePermits()方法得到可用的许可数目
例子:
若一个工厂有5台机器,但是有8个工人,一台机器同时只能被一个工人使用,只有使用完了,其他工人才能继续使用。那么我们就可以通过Semaphore来实现
import java.util.concurrent.Semaphore; public class SemaphoreTest { /** * 定义机器数量 */ public static final int MACHINE_AMOUNT=5; /** * 定义工人数量 */ public static final int WORKER_AMOUNT=8; private Semaphore semaphore = new Semaphore(MACHINE_AMOUNT); /** * 工人工作的方法 */ public void work(int number) throws InterruptedException { //获取信号量 semaphore.acquire(); System.out.println("工人"+number+"--开始work--"); //模拟工人工作了3秒钟 Thread.sleep(3000); //释放信号量 semaphore.release(); System.out.println("工人"+number+"--结束work--"); } public static void main(String[] args) { SemaphoreTest st = new SemaphoreTest(); for (int i = 0; i < WORKER_AMOUNT; i++) { final int number=i; new Thread(()->{ try { st.work(number); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } } }