扩展Redis的Jedis客户端,哨兵模式读请求走Slave集群

扩展Redis的Jedis客户端,哨兵模式读请求走Slave集群

2018年12月06日 14:26:45 温故而知新666 阅读数 897
 
版权声明:本文为博主原创文章,遵循CC 4.0 by-sa版权协议,转载请附上原文出处链接和本声明。

Redis哨兵模式,由Sentinel节点和Redis节点组成,哨兵节点负责监控Redis的健康状况,负责协调Redis主从复制的关系。

本文不详细讨论Redis哨兵模式,关于哨兵的详细介绍可以参考(https://blog.csdn.net/u010297957/article/details/55050098

在使用哨兵模式以后,客户端不能直接连接到Redis集群,而是连接到哨兵集群,通过哨兵节点获取Redis主节点(Master)的信息,再进行连接,下面给出一小段代码。

  1.  
    JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
  2.  
    jedisPoolConfig.setMaxTotal(10);
  3.  
    jedisPoolConfig.setMaxIdle(5);
  4.  
    jedisPoolConfig.setMinIdle(5);
  5.  
     
  6.  
    Set<String> sentinels = new HashSet<>(Arrays.asList(
  7.  
    "192.168.80.112:26379",
  8.  
    "192.168.80.113:26379",
  9.  
    "192.168.80.114:26379"
  10.  
    ));
  11.  
    GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
  12.  
    poolConfig.setMaxTotal(10);
  13.  
    poolConfig.setMaxIdle(5);
  14.  
    poolConfig.setMinIdle(5);
  15.  
    JedisSentinelPool pool = new JedisSentinelPool("mymaster", sentinels, jedisPoolConfig);

可以看到,客户端只配置了哨兵集群的IP地址,通过哨兵获取redis主节点信息,再与其进行连接,下面给出关键代码的源码分析,下面代码片段讲述了如何获取主节点信息。

  1.  
    private HostAndPort initSentinels(Set<String> sentinels, final String masterName) {
  2.  
    //主节点ip与port对象
  3.  
    HostAndPort master = null;
  4.  
    //是否可以连接到哨兵节点
  5.  
    boolean sentinelAvailable = false;
  6.  
     
  7.  
    log.info("Trying to find master from available Sentinels...");
  8.  
     
  9.  
    //循环所有的哨兵节点,依次进行连接,逻辑如下:
  10.  
    //先连接第一个,如果连接成功,能够获取主节点信息,方法返回,否则连接第二个,第三个,第N个。
  11.  
    //如果全部都失败,则抛出异常,RedisPool初始化失败
  12.  
    for (String sentinel : sentinels) {
  13.  
     
  14.  
    //哨兵的ip和port
  15.  
    final HostAndPort hap = HostAndPort.parseString(sentinel);
  16.  
     
  17.  
    log.info("Connecting to Sentinel " + hap);
  18.  
     
  19.  
    Jedis jedis = null;
  20.  
    try {
  21.  
    //与哨兵节点进行连接,这里可能会出错,比如哨兵挂了。。
  22.  
    jedis = new Jedis(hap.getHost(), hap.getPort());
  23.  
     
  24.  
    //查询主节点信息
  25.  
    List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName);
  26.  
     
  27.  
    //设置可以连接到哨兵
  28.  
    sentinelAvailable = true;
  29.  
     
  30.  
    //如果主节点信息获取不到,则继续循环,换一下哨兵继续上面逻辑
  31.  
    if (masterAddr == null || masterAddr.size() != 2) {
  32.  
    log.warn("Can not get master addr, master name: " + masterName + ". Sentinel: " + hap
  33.  
    + ".");
  34.  
    continue;
  35.  
    }
  36.  
     
  37.  
    //获取到主节点信息
  38.  
    master = toHostAndPort(masterAddr);
  39.  
    log.info("Found Redis master at " + master);
  40.  
    break;
  41.  
    } catch (JedisException e) {
  42.  
    // resolves #1036, it should handle JedisException there's another chance
  43.  
    // of raising JedisDataException
  44.  
    //出异常直接忽略,继续循环
  45.  
    log.warn("Cannot get master address from sentinel running @ " + hap + ". Reason: " + e
  46.  
    + ". Trying next one.");
  47.  
    } finally {
  48.  
    if (jedis != null) {
  49.  
    jedis.close();
  50.  
    }
  51.  
    }
  52.  
    }
  53.  
     
  54.  
    //如果全部哨兵都获取不到主节点信息则抛出异常
  55.  
    if (master == null) {
  56.  
    //可以连接到哨兵,但是查询不到主节点信息
  57.  
    if (sentinelAvailable) {
  58.  
    // can connect to sentinel, but master name seems to not
  59.  
    // monitored
  60.  
    throw new JedisException("Can connect to sentinel, but " + masterName
  61.  
    + " seems to be not monitored...");
  62.  
    } else {
  63.  
    //连接不到哨兵 有可能哨兵全部挂了
  64.  
    throw new JedisConnectionException("All sentinels down, cannot determine where is "
  65.  
    + masterName + " master is running...");
  66.  
    }
  67.  
    }
  68.  
     
  69.  
    log.info("Redis master running at " + master + ", starting Sentinel listeners...");
  70.  
     
  71.  
    //下面的逻辑是上面可以拿到主节点信息时才会执行
  72.  
    //循环哨兵,建立订阅消息,监听节点切换消息,如果redis集群节点发生变动,这里会收到通知
  73.  
    for (String sentinel : sentinels) {
  74.  
    final HostAndPort hap = HostAndPort.parseString(sentinel);
  75.  
    JedisSentinelSlavePool.MasterListener masterListener = new JedisSentinelSlavePool.MasterListener(masterName, hap.getHost(), hap.getPort());
  76.  
    // whether MasterListener threads are alive or not, process can be stopped
  77.  
    masterListener.setDaemon(true);
  78.  
    masterListeners.add(masterListener);
  79.  
    masterListener.start();
  80.  
    }
  81.  
     
  82.  
    //返回主节点信息
  83.  
    return master;
  84.  
    }

下面的代码判断,分析了客户端如何感知到redis主从节点关系发生变化,原理是通过订阅哨兵的频道获取的,当又新的主节点出现,则清空原有连接池,根据新的主节点重新创建连接对象。

  1.  
     
  2.  
     
  3.  
    running.set(true);
  4.  
     
  5.  
    //死循环
  6.  
    while (running.get()) {
  7.  
     
  8.  
    //与哨兵进行连接
  9.  
    j = new Jedis(host, port);
  10.  
     
  11.  
    try {
  12.  
    // double check that it is not being shutdown
  13.  
    if (!running.get()) {
  14.  
    break;
  15.  
    }
  16.  
     
  17.  
    //订阅频道监听节点切换消息
  18.  
    j.subscribe(new JedisPubSub() {
  19.  
    @Override
  20.  
    public void onMessage(String channel, String message) {
  21.  
    log.info("Sentinel " + host + ":" + port + " published: " + message + ".");
  22.  
     
  23.  
    String[] switchMasterMsg = message.split(" ");
  24.  
     
  25.  
    if (switchMasterMsg.length > 3) {
  26.  
     
  27.  
    if (masterName.equals(switchMasterMsg[0])) {
  28.  
    //toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4]))
  29.  
    //这里获取了新的主节点的ip与端口
  30.  
    //调用initPool清空原来的连接池,这样一来,当需要获取jedis时,池会根据新的主接线创建对象
  31.  
    initPool(toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4])));
  32.  
    } else {
  33.  
    log.info("Ignoring message on +switch-master for master name "
  34.  
    + switchMasterMsg[0] + ", our master name is " + masterName);
  35.  
    }
  36.  
     
  37.  
    } else {
  38.  
    log.warn("Invalid message received on Sentinel " + host + ":" + port
  39.  
    + " on channel +switch-master: " + message);
  40.  
    }
  41.  
    }
  42.  
    }, "+switch-master");
  43.  
     
  44.  
    } catch (JedisConnectionException e) {
  45.  
     
  46.  
    if (running.get()) {
  47.  
    log.info("Lost connection to Sentinel at " + host + ":" + port
  48.  
    + ". Sleeping 5000ms and retrying.", e);
  49.  
    try {
  50.  
    //如果连接哨兵异常,则等待若干时间后无限重试
  51.  
    Thread.sleep(subscribeRetryWaitTimeMillis);
  52.  
    } catch (InterruptedException e1) {
  53.  
    log.info( "Sleep interrupted: ", e1);
  54.  
    }
  55.  
    } else {
  56.  
    log.info("Unsubscribing from Sentinel at " + host + ":" + port);
  57.  
    }
  58.  
    } finally {
  59.  
    j.close();
  60.  
    }
  61.  
    }
  62.  
    }

由此我们知道,Redis的客户端,在哨兵模式下的实现,读写都是走Master,那么缺点是显而易见的,那就是若干个Slave完全变成了热备,没有系统分担压力,接下来我们扩展它,让它支持可以在Slave节点读取数据,这样我们的程序,在写入数据时走Master,在读取数据时走Slave,大大提高了系统的性能。

第一步,我们重写这个类 JedisSentinelSlavePool extends Pool<Jedis>

所有代码都拷贝JedisSentinelPool,只修改了下面代码,创建了JedisSlaveFactory,传入了哨兵集群信息,和哨兵的名字。

  1.  
    private void initPool(HostAndPort master) {
  2.  
    if (!master.equals(currentHostMaster)) {
  3.  
    currentHostMaster = master;
  4.  
    if (factory == null) {
  5.  
    factory = new JedisSlaveFactory(sentinels, masterName, connectionTimeout,
  6.  
    soTimeout, password, database, clientName, false, null, null, null);
  7.  
    initPool(poolConfig, factory);
  8.  
    } else {
  9.  
    internalPool.clear();
  10.  
    }
  11.  
     
  12.  
    log.info("Created JedisPool to master at " + master);
  13.  
    }
  14.  
    }

第二步,创建JedisSlaveFactory。

makeObject这个方法,是Redis连接池获取底层连接的地方,我么只需要在这里,创建一个连接到Slave节点的对象即可,

思路就是通过哨兵集群,获取到可用的slave节点信息,然后随机选取一个创建对象,达到负载均衡的效果。

  1.  
    package com.framework.core.redis;
  2.  
     
  3.  
    import lombok.extern.slf4j.Slf4j;
  4.  
    import org.apache.commons.pool2.PooledObject;
  5.  
    import org.apache.commons.pool2.PooledObjectFactory;
  6.  
    import org.apache.commons.pool2.impl.DefaultPooledObject;
  7.  
    import redis.clients.jedis.BinaryJedis;
  8.  
    import redis.clients.jedis.HostAndPort;
  9.  
    import redis.clients.jedis.Jedis;
  10.  
    import redis.clients.jedis.exceptions.JedisConnectionException;
  11.  
    import redis.clients.jedis.exceptions.JedisException;
  12.  
     
  13.  
    import javax.net.ssl.HostnameVerifier;
  14.  
    import javax.net.ssl.SSLParameters;
  15.  
    import javax.net.ssl.SSLSocketFactory;
  16.  
    import java.util.*;
  17.  
     
  18.  
    @Slf4j
  19.  
    public class JedisSlaveFactory implements PooledObjectFactory<Jedis> {
  20.  
     
  21.  
    private final Set<String> sentinels;
  22.  
    private final String masterName;
  23.  
    private final int connectionTimeout;
  24.  
    private final int soTimeout;
  25.  
    private final String password;
  26.  
    private final int database;
  27.  
    private final String clientName;
  28.  
    private final boolean ssl;
  29.  
    private final SSLSocketFactory sslSocketFactory;
  30.  
    private SSLParameters sslParameters;
  31.  
    private HostnameVerifier hostnameVerifier;
  32.  
    private Random random;
  33.  
     
  34.  
    public JedisSlaveFactory(final Set<String> sentinels, final String masterName, final int connectionTimeout,
  35.  
    final int soTimeout, final String password, final int database, final String clientName,
  36.  
    final boolean ssl, final SSLSocketFactory sslSocketFactory, final SSLParameters sslParameters,
  37.  
    final HostnameVerifier hostnameVerifier) {
  38.  
    this.sentinels = sentinels;
  39.  
    this.masterName = masterName;
  40.  
    this.connectionTimeout = connectionTimeout;
  41.  
    this.soTimeout = soTimeout;
  42.  
    this.password = password;
  43.  
    this.database = database;
  44.  
    this.clientName = clientName;
  45.  
    this.ssl = ssl;
  46.  
    this.sslSocketFactory = sslSocketFactory;
  47.  
    this.sslParameters = sslParameters;
  48.  
    this.hostnameVerifier = hostnameVerifier;
  49.  
    this.random = new Random();
  50.  
    }
  51.  
     
  52.  
    @Override
  53.  
    public void activateObject(PooledObject<Jedis> pooledJedis) throws Exception {
  54.  
    final BinaryJedis jedis = pooledJedis.getObject();
  55.  
    if (jedis.getDB() != database) {
  56.  
    jedis.select(database);
  57.  
    }
  58.  
    }
  59.  
     
  60.  
    /**
  61.  
    * 销毁redis底层连接
  62.  
    */
  63.  
    @Override
  64.  
    public void destroyObject(PooledObject<Jedis> pooledJedis){
  65.  
    log.debug("destroyObject =" + pooledJedis.getObject());
  66.  
    final BinaryJedis jedis = pooledJedis.getObject();
  67.  
    if (jedis.isConnected()) {
  68.  
    try {
  69.  
    jedis.quit();
  70.  
    jedis.disconnect();
  71.  
    } catch (Exception e) {
  72.  
    }
  73.  
    }
  74.  
    }
  75.  
     
  76.  
    /**
  77.  
    * 创建Redis底层连接对象,返回池化对象.
  78.  
    */
  79.  
    @Override
  80.  
    public PooledObject<Jedis> makeObject() {
  81.  
    List<HostAndPort> slaves = this.getAlivedSlaves();
  82.  
     
  83.  
    //在slave节点中随机选取一个节点进行连接
  84.  
    int index = slaves.size() == 1 ? 0 : random.nextInt(slaves.size());
  85.  
    final HostAndPort hostAndPort = slaves.get(index);
  86.  
     
  87.  
    log.debug("Create jedis instance from slaves=[" + slaves + "] , choose=[" + hostAndPort + "]");
  88.  
     
  89.  
    //创建redis客户端
  90.  
    final Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout,
  91.  
    soTimeout, ssl, sslSocketFactory, sslParameters, hostnameVerifier);
  92.  
     
  93.  
    //测试连接,设置密码,数据库.
  94.  
    try {
  95.  
    jedis.connect();
  96.  
    if (null != this.password) {
  97.  
    jedis.auth(this.password);
  98.  
    }
  99.  
    if (database != 0) {
  100.  
    jedis.select(database);
  101.  
    }
  102.  
    if (clientName != null) {
  103.  
    jedis.clientSetname(clientName);
  104.  
    }
  105.  
    } catch (JedisException je) {
  106.  
    jedis.close();
  107.  
    throw je;
  108.  
    }
  109.  
     
  110.  
    return new DefaultPooledObject<Jedis>(jedis);
  111.  
    }
  112.  
     
  113.  
     
  114.  
    /**
  115.  
    * 获取可用的RedisSlave节点信息
  116.  
    */
  117.  
    private List<HostAndPort> getAlivedSlaves() {
  118.  
    log.debug("Get alived salves start...");
  119.  
     
  120.  
    List<HostAndPort> alivedSalaves = new ArrayList<>();
  121.  
    boolean sentinelAvailable = false;
  122.  
     
  123.  
    //循环哨兵,建立连接获取slave节点信息
  124.  
    //当某个哨兵连接失败,会忽略异常连接下一个哨兵
  125.  
    for (String sentinel : sentinels) {
  126.  
    final HostAndPort hap = HostAndPort.parseString(sentinel);
  127.  
     
  128.  
    log.debug("Connecting to Sentinel " + hap);
  129.  
     
  130.  
    Jedis jedis = null;
  131.  
    try {
  132.  
    jedis = new Jedis(hap.getHost(), hap.getPort());
  133.  
     
  134.  
    List<Map<String, String>> slavesInfo = jedis.sentinelSlaves(masterName);
  135.  
     
  136.  
    //可以连接到哨兵
  137.  
    sentinelAvailable = true;
  138.  
     
  139.  
    //没有查询到slave信息,循环下一个哨兵
  140.  
    if (slavesInfo == null || slavesInfo.size() == 0) {
  141.  
    log.warn("Cannot get slavesInfo, master name: " + masterName + ". Sentinel: " + hap
  142.  
    + ". Trying next one.");
  143.  
    continue;
  144.  
    }
  145.  
     
  146.  
    //获取可用的Slave信息
  147.  
    for (Map<String, String> slave : slavesInfo) {
  148.  
    if(slave.get("flags").equals("slave")) {
  149.  
    String host = slave.get("ip");
  150.  
    int port = Integer.valueOf(slave.get("port"));
  151.  
    HostAndPort hostAndPort = new HostAndPort(host, port);
  152.  
     
  153.  
    log.info("Found alived redis slave:[" + hostAndPort + "]");
  154.  
     
  155.  
    alivedSalaves.add(hostAndPort);
  156.  
    }
  157.  
    }
  158.  
     
  159.  
    log.debug("Get alived salves end...");
  160.  
    break;
  161.  
    } catch (JedisException e) {
  162.  
    //当前哨兵连接失败,忽略错误连接下一个哨兵
  163.  
    log.warn("Cannot get slavesInfo from sentinel running @ " + hap + ". Reason: " + e
  164.  
    + ". Trying next one.");
  165.  
    } finally {
  166.  
    if (jedis != null) {
  167.  
    jedis.close();
  168.  
    }
  169.  
    }
  170.  
    }
  171.  
     
  172.  
    //没有可用的slave节点信息
  173.  
    if (alivedSalaves.isEmpty()) {
  174.  
    if (sentinelAvailable) {
  175.  
    throw new JedisException("Can connect to sentinel, but " + masterName
  176.  
    + " cannot find any redis slave");
  177.  
    } else {
  178.  
    throw new JedisConnectionException("All sentinels down");
  179.  
    }
  180.  
    }
  181.  
     
  182.  
    return alivedSalaves;
  183.  
    }
  184.  
     
  185.  
    @Override
  186.  
    public void passivateObject(PooledObject<Jedis> pooledJedis) {
  187.  
    }
  188.  
     
  189.  
    /**
  190.  
    * 检查jedis客户端是否有效
  191.  
    * @param pooledJedis 池中对象
  192.  
    * @return true有效 false无效
  193.  
    */
  194.  
    @Override
  195.  
    public boolean validateObject(PooledObject<Jedis> pooledJedis) {
  196.  
    final BinaryJedis jedis = pooledJedis.getObject();
  197.  
    try {
  198.  
    //是否TCP连接 && 是否ping通 && 是否slave角色
  199.  
    boolean result = jedis.isConnected()
  200.  
    && jedis.ping().equals("PONG")
  201.  
    && jedis.info("Replication").contains("role:slave");
  202.  
     
  203.  
    log.debug("ValidateObject Jedis=["+jedis+"] host=[ " + jedis.getClient().getHost() +
  204.  
    "] port=[" + jedis.getClient().getPort() +"] return=[" + result + "]");
  205.  
    return result;
  206.  
    } catch (final Exception e) {
  207.  
    log.warn("ValidateObject error jedis client cannot use", e);
  208.  
    return false;
  209.  
    }
  210.  
    }
  211.  
     
  212.  
    }

使用的时候跟原来一样,创建slave连接池就可以了。

  1.  
    JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
  2.  
    //连接池中最大对象数量
  3.  
    jedisPoolConfig.setMaxTotal(100);
  4.  
    //最大能够保持idel状态的对象数
  5.  
    jedisPoolConfig.setMaxIdle(1);
  6.  
    //最小能够保持idel状态的对象数
  7.  
    jedisPoolConfig.setMinIdle(1);
  8.  
    //当池内没有可用资源,最大等待时长
  9.  
    jedisPoolConfig.setMaxWaitMillis(3000);
  10.  
    //表示有一个idle object evitor线程对object进行扫描,调用validateObject方法.
  11.  
    jedisPoolConfig.setTestWhileIdle(true);
  12.  
    //evitor线程对object进行扫描的时间间隔
  13.  
    jedisPoolConfig.setTimeBetweenEvictionRunsMillis(30000);
  14.  
    //表示对象的空闲时间,如果超过这个时间对象没有被使用则变为idel状态
  15.  
    //然后才能被idle object evitor扫描并驱逐;
  16.  
    //这一项只有在timeBetweenEvictionRunsMillis大于0时和setTestWhileIdle=true时才有意义
  17.  
    //-1 表示对象不会变成idel状态
  18.  
    jedisPoolConfig.setMinEvictableIdleTimeMillis(60000);
  19.  
    //表示idle object evitor每次扫描的最多的对象数;
  20.  
    jedisPoolConfig.setNumTestsPerEvictionRun(10);
  21.  
     
  22.  
    //在从池中获取对象时调用validateObject方法检查
  23.  
    jedisPoolConfig.setTestOnBorrow(false);
  24.  
    //在把对象放回池中时调用validateObject方法检查
  25.  
    jedisPoolConfig.setTestOnReturn(false);
  26.  
     
  27.  
    Set<String> sentinels = new HashSet<>(Arrays.asList(
  28.  
    "192.168.80.112:26379",
  29.  
    "192.168.80.113:26379",
  30.  
    "192.168.80.114:26379"
  31.  
    ));
  32.  
     
  33.  
    JedisSentinelSlavePool pool = new JedisSentinelSlavePool("mymaster", sentinels, jedisPoolConfig);

与Spring集成,分别创建不同的对象即可,在程序中查询接口可以先走slave进行查询,查询不到在查询master, master也没有则写入缓存,返回数据,下载在查询slave就同步过去啦,这样一来redis的性能会大幅度的提升。

  1.  
    @Primary
  2.  
    @Bean(name = "redisTemplateMaster")
  3.  
    public RedisTemplate<Object, Object> redisTemplateMaster() {
  4.  
    RedisTemplate<Object, Object> template = new RedisTemplate<>();
  5.  
    template.setConnectionFactory(redisMasterConnectionFactory());
  6.  
    template.setKeySerializer(new StringRedisSerializer());
  7.  
    template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
  8.  
    return template;
  9.  
    }
  10.  
     
  11.  
    @Bean(name = "redisTemplateSlave")
  12.  
    public RedisTemplate<Object, Object> redisTemplateSlave() {
  13.  
    RedisTemplate<Object, Object> template = new RedisTemplate<>();
  14.  
    template.setConnectionFactory(redisSlaveConnectionFactory());
  15.  
    template.setKeySerializer(new StringRedisSerializer());
  16.  
    template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
  17.  
    return template;
  18.  
    }
上一篇:mongodb安装和使用


下一篇:C#用一行代码将string[] 转成 int[]