问题: redis-高并发场景下如何保证缓存数据与数据库的最终一致性

在高并发场景下,Redis 通常用作缓存层,与数据库结合使用以提高系统的性能。为了保证缓存数据与数据库的最终一致性,通常采用的有双写机制、缓存失效机制,基于双写机制、缓存失效机制又衍生出来了消息队列、事件驱动架构等

常见机制

常见的机制如下,个人理解无非是先后或各种手段操作数据库、redis,代码ai给写的示列只需看懂即可。

  1. 双写机制
    在更新数据库的同时,同步更新缓存。
    适用于写操作较少的场景
 public class CacheService {
       private final JdbcTemplate jdbcTemplate;
       private final RedisTemplate<String, Object> redisTemplate;

       public void updateData(String key, String value) {
           // 更新数据库
           jdbcTemplate.update("UPDATE table SET value = ? WHERE key = ?", value, key);

           // 更新缓存
           redisTemplate.opsForValue().set(key, value);
       }
  1. 缓存失效机制
    在更新数据库后,删除缓存中的旧数据,读取数据时候时写入缓存
    适用于写操作频繁的场景。
 public class CacheService {
       private final JdbcTemplate jdbcTemplate;
       private final RedisTemplate<String, Object> redisTemplate;

       public void updateData(String key, String value) {
           // 更新数据库
           jdbcTemplate.update("UPDATE table SET value = ? WHERE key = ?", value, key);

           // 删除缓存
           redisTemplate.delete(key);
       }

       public String getData(String key) {
           // 从缓存中获取数据
           String value = (String) redisTemplate.opsForValue().get(key);
           if (value == null) {
               // 缓存未命中,从数据库中获取数据
               value = jdbcTemplate.queryForObject("SELECT value FROM table WHERE key = ?", new Object[]{key}, String.class);
               if (value != null) {
                   // 将数据写入缓存
                   redisTemplate.opsForValue().set(key, value);
               }
           }
           return value;
       }
   }
  1. 消息队列机制
    使用消息队列异步更新redis,确保数据的一致性。
    适用于高并发写操作的场景。
  import com.rabbitmq.client.Channel;
   import com.rabbitmq.client.Connection;
   import com.rabbitmq.client.ConnectionFactory;

   public class CacheService {
       private final JdbcTemplate jdbcTemplate;
       private final RedisTemplate<String, Object> redisTemplate;

       public void updateData(String key, String value) {
           // 更新数据库
           jdbcTemplate.update("UPDATE table SET value = ? WHERE key = ?", value, key);

           // 发送消息到消息队列
           sendUpdateMessage(key, value);
       }

       private void sendUpdateMessage(String key, String value) {
           ConnectionFactory factory = new ConnectionFactory();
           factory.setHost("localhost");
           try (Connection connection = factory.newConnection();
                Channel channel = connection.createChannel()) {
               channel.queueDeclare("cache_update_queue", true, false, false, null);
               channel.basicPublish("", "cache_update_queue", null, (key + ":" + value).getBytes());
           } catch (Exception e) {
               e.printStackTrace();
           }
       }

       public void consumeUpdateMessages() {
           ConnectionFactory factory = new ConnectionFactory();
           factory.setHost("localhost");
           try (Connection connection = factory.newConnection();
                Channel channel = connection.createChannel()) {
               channel.queueDeclare("cache_update_queue", true, false, false, null);
               DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                   String message = new String(delivery.getBody(), "UTF-8");
                   String[] parts = message.split(":");
                   String key = parts[0];
                   String value = parts[1];

                   // 更新缓存
                   redisTemplate.opsForValue().set(key, value);
               };
               channel.basicConsume("cache_update_queue", true, deliverCallback, consumerTag -> {});
           } catch (Exception e) {
               e.printStackTrace();
           }
       }
   }
  1. 事件驱动机制
    使用事件驱动架构,当数据库数据发生变化时,触发事件,事件处理器负责更新缓存。
    适用于复杂的数据更新逻辑。
   import org.springframework.context.ApplicationEventPublisher;
   import org.springframework.context.ApplicationEventPublisherAware;
   import org.springframework.stereotype.Service;

   @Service
   public class CacheService implements ApplicationEventPublisherAware {
       private final JdbcTemplate jdbcTemplate;
       private final RedisTemplate<String, Object> redisTemplate;
       private ApplicationEventPublisher eventPublisher;

       public void updateData(String key, String value) {
           // 更新数据库
           jdbcTemplate.update("UPDATE table SET value = ? WHERE key = ?", value, key);

           // 发布事件
           eventPublisher.publishEvent(new DataUpdatedEvent(this, key, value));
       }

       @Override
       public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
           this.eventPublisher = applicationEventPublisher;
       }

       @Service
       public class EventListener {
           private final RedisTemplate<String, Object> redisTemplate;

           @org.springframework.context.event.EventListener
           public void handleDataUpdatedEvent(DataUpdatedEvent event) {
               // 更新缓存
               redisTemplate.opsForValue().set(event.getKey(), event.getValue());
           }
       }
   }

   public class DataUpdatedEvent extends ApplicationEvent {
       private final String key;
       private final String value;

       public DataUpdatedEvent(Object source, String key, String value) {
           super(source);
           this.key = key;
           this.value = value;
       }

       public String getKey() {
           return key;
       }

       public String getValue() {
           return value;
       }
   }
  1. 定期补偿机制
    定期对缓存和数据库的数据进行校验,发现不一致时进行补偿操作。
    适用于对数据一致性要求较高的场景。
 import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.TimeUnit;

   public class DataConsistencyChecker {
       private final JdbcTemplate jdbcTemplate;
       private final RedisTemplate<String, Object> redisTemplate;
       private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

       public void startChecking() {
           scheduler.scheduleAtFixedRate(() -> {
               // 从数据库中获取所有数据
               List<Map<String, Object>> dataFromDB = jdbcTemplate.queryForList("SELECT key, value FROM table");

               for (Map<String, Object> row : dataFromDB) {
                   String key = (String) row.get("key");
                   String value = (String) row.get("value");

                   // 从缓存中获取数据
                   String cacheValue = (String) redisTemplate.opsForValue().get(key);

                   if (!value.equals(cacheValue)) {
                       // 数据不一致,更新缓存
                       redisTemplate.opsForValue().set(key, value);
                   }
               }
           }, 0, 1, TimeUnit.HOURS);
       }
   }

废弃缓存与更新缓存的取舍

由上面代码可看出 1和2 最大的区别在于更新数据库时到底是更新缓存还是删除缓存。
【废弃缓存】
优点:
操作简单,只需在更新数据库后删除缓存,下次读取时重新从数据库加载数据,减少了写的操作日数
缺点:
可能短暂不一致:在缓存删除后和新数据写入缓存前,可能会出现短暂的缓存不一致

【更新缓存】
优点:
数据强一致性:更新数据库和缓存同时进行,确保数据的一致性。
减少数据库读压力:缓存始终是最新的,减少了对数据库的读操作。
缺点:
复杂性增加:需要处理缓存更新失败的情况,可能需要回滚操作。
性能影响:每次更新操作都需要同时更新数据库和缓存,增加了操作的复杂性和时间

  • 写操作较少的场景:
    推荐使用更新缓存:因为写操作较少,更新缓存的额外开销相对较小,且可以确保数据的一致性。
  • 写操作频繁的场景:
    推荐使用废弃缓存:因为写操作频繁,更新缓存会增加系统的复杂性和开销,而废弃缓存可以减少缓存的写操作,降低系统负担。
  • 对数据一致性要求极高的场景:
    推荐使用更新缓存:尽管复杂性增加,但可以确保数据的强一致性。
  • 对性能要求较高且可以容忍短暂不一致的场景:
    推荐使用废弃缓存:可以减少数据库的读压力,提高系统的整体性能

淘汰缓存的顺序

https://blog.****.net/qq_39033181/article/details/119276120

【 方案一 】先淘汰缓存,再更新数据库
在并发量较大的情况下,会导致数据的不一致。
  1. A线程进行写操作,先成功淘汰缓存,但由于网络或其它原因,还未更新数据库
  2. B线程进行读操作,发现缓存中没有想要的数据,从数据库中读取到的是旧数据,并把旧数据放入缓存。此时数据库与缓存都是旧值,数据没有不一致
  3. A线程将数据库更新完成,数据库中是更新后的新数据,缓存中是更新前的旧数据,造成数据不一致。

【 方案二 】先更新数据库,再淘汰缓存
在并发量较大的情况下,会导致数据的短暂不一致,但是数据会最终一致。
  1. A线程进行写操作,更新数据库,还未淘汰缓存
  2. B线程从缓存中可以读取到旧数据,此时数据不一致
  3. A线程完成淘汰缓存操作,其它线程进行读操作,从数据库中读入最新数据,此时数据一致

延时双删

上述方案二更简单,在高并发场景下也能保证数据的最终一致性,但是如果我就想用方案一呢?

什么是延时双删

先删再更新数据库 过N秒后再删一次缓存,怎么实现放后面spring-cache集成里,大概有 1.延时队列、2.线程池实现延时任务。

小结

  • 这些都是理论,真正写代码,有cache框架,哪有这么烦,很多人喜欢问,那我们就得理,理了总比不理好,写这个就是怕我自己忘,呵
  • 无论怎么样在高并发场景下,我们也只能要求缓存数据与数据库的最终一致性,如果要求强一致性还要缓存干嘛呢?操作直接走DB更香
  • 大多数情况下建议使用淘汰缓存机制,然后先更新数据库,再淘汰缓存,满足大多数的场景了
上一篇:MySQL高级SQL技巧:提升数据库性能与效率