## 前言
大家好,今天给大家分享 Dubbo 中的负载均衡。在前一个章节中我们介绍 Dubbo延迟服务暴露,我们例举了常见的使用场景并且进行了源码解析来分析其实现原理,同时我们也知道 Dubbo 延迟服务暴露其核心就是通过一个 延迟的调度器指定延迟时间后开始服务的暴露。很多小伙伴可能会好奇:我们的服务部署基本都是集群形式,那服务消费端到底是调用哪一个服务提供者呢?都有哪些常见的服务选择算法?为了揭开这些困惑,下面就让我们快速开始吧!
## 1. 负载均衡简介
那么什么是负载均衡呢?举个简单例子:在火车站购买火车票场景,我们知道节假日肯定有非常多的人购买火车票,那么一个售票窗口卖票的话肯定会排很长的队对用户来说体验非常差。那我们能不能多增加几个售票窗口呢?是不是并行处理的能力马上得到提高了呢?没错!我们在软件工程里面也是如此。比如 Nginx 常常用作软件负载均衡、F5用作硬件的负载均衡器等等。负载均衡算法可以有效提高系统的并发处理能力、容错能力、一致性访问等。如下图所示:
![负载均衡](http://youngitman.tech/wp-content/uploads/2021/05/负载均衡.png)
1. 并发处理能力:通过横向拓展部署后端服务器数量做到服务扩容。
2. 容错能力:通过 Nginx 代理请求后端服务失败时转移调用其他存活服务。
3. 一致性访问:在某些场景中指定路由到固定服务器,比如:特定服务器对登录用户 Session 进行缓存。
## 2. Dubbo 支持负载均衡算法
### 2.1 加权随机算法
随机算法比较简单就和数学中的随机值选取一样,比如:1-100中随机选择一个数字。在 Dubbo 中对随机算法增加了一个权重概念。举个例子:
| 服务列表 | 权重 |
| --------- | ---- |
| Service01 | 10 |
| Service02 | 20 |
| Service03 | 20 |
| Service04 | 50 |
从上表中我们知道4个服务权重分别是10、20、20、50,参考下图描述了随机选择步骤:
![随机算法](http://youngitman.tech/wp-content/uploads/2021/05/随机算法.png)
1. 首先我们根据权重总和利用随机值获取初始 `offset`,假设 `offset` 值为 60。
2. 第一步循环服务列表第一个服务获取权重值 10 并使用 `offset` - `weight` 结果值为 50 > 0 继续执行重新赋值`offset` = 50。
3. 第二步循环服务列表第二个服务获取权重值 20 并使用 `offset` - `weight` 结果值为 30 > 0 继续执行。
4. 第三步循环服务列表第三个服务获取权重值 20 并使用 `offset `- `weight` 结果值为 10 > 0 继续执行。
5. 第四步循环服务列表第四个服务获取权重值 50 并使用 `offset` - `weight` 结果值为 -40 > 0 找到服务终止。
> **Tips:**此算法数据量越大数据分配越均匀。
#### 2.1.1 算法实现:
```java
public class RandomLoadBalance extends AbstractLoadBalance {
public static final String NAME = "random";
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// 服务总数
int length = invokers.size();
// 标识这些服务是否具有相同的权重
boolean sameWeight = true;
// 所有服务权重集合
int[] weights = new int[length];
// 第一个服务权重
int firstWeight = getWeight(invokers.get(0), invocation);
weights[0] = firstWeight;
// 权重总和
int totalWeight = firstWeight;
for (int i = 1; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
// 记录权重
weights[i] = weight;
// 合计权重值
totalWeight += weight;
if (sameWeight && weight != firstWeight) {
sameWeight = false;
}
}
if (totalWeight > 0 && !sameWeight) {
// 从(0,totalWeight]范围选择权重值
int offset = ThreadLocalRandom.current().nextInt(totalWeight);
// 基于随机值返回服务
for (int i = 0; i < length; i++) {
// offset = offset - weights[i]
offset -= weights[i];
if (offset < 0) {
return invokers.get(i);
}
}
}
// 如果权重值相同或者totalWeight=0返回服务列表中随机服务
return invokers.get(ThreadLocalRandom.current().nextInt(length));
}
}
```
#### 2.1.2 算法过程分析:
1. 循环 `invokers` 服务列表计算出权重总和并标记出是否存在权重全部相同情况。
2. 根据上面步骤的标记判断 `invokers` 服务列表所有服务权重是否相同,相同则随机返回服务列表中的一个服务。
3. `invokers` 服务列表存在服务权重不相同时,产生一个权重总和范围内的一个大于0的随着整数赋值给 `offset`,然后循环访问服务列表中的元素并且使用 `offset` 减去当前循环元素的权重值,如果差值大于0则赋值给当前的 `offset` 继续执行元素循环,当 `offset` 减去当前元素的权重大于零时停止循环,则当前查找的元素为选择的服务。
### 2.2 加权轮询算法
轮询负载均衡算法顾名思义就是轮流调用服务。举个例子:假设4个请求通过 Nginx 代理转发到后端服务:
![轮询算法](http://youngitman.tech/wp-content/uploads/2021/05/轮询算法.png)
同时 Dubbo 中的轮询算法也是需要权重支持。轮询负载均衡算法可以让 RPC 调用严格按照我们设置的比例来分配。不管是少量的调用还是大量的调用。但是轮询负载均衡算法也有不足的地方,存在慢的服务累积请求的问题,比如:第二台 `Service2` 很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上,导致整个系统变慢。以下是加权轮询算法图:
![加权轮询算法](http://youngitman.tech/wp-content/uploads/2021/05/加权轮询算法.png)
#### 2.2.1 算法实现
```java
public class RoundRobinLoadBalance extends AbstractLoadBalance {
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
//服务唯一标识,获取这一组服务第一个服务:因为这是一组服务所有key是相同的
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();//DemoService.method1
//通过服务唯一标识对这一组服务进行缓存 key -> Map
ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
if (map == null) {
methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());
map = methodWeightMap.get(key);
}
int totalWeight = 0;
long maxCurrent = Long.MIN_VALUE;
long now = System.currentTimeMillis();
Invoker<T> selectedInvoker = null;
WeightedRoundRobin selectedWRR = null;
for (Invoker<T> invoker : invokers) {
String identifyString = invoker.getUrl().toIdentityString();//服务实例唯一表示:[test://127.0.0.1:1/DemoService,test://127.0.0.1:2/DemoService,test://127.0.0.1:3/DemoService]
WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
//获取服务权重
int weight = getWeight(invoker, invocation);
//WeightedRoundRobin封装实体
if (weightedRoundRobin == null) {
weightedRoundRobin = new WeightedRoundRobin();
weightedRoundRobin.setWeight(weight);
map.putIfAbsent(identifyString, weightedRoundRobin);
}
//权重发生变更
if (weight != weightedRoundRobin.getWeight()) {
//重新赋值
weightedRoundRobin.setWeight(weight);
}
long cur = weightedRoundRobin.increaseCurrent();
//记录更新时间
weightedRoundRobin.setLastUpdate(now);
if (cur > maxCurrent) {//大于当前最大权重则被选择
maxCurrent = cur;
selectedInvoker = invoker;
selectedWRR = weightedRoundRobin;
}
//计算所有权重值
totalWeight += weight;
}
//获取锁并且存在服务下线情况【invokers.size() != map.size()】
if (!updateLock.get() && invokers.size() != map.size()) {
if (updateLock.compareAndSet(false, true)) {
try {
// copy -> modify -> update reference
ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);
//回收周期
newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
methodWeightMap.put(key, newMap);
} finally {
updateLock.set(false);
}
}
}
if (selectedInvoker != null) {
//将其选择服务权重值减去总权重值
selectedWRR.sel(totalWeight);
return selectedInvoker;
}
// should not happen here
return invokers.get(0);
}
}
```
#### 2.2.2 算法过程分析
1. 首先初始化 `maxCurrent` 为最小值 `Long.MIN_VALUE` 、`totalWeight` 为0。
2. 循环 `invokers` 服务列表,每个元素中的 `current` (初始化为0)加上元素权重值大于 `maxCurrent`,
则赋值给 `maxCurrent` 并且赋值选中服务( `selectedInvoker` )为当前元素,并且累计当前元素权重值赋值给 `totalWeight` 。
3. 设置当前选中服务权重值为选中元素权重减去当前权重总和。
> **Tips:**上述算法存在一定的难度,稍作了解即可。
### 2.3 最少活跃调用数算法
我们在实际服务调度场景中一般存在多服务实例部署情况,当相同的应用部署到不同机器很可能存在不同服务处理的能力不同有快有慢,那么我们理想化的认为在服务调用过程中处理能力慢的服务器接收较少的服务请求更为符合我们的要求。如下示例图:
![最少活跃计数算法](http://youngitman.tech/wp-content/uploads/2021/05/最少活跃计数算法.png)
从图中可以看出假设我们这里有5个请求通过 Ngingx 代理转发到后端服务,这里 `Service02` 只接收到一个请求(处理能力较弱)。
#### 2.3.1 算法实现
```java
public class LeastActiveLoadBalance extends AbstractLoadBalance {
public static final String NAME = "leastactive";
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size();
//初始化最小活跃值,最小活跃数,最小活跃的invoker索引(index),权重数组
int leastActive = -1;
int leastCount = 0;
int[] leastIndexes = new int[length];
int[] weights = new int[length];
int totalWeight = 0;
int firstWeight = 0;
boolean sameWeight = true;
// 过滤出所有最少活跃服务
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
// 获取这个invoker活跃数
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
// 获取invoker的权重 默认100.
int afterWarmup = getWeight(invoker, invocation);
// 权重保存到集合中
weights[i] = afterWarmup;
//找到最小活跃数invoker
if (leastActive == -1 || active < leastActive) {
//重置最小活跃数值
leastActive = active;
//重置最小活跃计数
leastCount = 1;
// 记录最小活跃数对应索引
leastIndexes[0] = i;
// 重置权重合计
totalWeight = afterWarmup;
// 记录第一个最小活跃数对应权重值
firstWeight = afterWarmup;
// 重置是否具有相同权重标志
sameWeight = true;
//当前invoker活跃数等于最小活跃数
} else if (active == leastActive) {
// 最小活跃数leastCount增加并且记录当前invoker对应索引
leastIndexes[leastCount++] = i;
// 累计invoker权重值
totalWeight += afterWarmup;
// 判断是否权重值相同
if (sameWeight && i > 0
&& afterWarmup != firstWeight) {
sameWeight = false;
}
}
}
//最小活跃数invoker只有一个
if (leastCount == 1) {
// 直接返回
return invokers.get(leastIndexes[0]);
}
//具有多个相同活跃数、具有不同权重且权重和大于0
if (!sameWeight && totalWeight > 0) {
// 根据权重值进行一个(0,totalWeight]整数随机值
int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
// 基于随机值选择所在区间和随机负载均衡算法一致
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexes[i];
offsetWeight -= weights[leastIndex];
if (offsetWeight < 0) {
return invokers.get(leastIndex);
}
}
}
// 如果所有权重值相同或者totalWeight=0返回服务列表中随机一个服务
return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
}
}
```
#### 2.3.2 算法过程分析
1. 初始化最小活跃值,最小活跃数,最小活跃的 `invoker` 索引(`index`),权重数组。
2. 循环 `invokers` 服务列表获取 `invoker` 活跃数、权重且过滤出所有最少活跃服务并记录最小活跃数对应索引,同时标记服务列表是否存在不同权重值。
3. 如果最小活跃数为一直接反正这个服务。
4. 如果存在多个相同活跃数、具有不同权重且权重和大于0,则根据权重值进行一个(0, `totalWeight` ]整数随机值
然后基于随机值选择所在区间和上面讲的随机负载均衡算法一致。
5. 如果所有权重值相同或者 `totalWeight=0` 返回服务列表中随机一个服务。
> **Tips:**上述算法存在一定的难度,稍作了解即可。
### 2.4 一致性 Hash 算法
要明白什么是一致性 Hash 首先得知道什么是 Hash 算法。我们在学习 HashMap 的 JDK 源码时可以了解到 HashMap 的数据结构由一个数组+链表构成如下所示:
![hash算法](http://youngitman.tech/wp-content/uploads/2021/05/hash算法.png)
上图简单画出了 HashMap 底层数据存储结构,从中可以看出当我们调用 `HashMap.put(K key, V value)` 方法时通过`hash(key)`计算出一个 Hash 值与数组长度求模得到一个索引即为数组下标对应位置,当这个位置存在元素时即发生 Hash 碰撞就会产生一个链。
一个常见的场景,比如我们在请求某个服务时需要固定访问某一台服务器如下所示:
![hash算法1](http://youngitman.tech/wp-content/uploads/2021/05/hash算法1.png)
从图中可以看出 `Client01` 通过 Nginx 固定访问 `Service01`、`Client02` 通过 Nginx 固定访问 `Service02`、`Client03` 通过 Nginx 固定访问 `Service03`。要满足这种场景就需要使用 Hash 算法。但是普通 Hash 算法存在一个问题是 Hash 出来的是实际后端服务节点,当其中某个服务宕机时 Hash 计算因子发生改变(**求模的分子**)如果计算 Hash 因子不调整则可能会路由到宕机那台服务就会存在问题。那一致性 Hash 就是解决这样的问题。
什么是一致性 Hash 呢?一致性哈希算法是采用的环形哈希空间的方式,它首先根据 Ip 或者其他信息为机器节点生成一个 Hash 值,它投射到一个[0,2^32-1]的圆环中。之后我们可以认为它是一个锚点。当请求进来后,它携带的数据,都统一生成一个 Hash 值,这时候比对之前的机器节点信息产生的 Hash 值,当遇到第一个大于或者等于该 Hash 值的缓存节点,我们将这个数据便归属于这个机器节点。
假设虚拟节点12个、真实服务3个分别是:`Invoker1`、`Invoker2`、`Invoker3`。下图展示了服务节点怎样进行虚拟节点映射的过程:
1. `Invoker1` 经过 Hash 计算后映射到环上对应位置
![Invoker1虚拟节点映射](http://youngitman.tech/wp-content/uploads/2021/05/Invoker1虚拟节点映射.png)
2. `Invoker2` 经过 Hash 计算后映射到环上对应位置
![Invoker2虚拟节点映射](http://youngitman.tech/wp-content/uploads/2021/05/Invoker2虚拟节点映射.png)
3. `Invoker3` 经过 Hash 计算后映射到环上对应位置
![Invoker3虚拟节点映射](http://youngitman.tech/wp-content/uploads/2021/05/Invoker3虚拟节点映射.png)
上图中长方形色块代表真实的服务,圆形代表虚拟节点。所有同一种颜色的圆形虚拟节点归属于同一色块的长方形色块所代表的真实服务。
#### 2.4.1 算法实现
```java
public class ConsistentHashLoadBalance extends AbstractLoadBalance {
public static final String NAME = "consistenthash";
/**
* Hash nodes name
*/
public static final String HASH_NODES = "hash.nodes";
/**
* Hash arguments name
*/
public static final String HASH_ARGUMENTS = "hash.arguments";
private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();
@SuppressWarnings("unchecked")
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
//获取调用方法名称
String methodName = RpcUtils.getMethodName(invocation);
//生成唯一标识
String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
int identityHashCode = System.identityHashCode(invokers);
//对ConsistentHashSelector进行缓存
ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
//缓存中selector == null 或 当invokers中服务发生变化 服务下限/服务上线 invokers hash值发生变更需要重新构建ConsistentHashSelector
if (selector == null || selector.identityHashCode != identityHashCode) {
selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode));
selector = (ConsistentHashSelector<T>) selectors.get(key);
}
//执行服务选择
return selector.select(invocation);
}
private static final class ConsistentHashSelector<T> {
private final TreeMap<Long, Invoker<T>> virtualInvokers;
private final int replicaNumber;
private final int identityHashCode;
private final int[] argumentIndex;
ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
//虚拟Invoker(虚拟节点)
this.virtualInvokers = new TreeMap<>();
this.identityHashCode = identityHashCode;
URL url = invokers.get(0).getUrl();
//虚拟节点数
this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160);
String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0"));//参与hash的参数索引
argumentIndex = new int[index.length];
for (int i = 0; i < index.length; i++) {
argumentIndex[i] = Integer.parseInt(index[i]);
}
for (Invoker<T> invoker : invokers) {
String address = invoker.getUrl().getAddress();//127.0.0.1:1
// 将虚拟节点细化为 (replicaNumber / 4) 份
for (int i = 0; i < replicaNumber / 4; i++) {
//地址+序号进行摘要
byte[] digest = md5(address + i);
// 每份4个点散列到treeMap中
for (int h = 0; h < 4; h++) {
long m = hash(digest, h);
virtualInvokers.put(m, invoker);
}
}
}
}
public Invoker<T> select(Invocation invocation) {
String key = toKey(invocation.getArguments());
byte[] digest = md5(key);
return selectForKey(hash(digest, 0));
}
private Invoker<T> selectForKey(long hash) {
//获取至少大于或等于给定hash值
Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash);
if (entry == null) {
//获取第一个Entry
entry = virtualInvokers.firstEntry();
}
return entry.getValue();
}
}
}
```
#### 2.4.2 算法过程分析
1. 虚拟节点构建:
* 首先初始化虚拟节点数,默认160可以通过外部参数指定。
* 获取参与 Hash 计算的参数值,`hash.arguments` 属性值,缺省是0参数位置。
* 循环 `invokers` 服务列表根据 **ip+port+序号**生成 `replicaNumber` 个的节点(生成虚拟节点),其中 `key` 就是算出来的 Hash 值,`value` 就是 `invoker` 且 `key` 的值从小到大排序的。
2. 节点查找
* 根据 `hash.arguments`参数取出对应位的参数,拼接成 `key`。
* 使用 `md5` 对 `key` 计算,使用 Hash 算法算出 `key` 对应的 Hash 值。
* 根据这个 `key` 的 Hash 值找出对应的 `invoker` ,这里返回键值大于或等于 `key` 的那部分 ,然后再取第一个。
> **Tips:**上述算法存在一定的难度,稍作了解即可。
## 3. 小结
在本小节中我们主要学习了 Dubbo 中常见的负载均衡算法以及使用场景。
本节课程的重点如下:
1. 理解 什么是负载均衡算法
2. 知道负载均衡算法使用场景
3. 了解 Dubbo 中有哪些负载均衡算法
4. 了解 Dubbo 负载均衡算法实现逻辑
#### 作者
> 个人从事金融行业,就职过易极付、思建科技、某网约车平台等重庆一流技术团队,目前就职于某银行负责统一支付系统建设。自身对金融行业有强烈的爱好。同时也实践大数据、数据存储、自动化集成和部署、分布式微服务、响应式编程、人工智能等领域。同时也热衷于技术分享创立公众号和博客站点对知识体系进行分享。关注公众号:**青年IT男** 获取最新技术文章推送!
**博客地址:** [http://youngitman.tech](http://youngitman.tech "http://youngitman.tech")
**微信公众号:**
![](https://oscimg.oschina.net/oscnet/up-1d4794a9d08c099b74d4bab3b1faf4eb5dd.JPEG)