为什么使用中间件?如何实现Redis分布式锁以及缓存和消息队列常见问题

中间件

什么是中间件

用户量增加时对应用程序做横向扩展的架构趋势,比如:MySQL读写分离或对MySQL表进行横向和纵向拆分。即将应用程序的单个节点拆分成多个节点,用户登录到不同节点后产生的连接session,通过采用中间一个共享的介质去存储多节点的会话。

企业级应用中常用的中间件主要是缓存消息队列这两类,原因有以下几点:

  1. 随着业务规模增大,单体应用无法满足业务场景需求。
  2. 用户数量剧增,单凭数据库无法抗住并发压力。
  3. 业务场景复杂需要解耦。
  4. 业务场景丰富造成访问热点。

 

从数据请求看缓存

数据请求一般分为三层,第一层在应用程序中,比如Django自身的缓存(本地内存),如果在本地内存中找不到请求的内容则继续向后端发起请求,一般后端放置Redis、Memcached等应用,在应用中以分布式的方式来存储。如果仍未找到再访问数据库。

综上,缓存分为两类:本地缓存分布式缓存

本地缓存调用方式是在本地,直接访问内存即可拿到缓存结果,效率高。本地缓存一般存放静态数据,如配置文件

分布式缓存一般需要通过网络连接,需要远程调用,产生网络开销(数据封包网络请求再去解包),根据不同的算法分不到不同服务器上,支持存储更多缓存的数据。分布式缓存一般存放全局数据,可供多个业务模块间共享。

 

从应用程序(后台服务)看缓存

下图为缓存与数据库间同步方式的样例图:

为什么使用中间件?如何实现Redis分布式锁以及缓存和消息队列常见问题

1、Cache Aside方式:适合轻量级的应用

读:1.判断缓存中有无数据,有则返回给用户,没有则到数据库中读取,

       2.读取到后往缓存中写一份,交给用户一份。

更新:1.更新数据库,

           2.清除缓存或对之前的缓存做更新,返回已更新的用户数据。

优点:实现简单,逻辑清晰。

缺点:更新时对业务代码有侵入;缓存策略需要维护。

 

2、Read Write Trough方式:

在后台服务和缓存间加一层中间的代理网关,即应用程序将缓存和数据库视为一个整体,提供标准的SQL接口或将其封装成ORM。该方式适用于频繁使用缓存,产生大量更新的情况。

优点:可直接读网关这一层,不需要维护缓存策略。

缺点:需要额外代码维护更新逻辑。

 

3、Write Back方式:

不考虑数据库,只认为有缓存,缓存到数据库使用消息队列自动同步到数据库上。

优点:对用户来讲,访问和返回效率是及时的,因为不用考虑写持久化存储。

缺点:由于缓存到数据库为异步写,会有数据丢失的风险。

该方式适用于对数据一致性要求不高的业务场景。

 

从数据库角度看缓存

1、双写方式:

应用程序向数据库和缓存中各写一份。

为了避免数据不一致,多线程使用先写数据库做持久化存储;单线程引入分布式事务管理方式,即先写数据库,如果写入失败则回退或通知用户重试,如果写完数据发现Redis Crash了,需从数据库定时像缓存中做同步或者给每个请求增加监控,发现失败后及时从数据库更新到缓存。

缺点:对业务逻辑代码有侵入

 

2、消息队列方式:

先写消息队列,再写缓存。

当用户并发请求比较大时,如果直接写MySQL会导致扛不住,后台服务去写消息队列,向MySQL并行地进行同步,消息队列会对请求进行队列的排序或进行限速。

优点:减少数据不一致的情况。

缺点:写消息队列效率较”双写“低。(消息队列也支持持久化,即写了消息队列就可认为写入成功,并返回给用户写入成功,同时再更新缓存)

 

3、MySQL的binlog方式:

直接写入数据库。

通过binlog回放写入到缓存中,由于binlog是线性处理,一旦阻塞会影响同步性能。

 

小结:

数据不一致问题,以上三种方式都会出现。以数据库为准,再写入缓存。

数据的延迟问题:双写延迟最低——>消息队列——>binlog延迟最高

系统的耦合度:双写耦合度最高——>消息队列——>binlog耦合度最低

复杂度:双写复杂度最低——>消息队列——>binlog(会出现网络波动、阻塞等,需回放全量数据)

 

缓存有可能出现的问题

常见问题包括三种:缓存穿透、缓存并发、缓存雪崩。问题的主要原因是:大并发请求打到数据库上。

 

缓存穿透:(值不存在,人为攻击居多)

利用未存储到缓存中的数据,大量请求数据库,导致数据库异常或者业务产生拒绝服务。

解决方法:1、应用程序和数据库约定,如果是空值或无法返回的数据也进行缓存,设置为NULL返回给Redis,通知用户请求失败。或者设置较短的超时时长。

                  2、使用布隆过滤器(会误杀)

 

缓存并发(击穿):值存在,且为正常请求

请求中某一个key在某时间点过期,恰好此时间点有大量并发请求到这个key,由于已过期,大量请求打到数据库。

解决方案:1、互斥锁:当缓存一旦失效,Redis中设置SETNX互斥锁,再去查询数据库并写回缓存。获取锁的线程从数据库拉数据更新缓存,其他线程等待。

                  2、异步后台更新:后台任务针对过期的key自动刷新。

 

缓存雪崩:Redis设置不合理导致同一时间大量key过期,大量请求打到数据库。

解决方案:1、多级缓存:不同级别key设置不同超时时间。

                  2、随机超时:key的超时时间随机设置,防止同时过期。

                  3、架构层提升系统可用性,完善监控和报警。

 

Redis如何实现分布式锁

使用setnx实现加锁,可以同时通过expire添加超时时间。

锁的值可使用一个随机的uuid或特定的命名。

释放锁的时候,通过uuid判断是否是该锁,是则执行delete释放。

"""
分布式锁的特点:
互斥性。在任意时刻,只有一个客户端能持有锁
锁超时。即使一个客户端持有锁的期间崩溃而没有主动释放锁,也需要保证后续其他客户端能够加锁成功
加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给释放了。
"""
# 版本一:
# 加锁过程:
# 首先需要为锁生成一个唯一的标识,这里使用 uuid;
# 然后使用 setnx 设置锁,如果该锁名之前不存在其他客户端的锁则加锁成功,接着设置锁的过期时间防止发生死锁并返回锁的唯一标示;
# 如果设置失败先判断一下锁名所在的锁是否有过期时间,因为 setnx 和 expire 两个命令执行不是原子性的,可能会出现加锁成功但是设置超时时间失败出现死锁。如果不存在就给锁重新设置过期时间,存在就不断循环知道加锁时间超时加锁失败。
# 解锁过程:
# 首先整个解锁操作需要在一个 Redis 的事务中进行;
# 使用 watch 监听锁,防止解锁时出现删除其他人的锁;
# 查询锁名所在的标识是否与本次解锁的标识相同;
# 如果相同则在事务中删除这个锁,如果删除过程中锁自动失效过期又被其他客户端拿到,因为设置了 watch 就会删除失败,这样就不会出现删除了其他客户端锁的情况。

import uuid
import math
import time

from redis import WatchError


def acquire_lock_with_timeout(conn, lock_name, acquire_timeout=3, lock_timeout=2):
    """
    基于Redis实现的分布式锁

    :param conn: 连接
    :param lock_name: 锁的名称
    :param acquire_timeout: 获取锁的超时时间,默认3秒
    :param lock_timeout:  锁的超时时间,默认2秒
    :return:
    """
    identifier = str(uuid.uuid4())  # 首先需要为锁生成一个唯一的标识,这里使用uuid
    lockname = f'lock:{lock_name}'
    lock_timeout = int(math.ceil(lock_timeout))
    end = time.time() + acquire_timeout

    while time.time() < end:
        # 如果不存在这个锁则加锁并设置过期时间,避免死锁
        if conn.setnx(lockname, identifier):  # 使用setnx设置锁,如果该锁名之前不存在其他客户端的锁则加锁成功
            conn.expire(lockname, lock_timeout)
            return identifier  # 返回锁的唯一标识
        # 如果存在锁,且这个锁没有过期时间则为其设置过期时间,避免死锁
        elif conn.ttl(lockname) == -1:  # setnx和expire两个命令执行不是原子性的,可能会出现加锁成功但是设置超时时间失败出现死锁。
            # 如果不存在就给锁重新设置过期时间,存在就不断循环直到加锁时间超时加锁失败。
            conn.expire(lockname, lock_timeout)

        time.sleep(0.001)
    return False


def release_lock(conn, lockname, identifier):
    """
    释放锁

    :param conn: Redis连接
    :param lockname: 锁的名称
    :param identifier: 锁的标识
    :return:
    """
    # python 中的redis事务是通过pipeline的封装实现的
    with conn.pipeline() as pipe:  # 整个解锁操作需要在一个Redis事务中进行
        lockname = 'lock:' + lockname
        while True:
            try:
                # watch锁,multi后该key被其他客户端改变,事务操作会抛出WatchError异常
                pipe.watch(lockname)
                iden = pipe.get(lockname)
                if iden and iden.decode('utf-8') == identifier:  # 查询锁名所在的标识是否与本次解锁的标识相同
                    # 事务开始
                    pipe.multi()
                    pipe.delete(lockname)  # 锁标识相同则在事务中删除这个锁
                    pipe.excute()
                    return True
                pipe.unwatch()
                break
            except WatchError:
                pass
            return False
# 版本二:
# 如果你使用的 Redis 版本大于等于 2.6.12 版本,加锁的过程就可以进行简化。因为这个版本以后的 Redis set 操作支持 EX 和 NX 参数,是一个原子性的操作。
#
# EX seconds : 将键的过期时间设置为 seconds 秒。 执行 SET key value EX seconds 的效果等同于执行 SETEX key seconds value 。
# NX : 只在键不存在时, 才对键进行设置操作。 执行 SET key value NX 的效果等同于执行 SETNX key value 。


def acquire_lock_with_timeout(conn, lock_name, acquire_timeout=3, lock_timeout=2):
    identifier = str(uuid.uuid4())
    lockname = f'lock:{lock_name}'
    lock_timeout = int(math.ceil(lock_timeout))
    end = time.time() + acquire_timeout

    while time.time() < end:
        if conn.set(lockname, identifier, ex=lock_timeout, nx=True):
            return identifier
        time.sleep(0.001)
    return False


def release_lock(conn, lockname, identifier):
    with conn.pipeline() as pipe:
        lockname = 'lock:' + lockname
        while True:
            try:
                pipe.watch(lockname)
                iden = pipe.get(lockname)
                if iden and iden.decode('utf-8') == identifier:
                    # 事务开始
                    pipe.multi()
                    pipe.delete(lockname)
                    pipe.excute()
                    return True
                pipe.unwatch()
                break
            except WatchError:
                pass
        return False


# 版本三:
def acquire_lock_with_timeout(conn, lock_name, acquire_timeout=3,lock_timeout=2):
    identifier = str(uuid.uuid4())
    lockname = f'lock:{lock_name}'
    lock_timeout = int(math.ceil(lock_timeout))
    end = time.time() + acquire_timeout
    while time.time() < end:
        if conn.set(lockname, identifier, ex=lock_timeout, nx=True):
            return identifier
        time.sleep(0.001)
    return False


def release_lock(conn, lock_name, identifier):
    unlock_script = """
    if redis.call("get", KEYS[1])==ARGV[1] then
        return redis.call("del", KEYS[1])
    else
        return 0
    end
    """
    lockname = f'lock:{lock_name}'
    unlock = conn.register_script(unlock_script)
    result = unlock(keys=[lockname], args=[identifier])
    if result:
        return True
    else:
        return False

 

消息队列有什么用途

  1. 异步处理:一个请求进来,可以分发为不同的请求,由不同系统并发处理。
  2. 流量控制:令牌生成器在后台生成令牌队列,应用程序连接到服务器后,到令牌队列中消费令牌,直到令牌领完。<注:不是控制消息队列中的人数,而是控制发放令牌数>
  3. 服务解耦

 

生产级消息队列常见问题

1、如何保证事务在分布式消息队列的一致性?

采用分布式事务管理:二阶段提交2PC、TCC(Try Confirm Cancel)

在消息队列软件(Kafka、RabbitMQ…)基础上使用分布式消息事务提交特殊参数以解决。

如果对实时性要求不高,比如付款后加入等待几秒,在做后续的逻辑。

2、如何保证消息不会丢?

加入监控,如果发现消息序号连续性被破环进行报警。

对于RabbitMQ生产者发送了消息客户端需要确认,消息才会收到,再将队列中的消息清除。

3、如何处理重复消息?

可利用数据库的唯一约束(幂等性消灭重复)。

注:以上三个问题均不可能100%解决

 

 

 

 

 

 

 

 

上一篇:zookeeper分布式锁


下一篇:切换路由默认回到顶部功能