中间件
什么是中间件
用户量增加时对应用程序做横向扩展的架构趋势,比如:MySQL读写分离或对MySQL表进行横向和纵向拆分。即将应用程序的单个节点拆分成多个节点,用户登录到不同节点后产生的连接session,通过采用中间一个共享的介质去存储多节点的会话。
企业级应用中常用的中间件主要是缓存和消息队列这两类,原因有以下几点:
- 随着业务规模增大,单体应用无法满足业务场景需求。
- 用户数量剧增,单凭数据库无法抗住并发压力。
- 业务场景复杂需要解耦。
- 业务场景丰富造成访问热点。
从数据请求看缓存
数据请求一般分为三层,第一层在应用程序中,比如Django自身的缓存(本地内存),如果在本地内存中找不到请求的内容则继续向后端发起请求,一般后端放置Redis、Memcached等应用,在应用中以分布式的方式来存储。如果仍未找到再访问数据库。
综上,缓存分为两类:本地缓存和分布式缓存。
本地缓存调用方式是在本地,直接访问内存即可拿到缓存结果,效率高。本地缓存一般存放静态数据,如配置文件
分布式缓存一般需要通过网络连接,需要远程调用,产生网络开销(数据封包网络请求再去解包),根据不同的算法分不到不同服务器上,支持存储更多缓存的数据。分布式缓存一般存放全局数据,可供多个业务模块间共享。
从应用程序(后台服务)看缓存
下图为缓存与数据库间同步方式的样例图:
1、Cache Aside方式:适合轻量级的应用
读:1.判断缓存中有无数据,有则返回给用户,没有则到数据库中读取,
2.读取到后往缓存中写一份,交给用户一份。
更新:1.更新数据库,
2.清除缓存或对之前的缓存做更新,返回已更新的用户数据。
优点:实现简单,逻辑清晰。
缺点:更新时对业务代码有侵入;缓存策略需要维护。
2、Read Write Trough方式:
在后台服务和缓存间加一层中间的代理网关,即应用程序将缓存和数据库视为一个整体,提供标准的SQL接口或将其封装成ORM。该方式适用于频繁使用缓存,产生大量更新的情况。
优点:可直接读网关这一层,不需要维护缓存策略。
缺点:需要额外代码维护更新逻辑。
3、Write Back方式:
不考虑数据库,只认为有缓存,缓存到数据库使用消息队列自动同步到数据库上。
优点:对用户来讲,访问和返回效率是及时的,因为不用考虑写持久化存储。
缺点:由于缓存到数据库为异步写,会有数据丢失的风险。
该方式适用于对数据一致性要求不高的业务场景。
从数据库角度看缓存
1、双写方式:
应用程序向数据库和缓存中各写一份。
为了避免数据不一致,多线程使用先写数据库做持久化存储;单线程引入分布式事务管理方式,即先写数据库,如果写入失败则回退或通知用户重试,如果写完数据发现Redis Crash了,需从数据库定时像缓存中做同步或者给每个请求增加监控,发现失败后及时从数据库更新到缓存。
缺点:对业务逻辑代码有侵入
2、消息队列方式:
先写消息队列,再写缓存。
当用户并发请求比较大时,如果直接写MySQL会导致扛不住,后台服务去写消息队列,向MySQL并行地进行同步,消息队列会对请求进行队列的排序或进行限速。
优点:减少数据不一致的情况。
缺点:写消息队列效率较”双写“低。(消息队列也支持持久化,即写了消息队列就可认为写入成功,并返回给用户写入成功,同时再更新缓存)
3、MySQL的binlog方式:
直接写入数据库。
通过binlog回放写入到缓存中,由于binlog是线性处理,一旦阻塞会影响同步性能。
小结:
数据不一致问题,以上三种方式都会出现。以数据库为准,再写入缓存。
数据的延迟问题:双写延迟最低——>消息队列——>binlog延迟最高
系统的耦合度:双写耦合度最高——>消息队列——>binlog耦合度最低
复杂度:双写复杂度最低——>消息队列——>binlog(会出现网络波动、阻塞等,需回放全量数据)
缓存有可能出现的问题
常见问题包括三种:缓存穿透、缓存并发、缓存雪崩。问题的主要原因是:大并发请求打到数据库上。
缓存穿透:(值不存在,人为攻击居多)
利用未存储到缓存中的数据,大量请求数据库,导致数据库异常或者业务产生拒绝服务。
解决方法:1、应用程序和数据库约定,如果是空值或无法返回的数据也进行缓存,设置为NULL返回给Redis,通知用户请求失败。或者设置较短的超时时长。
2、使用布隆过滤器(会误杀)
缓存并发(击穿):值存在,且为正常请求
请求中某一个key在某时间点过期,恰好此时间点有大量并发请求到这个key,由于已过期,大量请求打到数据库。
解决方案:1、互斥锁:当缓存一旦失效,Redis中设置SETNX互斥锁,再去查询数据库并写回缓存。获取锁的线程从数据库拉数据更新缓存,其他线程等待。
2、异步后台更新:后台任务针对过期的key自动刷新。
缓存雪崩:Redis设置不合理导致同一时间大量key过期,大量请求打到数据库。
解决方案:1、多级缓存:不同级别key设置不同超时时间。
2、随机超时:key的超时时间随机设置,防止同时过期。
3、架构层提升系统可用性,完善监控和报警。
Redis如何实现分布式锁
使用setnx实现加锁,可以同时通过expire添加超时时间。
锁的值可使用一个随机的uuid或特定的命名。
释放锁的时候,通过uuid判断是否是该锁,是则执行delete释放。
"""
分布式锁的特点:
互斥性。在任意时刻,只有一个客户端能持有锁
锁超时。即使一个客户端持有锁的期间崩溃而没有主动释放锁,也需要保证后续其他客户端能够加锁成功
加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给释放了。
"""
# 版本一:
# 加锁过程:
# 首先需要为锁生成一个唯一的标识,这里使用 uuid;
# 然后使用 setnx 设置锁,如果该锁名之前不存在其他客户端的锁则加锁成功,接着设置锁的过期时间防止发生死锁并返回锁的唯一标示;
# 如果设置失败先判断一下锁名所在的锁是否有过期时间,因为 setnx 和 expire 两个命令执行不是原子性的,可能会出现加锁成功但是设置超时时间失败出现死锁。如果不存在就给锁重新设置过期时间,存在就不断循环知道加锁时间超时加锁失败。
# 解锁过程:
# 首先整个解锁操作需要在一个 Redis 的事务中进行;
# 使用 watch 监听锁,防止解锁时出现删除其他人的锁;
# 查询锁名所在的标识是否与本次解锁的标识相同;
# 如果相同则在事务中删除这个锁,如果删除过程中锁自动失效过期又被其他客户端拿到,因为设置了 watch 就会删除失败,这样就不会出现删除了其他客户端锁的情况。
import uuid
import math
import time
from redis import WatchError
def acquire_lock_with_timeout(conn, lock_name, acquire_timeout=3, lock_timeout=2):
"""
基于Redis实现的分布式锁
:param conn: 连接
:param lock_name: 锁的名称
:param acquire_timeout: 获取锁的超时时间,默认3秒
:param lock_timeout: 锁的超时时间,默认2秒
:return:
"""
identifier = str(uuid.uuid4()) # 首先需要为锁生成一个唯一的标识,这里使用uuid
lockname = f'lock:{lock_name}'
lock_timeout = int(math.ceil(lock_timeout))
end = time.time() + acquire_timeout
while time.time() < end:
# 如果不存在这个锁则加锁并设置过期时间,避免死锁
if conn.setnx(lockname, identifier): # 使用setnx设置锁,如果该锁名之前不存在其他客户端的锁则加锁成功
conn.expire(lockname, lock_timeout)
return identifier # 返回锁的唯一标识
# 如果存在锁,且这个锁没有过期时间则为其设置过期时间,避免死锁
elif conn.ttl(lockname) == -1: # setnx和expire两个命令执行不是原子性的,可能会出现加锁成功但是设置超时时间失败出现死锁。
# 如果不存在就给锁重新设置过期时间,存在就不断循环直到加锁时间超时加锁失败。
conn.expire(lockname, lock_timeout)
time.sleep(0.001)
return False
def release_lock(conn, lockname, identifier):
"""
释放锁
:param conn: Redis连接
:param lockname: 锁的名称
:param identifier: 锁的标识
:return:
"""
# python 中的redis事务是通过pipeline的封装实现的
with conn.pipeline() as pipe: # 整个解锁操作需要在一个Redis事务中进行
lockname = 'lock:' + lockname
while True:
try:
# watch锁,multi后该key被其他客户端改变,事务操作会抛出WatchError异常
pipe.watch(lockname)
iden = pipe.get(lockname)
if iden and iden.decode('utf-8') == identifier: # 查询锁名所在的标识是否与本次解锁的标识相同
# 事务开始
pipe.multi()
pipe.delete(lockname) # 锁标识相同则在事务中删除这个锁
pipe.excute()
return True
pipe.unwatch()
break
except WatchError:
pass
return False
# 版本二:
# 如果你使用的 Redis 版本大于等于 2.6.12 版本,加锁的过程就可以进行简化。因为这个版本以后的 Redis set 操作支持 EX 和 NX 参数,是一个原子性的操作。
#
# EX seconds : 将键的过期时间设置为 seconds 秒。 执行 SET key value EX seconds 的效果等同于执行 SETEX key seconds value 。
# NX : 只在键不存在时, 才对键进行设置操作。 执行 SET key value NX 的效果等同于执行 SETNX key value 。
def acquire_lock_with_timeout(conn, lock_name, acquire_timeout=3, lock_timeout=2):
identifier = str(uuid.uuid4())
lockname = f'lock:{lock_name}'
lock_timeout = int(math.ceil(lock_timeout))
end = time.time() + acquire_timeout
while time.time() < end:
if conn.set(lockname, identifier, ex=lock_timeout, nx=True):
return identifier
time.sleep(0.001)
return False
def release_lock(conn, lockname, identifier):
with conn.pipeline() as pipe:
lockname = 'lock:' + lockname
while True:
try:
pipe.watch(lockname)
iden = pipe.get(lockname)
if iden and iden.decode('utf-8') == identifier:
# 事务开始
pipe.multi()
pipe.delete(lockname)
pipe.excute()
return True
pipe.unwatch()
break
except WatchError:
pass
return False
# 版本三:
def acquire_lock_with_timeout(conn, lock_name, acquire_timeout=3,lock_timeout=2):
identifier = str(uuid.uuid4())
lockname = f'lock:{lock_name}'
lock_timeout = int(math.ceil(lock_timeout))
end = time.time() + acquire_timeout
while time.time() < end:
if conn.set(lockname, identifier, ex=lock_timeout, nx=True):
return identifier
time.sleep(0.001)
return False
def release_lock(conn, lock_name, identifier):
unlock_script = """
if redis.call("get", KEYS[1])==ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
lockname = f'lock:{lock_name}'
unlock = conn.register_script(unlock_script)
result = unlock(keys=[lockname], args=[identifier])
if result:
return True
else:
return False
消息队列有什么用途
- 异步处理:一个请求进来,可以分发为不同的请求,由不同系统并发处理。
- 流量控制:令牌生成器在后台生成令牌队列,应用程序连接到服务器后,到令牌队列中消费令牌,直到令牌领完。<注:不是控制消息队列中的人数,而是控制发放令牌数>
- 服务解耦
生产级消息队列常见问题
1、如何保证事务在分布式消息队列的一致性?
采用分布式事务管理:二阶段提交2PC、TCC(Try Confirm Cancel)
在消息队列软件(Kafka、RabbitMQ…)基础上使用分布式消息事务提交特殊参数以解决。
如果对实时性要求不高,比如付款后加入等待几秒,在做后续的逻辑。
2、如何保证消息不会丢?
加入监控,如果发现消息序号连续性被破环进行报警。
对于RabbitMQ生产者发送了消息客户端需要确认,消息才会收到,再将队列中的消息清除。
3、如何处理重复消息?
可利用数据库的唯一约束(幂等性消灭重复)。
注:以上三个问题均不可能100%解决