jedis哨兵模式的redis组(集群),连接池实现。(客户端分片)

Java 连接 Redis 时我们通常使用 Jedis。对于 Redis 这种频繁请求的场景,一般需要对连接进行池化以避免重复创建,即创建一个连接池。打开 Jedis 的 jar 包可以发现,Jedis 已经提供了连接池的相关实现;根据 pom 依赖可以清楚地知道,它是基于 commons-pool2 连接池实现的。Jedis 的 jar 包中包含三个连接池:JedisPool、JedisSentinelPool 与 ShardedJedisPool。那么 Jedis 为什么会包含三种实现方式呢?归根结底还是因为 Redis 的部署环境不同:单节点模式、哨兵模式,还是多节点分片(客户端分片)模式。

首先JedisPool  我们可以清晰的看见这是为单节点模式实现的连接池 :

  /**
   * Creates a pool for one Redis node addressed directly by host and port.
   *
   * <p>Delegates to the seven-argument constructor, passing {@code null} as the
   * last argument (its meaning is not visible in this excerpt).
   *
   * @param poolConfig commons-pool2 configuration for the pool
   * @param host       Redis host
   * @param port       Redis port
   * @param timeout    timeout forwarded unchanged to the delegate constructor
   * @param password   Redis password
   * @param database   database index
   */
  public JedisPool(
      final GenericObjectPoolConfig poolConfig,
      final String host,
      int port,
      int timeout,
      final String password,
      final int database) {
    this(poolConfig, host, port, timeout, password, database, null);
  }

再打开 JedisSentinelPool,我们可以看到这是基于哨兵模式的单节点(单个 master)连接池实现:

  /**
   * Creates a Sentinel-backed pool for a single named master, using
   * {@code Protocol.DEFAULT_TIMEOUT} as the timeout.
   *
   * @param masterName name of the master as registered in Sentinel
   * @param sentinels  sentinel addresses
   * @param poolConfig commons-pool2 configuration for the pool
   * @param password   Redis password
   */
  public JedisSentinelPool(
      String masterName,
      Set<String> sentinels,
      final GenericObjectPoolConfig poolConfig,
      final String password) {
    this(masterName, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT, password);
  }

再打开 ShardedJedisPool,我们可以看到这是基于多个节点、采用客户端分片的 Redis 组连接池。其原理是根据 hash 将不同的 key 分配到不同的节点上,既提高了吞吐量,也可以在一定程度上避免单机风险。

  /**
   * Creates a client-side sharded pool over the given shard descriptors.
   *
   * <p>Delegates to the four-argument constructor, passing {@code null} as the
   * last argument (its meaning is not visible in this excerpt).
   *
   * @param poolConfig commons-pool2 configuration for the pool
   * @param shards     connection details for every shard
   * @param algo       hashing algorithm forwarded to the delegate constructor
   */
  public ShardedJedisPool(
      final GenericObjectPoolConfig poolConfig,
      List<JedisShardInfo> shards,
      Hashing algo) {
    this(poolConfig, shards, algo, null);
  }

同时我们打开其节点信息类 JedisShardInfo,可以看到它是基于直连 IP 与端口的实现方式,而生产环境一般不会直接提供这样的信息。

public class JedisShardInfo extends ShardInfo<Jedis> {
public String toString() {
return host + ":" + port + "*" + getWeight();
} private int timeout;
private String host;
private int port;
private String password = null;
private String name = null; public String getHost() {
return host;
} public int getPort() {
return port;
} public JedisShardInfo(String host) {
super(Sharded.DEFAULT_WEIGHT);
URI uri = URI.create(host);
if (uri.getScheme() != null && uri.getScheme().equals("redis")) {
this.host = uri.getHost();
this.port = uri.getPort();
this.password = uri.getUserInfo().split(":", 2)[1];
} else {
this.host = host;
this.port = Protocol.DEFAULT_PORT;
}
}

至此,Jedis 自带的连接池就介绍完了。如果你的环境与上面某一种情况一致,那么可以放心地直接使用对应的 Jedis 连接池。

当然,ShardedJedisPool 也存在扩容的问题,详情见我的下一篇文章。


来到我们的正文:

如果生产环境的 Redis 致力于高可用,我们一般使用哨兵模式;但为了避免单点风险,还会创建多个 Redis 服务,使其形成一组 Redis(并非 Redis Cluster 集群模式),即既是哨兵模式,又是多节点的。那么你的连接池就应该是 ShardedJedisSentinelPool——一种由 JedisSentinelPool 与 ShardedJedisPool 构建的混合体。

那么它就应该是以下结构,以此来创建一个连接池:

    /**
     * Creates a sharded, Sentinel-backed pool with a timeout of 2000.
     *
     * <p>The arguments are passed with their declared generic types; the raw
     * {@code (Set)} casts were unnecessary for overload resolution and only
     * produced unchecked warnings.
     *
     * @param masters    Sentinel master names, one per shard
     * @param sentinels  sentinel addresses
     * @param poolConfig commons-pool2 configuration for the pool
     * @param password   Redis password
     */
    public ShardedJedisSentinelPoolExt(
            Set<String> masters,
            Set<String> sentinels,
            GenericObjectPoolConfig poolConfig,
            String password) {
        this(masters, sentinels, poolConfig, 2000, password);
    }

在此基础上按照JedisSentinel实现哨兵的加载与ShardedJedis实现客户端分片。

示例代码如下:

/**
 * Pool of {@link ShardedJedis} clients whose shard addresses are discovered through
 * Redis Sentinel.
 *
 * <p>On construction the pool asks the configured sentinels for the current address of
 * every master name, builds a client-side sharded pool over those addresses (MurmurHash),
 * and starts one {@link MasterListener} thread per sentinel. A listener rebuilds the pool
 * when it receives a {@code +switch-master} message for one of the monitored masters.
 *
 * <p>NOTE(review): {@code initPool} is invoked from the listener threads without
 * synchronization; confirm whether concurrent fail-over notifications need serialising.
 */
public class ShardedJedisSentinelPoolExt extends Pool<ShardedJedis> {
    /** Upper bound for the retry counter shared by all master lookups. */
    private static final int MAX_RETRY_SENTINEL = 10;
    /** Timeout used by the constructors that do not take one. */
    private static final int DEFAULT_TIMEOUT_MILLIS = 2000;
    private static final Logger logger = LoggerFactory.getLogger(ShardedJedisSentinelPoolExt.class);
    /** Matches range expressions of the form {@code PREFIX$NN-MM[SUFFIX]}. */
    private static final Pattern pattern = Pattern.compile("^[A-Z0-9_]{5,200}\\$[0-9]{2,4}-[0-9]{2,4}([A-Z0-9_]{5,200})?");

    private GenericObjectPoolConfig poolConfig;
    private int timeout = DEFAULT_TIMEOUT_MILLIS;
    /** Number of failed lookup rounds so far; shared across all master names. */
    private int sentinelRetry = 0;
    private String password;
    private Set<ShardedJedisSentinelPoolExt.MasterListener> masterListeners = new HashSet<>();
    private volatile List<HostAndPort> currentHostMasters;
    private String redisMasterName;

    /** Creates a pool with a default pool configuration, timeout 2000 and no password. */
    public ShardedJedisSentinelPoolExt(Set<String> masters, Set<String> sentinels) {
        this(masters, sentinels, new GenericObjectPoolConfig(), DEFAULT_TIMEOUT_MILLIS, null, 0);
    }

    /** Creates a pool with a default pool configuration, timeout 2000 and the given password. */
    public ShardedJedisSentinelPoolExt(Set<String> masters, Set<String> sentinels, String password) {
        this(masters, sentinels, new GenericObjectPoolConfig(), DEFAULT_TIMEOUT_MILLIS, password);
    }

    /** Creates a pool with the given pool configuration, timeout 2000 and no password. */
    public ShardedJedisSentinelPoolExt(GenericObjectPoolConfig poolConfig, Set<String> masters, Set<String> sentinels) {
        this(masters, sentinels, poolConfig, DEFAULT_TIMEOUT_MILLIS, null, 0);
    }

    /**
     * Creates a pool from comma-separated master and sentinel lists.
     *
     * <p>Each master token is expanded with {@link #convertToMatch(String)}; the resulting
     * order (not sorted) determines the shard order.
     *
     * @param mastersStr   comma-separated master names or range expressions
     * @param sentinelsStr comma-separated {@code host:port} sentinel addresses
     * @param poolConfig   pool configuration
     * @param timeout      timeout passed to every {@link JedisShardInfo}
     * @param password     Redis password set on every shard
     * @throws JedisConnectionException if the masters cannot be resolved within the retry budget
     */
    public ShardedJedisSentinelPoolExt(String mastersStr, String sentinelsStr, GenericObjectPoolConfig poolConfig, int timeout, String password) {
        List<String> masters = expandMasterNames(mastersStr);
        Set<String> setSentinels = new HashSet<>(Arrays.asList(sentinelsStr.split(",")));
        this.poolConfig = poolConfig;
        this.timeout = timeout;
        this.password = password;
        this.redisMasterName = mastersStr;
        this.initPool(this.initSentinels(setSentinels, masters));
    }

    /**
     * Creates a pool from a comma-separated master list and a set of sentinels.
     *
     * <p>Master tokens are expanded with {@link #convertToMatch(String)}, de-duplicated and
     * sorted; the sorted order determines the shard order.
     *
     * @param mastersStr comma-separated master names or range expressions
     * @param sentinels  {@code host:port} sentinel addresses
     * @param poolConfig pool configuration
     * @param timeout    timeout passed to every {@link JedisShardInfo}
     * @param password   Redis password set on every shard
     * @throws JedisConnectionException if the masters cannot be resolved within the retry budget
     */
    public ShardedJedisSentinelPoolExt(String mastersStr, Set<String> sentinels, GenericObjectPoolConfig poolConfig, int timeout, String password) {
        List<String> masterStrList = new ArrayList<>(new HashSet<>(expandMasterNames(mastersStr)));
        Collections.sort(masterStrList);
        this.poolConfig = poolConfig;
        this.timeout = timeout;
        this.password = password;
        this.redisMasterName = mastersStr;
        this.initPool(this.initSentinels(sentinels, masterStrList));
    }

    /** Same as the six-argument constructor with {@code database} 0. */
    public ShardedJedisSentinelPoolExt(Set<String> masters, Set<String> sentinels, GenericObjectPoolConfig poolConfig, int timeout, String password) {
        this(masters, sentinels, poolConfig, timeout, password, 0);
    }

    /** Same as the six-argument constructor with no password and {@code database} 0. */
    public ShardedJedisSentinelPoolExt(Set<String> masters, Set<String> sentinels, GenericObjectPoolConfig poolConfig, int timeout) {
        this(masters, sentinels, poolConfig, timeout, null, 0);
    }

    /** Same as the six-argument constructor with timeout 2000 and {@code database} 0. */
    public ShardedJedisSentinelPoolExt(Set<String> masters, Set<String> sentinels, GenericObjectPoolConfig poolConfig, String password) {
        this(masters, sentinels, poolConfig, DEFAULT_TIMEOUT_MILLIS, password);
    }

    /**
     * Creates a pool over the given master names, sorted lexicographically.
     *
     * @param masters    Sentinel master names, one per shard
     * @param sentinels  {@code host:port} sentinel addresses
     * @param poolConfig pool configuration
     * @param timeout    timeout passed to every {@link JedisShardInfo}
     * @param password   Redis password set on every shard
     * @param database   accepted for signature compatibility; this class never reads it
     * @throws JedisConnectionException if the masters cannot be resolved within the retry budget
     */
    public ShardedJedisSentinelPoolExt(Set<String> masters, Set<String> sentinels, GenericObjectPoolConfig poolConfig, int timeout, String password, int database) {
        this.poolConfig = poolConfig;
        this.timeout = timeout;
        this.password = password;
        List<String> masterStrList = new ArrayList<>(masters);
        Collections.sort(masterStrList);
        this.initPool(this.initSentinels(sentinels, masterStrList));
    }

    /** Splits a comma-separated master list and expands every token via {@link #convertToMatch(String)}. */
    private static List<String> expandMasterNames(String mastersStr) {
        List<String> names = new ArrayList<>();
        for (String token : mastersStr.split(",")) {
            names.addAll(convertToMatch(token));
        }
        return names;
    }

    /** Shuts down every sentinel listener, then destroys the underlying pool. */
    @Override
    public void destroy() {
        for (MasterListener listener : this.masterListeners) {
            listener.shutdown();
        }
        super.destroy();
    }

    /** Returns the master addresses the pool is currently built on, in shard order. */
    public List<HostAndPort> getCurrentHostMaster() {
        return this.currentHostMasters;
    }

    /** Rebuilds the underlying pool for {@code masters} unless they equal the current masters. */
    private void initPool(List<HostAndPort> masters) {
        if (masterEquals(this.currentHostMasters, masters)) {
            return;
        }
        StringBuilder sb = new StringBuilder();
        for (HostAndPort master : masters) {
            sb.append(master.toString());
            sb.append(" ");
        }
        logger.info("Created ShardedJedisPool to master at [" + sb.toString() + "]");
        List<JedisShardInfo> shardMasters = this.makeShardInfoList(masters);
        this.initPool(this.poolConfig, new ShardedJedisSentinelPoolExt.ShardedJedisFactory(shardMasters, Hashing.MURMUR_HASH, null));
        this.currentHostMasters = masters;
    }

    /** Returns {@code true} only when both lists are non-null and equal element by element. */
    private static boolean masterEquals(List<HostAndPort> currentShardMasters, List<HostAndPort> shardMasters) {
        return currentShardMasters != null && shardMasters != null && currentShardMasters.equals(shardMasters);
    }

    /** Builds one {@link JedisShardInfo} per master, carrying this pool's timeout and password. */
    private List<JedisShardInfo> makeShardInfoList(List<HostAndPort> masters) {
        List<JedisShardInfo> shardMasters = new ArrayList<>();
        for (HostAndPort master : masters) {
            JedisShardInfo jedisShardInfo = new JedisShardInfo(master.getHost(), master.getPort(), this.timeout);
            jedisShardInfo.setPassword(this.password);
            shardMasters.add(jedisShardInfo);
        }
        return shardMasters;
    }

    /**
     * Resolves the current address of every master and starts the sentinel listeners.
     *
     * <p>Duplicate master names are dropped (first occurrence wins). Previously a duplicate
     * made the lookup loop spin forever, because the cached address was found but the name
     * was never marked as fetched.
     *
     * @param sentinels {@code host:port} sentinel addresses
     * @param masters   master names in shard order
     * @return resolved master addresses, index-aligned with the de-duplicated names
     * @throws JedisConnectionException once the shared retry counter reaches {@link #MAX_RETRY_SENTINEL}
     */
    private List<HostAndPort> initSentinels(Set<String> sentinels, List<String> masters) {
        List<String> masterNames = new ArrayList<>();
        for (String name : masters) {
            if (!masterNames.contains(name)) {
                masterNames.add(name);
            }
        }

        List<HostAndPort> shardMasters = new ArrayList<>();
        logger.info("Trying to find all master:" + masters + "from available Sentinels:" + sentinels);

        for (String masterName : masterNames) {
            HostAndPort master = null;
            while (master == null && this.sentinelRetry < MAX_RETRY_SENTINEL) {
                master = this.queryMaster(sentinels, masterName);
                if (master == null) {
                    try {
                        logger.warn("All sentinels down, cannot determine where is " + masterName + " master is running... sleeping 1000ms, Will try again.");
                        Thread.sleep(1000L);
                    } catch (InterruptedException e) {
                        logger.error(e.getMessage(), e);
                        // Keep the interrupt visible to the caller instead of swallowing it.
                        Thread.currentThread().interrupt();
                    }
                    ++this.sentinelRetry;
                }
            }
            if (master == null) {
                logger.error("All sentinels down and try 10 times, Abort.");
                throw new JedisConnectionException("Cannot connect all sentinels, Abort.");
            }
            shardMasters.add(master);
        }

        if (!masterNames.isEmpty()) {
            this.startMasterListeners(sentinels, masterNames);
        }
        return shardMasters;
    }

    /** Asks each sentinel in turn for {@code masterName}; returns the first address found, or {@code null}. */
    private HostAndPort queryMaster(Set<String> sentinels, String masterName) {
        for (String sentinel : sentinels) {
            HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":")));
            logger.info("Connecting to Sentinel " + hap);
            try (Jedis jedis = new Jedis(hap.getHost(), hap.getPort())) {
                List<String> hostAndPort = jedis.sentinelGetMasterAddrByName(masterName);
                if (hostAndPort != null && !hostAndPort.isEmpty()) {
                    HostAndPort master = toHostAndPort(hostAndPort);
                    logger.info("Found Redis master " + masterName + " at " + master);
                    return master;
                }
            } catch (JedisException e) {
                logger.error("Cannot connect to sentinel :{} ,Trying next one sentinel , errorMsg:{}", hap, e.getMessage(), e);
            }
        }
        return null;
    }

    /** Creates, registers and starts one {@link MasterListener} thread per sentinel. */
    private void startMasterListeners(Set<String> sentinels, List<String> masterNames) {
        for (String sentinel : sentinels) {
            String threadName = "master-listener-sentinel-" + sentinel;
            logger.info("Starting Sentinel listeners thread " + sentinel);
            HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":")));
            MasterListener masterListener = new MasterListener(masterNames, hap.getHost(), hap.getPort(), threadName);
            this.masterListeners.add(masterListener);
            masterListener.start();
        }
    }

    /** Builds a {@link HostAndPort} from a two-element list: host at index 0, decimal port at index 1. */
    private static HostAndPort toHostAndPort(List<String> getMasterAddrByNameResult) {
        String host = getMasterAddrByNameResult.get(0);
        int port = Integer.parseInt(getMasterAddrByNameResult.get(1));
        return new HostAndPort(host, port);
    }

    /**
     * Expands a master-name range expression into individual names.
     *
     * <p>Input that does not match {@code PREFIX$NN-MM[SUFFIX]} is returned unchanged as a
     * single-element list. Otherwise one name is produced for every number from {@code NN}
     * to {@code MM} inclusive, as {@code PREFIX + number + SUFFIX}. When {@code NN} starts
     * with {@code 0}, numbers below 10 get a single leading {@code 0}.
     *
     * @param str master name or range expression
     * @return the expanded names; never {@code null}
     */
    public static List<String> convertToMatch(String str) {
        List<String> stringList = new ArrayList<>();
        if (!pattern.matcher(str).matches()) {
            stringList.add(str);
            return stringList;
        }

        int dollarIndex = str.indexOf("$");
        String prefix = str.substring(0, dollarIndex);
        String[] split = str.substring(dollarIndex + 1).split("-");
        if (split.length != 2) {
            return Collections.emptyList();
        }

        String startStr = split[0];
        boolean isBeginWithZero = startStr.startsWith("0");
        int beginNumber = Integer.parseInt(startStr);

        // Split the upper bound into its leading digits and the optional trailing suffix.
        String endStr = split[1];
        int lastNumberIndex = 0;
        for (int i = 0; i < endStr.length(); ++i) {
            if (!Character.isDigit(endStr.charAt(i))) {
                lastNumberIndex = i;
                break;
            }
        }
        String endSuffix = "";
        if (lastNumberIndex > 0) {
            endSuffix = endStr.substring(lastNumberIndex);
            endStr = endStr.substring(0, lastNumberIndex);
        }

        int endNumber = Integer.parseInt(endStr);
        for (int number = beginNumber; number <= endNumber; ++number) {
            StringBuilder stringBuilder = new StringBuilder(prefix);
            if (isBeginWithZero && number < 10) {
                stringBuilder.append("0");
            }
            stringBuilder.append(number);
            stringBuilder.append(endSuffix);
            stringList.add(stringBuilder.toString());
        }
        return stringList;
    }

    /** Returns the raw master string given to the String-based constructors, or the last value set. */
    public String getRedisMasterName() {
        return this.redisMasterName;
    }

    public void setRedisMasterName(String redisMasterName) {
        this.redisMasterName = redisMasterName;
    }

    public GenericObjectPoolConfig getPoolConfig() {
        return this.poolConfig;
    }

    public void setPoolConfig(GenericObjectPoolConfig poolConfig) {
        this.poolConfig = poolConfig;
    }

    /**
     * Thread that subscribes to {@code +switch-master} on one sentinel and rebuilds the
     * enclosing pool when a monitored master changes address.
     */
    protected class MasterListener extends Thread {
        protected List<String> masters;
        protected String host;
        protected int port;
        protected long subscribeRetryWaitTimeMillis = 5000L;
        /** Connection of the current subscribe attempt; read by {@link #shutdown()} from another thread. */
        protected volatile Jedis jedis;
        protected AtomicBoolean running = new AtomicBoolean(false);

        public MasterListener(List<String> masters, String host, int port) {
            this.masters = masters;
            this.host = host;
            this.port = port;
        }

        public MasterListener(List<String> masters, String host, int port, String threadName) {
            super(threadName);
            this.masters = masters;
            this.host = host;
            this.port = port;
        }

        public MasterListener(List<String> masters, String host, int port, long subscribeRetryWaitTimeMillis) {
            this(masters, host, port);
            this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis;
        }

        /**
         * Subscribes to the sentinel and re-subscribes after a lost connection until
         * {@link #shutdown()} is called or the thread is interrupted while waiting.
         */
        @Override
        public void run() {
            this.running.set(true);
            while (this.running.get()) {
                Jedis connection = new Jedis(this.host, this.port);
                this.jedis = connection;
                try {
                    connection.subscribe(new ShardedJedisSentinelPoolExt.JedisPubSubAdapter() {
                        @Override
                        public void onMessage(String channel, String message) {
                            MasterListener.this.handleSwitchMaster(message);
                        }
                    }, "+switch-master");
                } catch (JedisConnectionException e) {
                    ShardedJedisSentinelPoolExt.logger.error("Lost connection to Sentinel at {}:{},{}", this.host, this.port, e.getMessage(), e);
                    if (this.running.get()) {
                        ShardedJedisSentinelPoolExt.logger.warn("Lost connection to Sentinel at " + this.host + ":" + this.port + ". Sleeping 5000ms and retrying.");
                        try {
                            Thread.sleep(this.subscribeRetryWaitTimeMillis);
                        } catch (InterruptedException interrupted) {
                            ShardedJedisSentinelPoolExt.logger.error(interrupted.getMessage(), interrupted);
                            // Treat interruption as a stop request; otherwise every later
                            // sleep would fail immediately and the loop would spin.
                            Thread.currentThread().interrupt();
                            this.running.set(false);
                        }
                    } else {
                        ShardedJedisSentinelPoolExt.logger.error("Unsubscribing from Sentinel at " + this.host + ":" + this.port);
                    }
                } finally {
                    // Release the socket of this attempt; a fresh connection is made on retry.
                    try {
                        connection.close();
                    } catch (Exception closeFailure) {
                        ShardedJedisSentinelPoolExt.logger.warn("Failed to close Sentinel connection to {}:{}", this.host, this.port, closeFailure);
                    }
                }
            }
        }

        /**
         * Handles one {@code +switch-master} payload. The code reads token 0 as the master
         * name and tokens 3 and 4 as the new host and port, so at least five tokens are required.
         */
        private void handleSwitchMaster(String message) {
            ShardedJedisSentinelPoolExt.logger.info("Sentinel " + this.host + ":" + this.port + " published: " + message + ".");
            String[] switchMasterMsg = message.split(" ");
            // Index 4 is read below, so a 4-token message must be rejected here.
            if (switchMasterMsg.length <= 4) {
                ShardedJedisSentinelPoolExt.logger.warn("Invalid message received on Sentinel " + this.host + ":" + this.port + " on channel +switch-master: " + message);
                return;
            }

            int index = this.masters.indexOf(switchMasterMsg[0]);
            if (index < 0) {
                StringBuilder sb = new StringBuilder();
                for (String masterName : this.masters) {
                    sb.append(masterName);
                    sb.append(",");
                }
                ShardedJedisSentinelPoolExt.logger.info("Ignoring message on +switch-master for master name " + switchMasterMsg[0] + ", our monitor master name are [" + sb + "]");
                return;
            }

            HostAndPort newHostMaster = ShardedJedisSentinelPoolExt.toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4]));
            // Copy the current masters and swap in the new address at the same shard index.
            List<HostAndPort> newHostMasters = new ArrayList<>(ShardedJedisSentinelPoolExt.this.currentHostMasters);
            newHostMasters.set(index, newHostMaster);
            ShardedJedisSentinelPoolExt.this.initPool(newHostMasters);
        }

        /**
         * Stops the listener and disconnects the current sentinel connection, if any.
         *
         * @throws CcspRuntimeException wrapping any exception raised while disconnecting
         */
        public void shutdown() {
            try {
                ShardedJedisSentinelPoolExt.logger.info("Shutting down listener on " + this.host + ":" + this.port);
                this.running.set(false);
                Jedis connection = this.jedis;
                // The thread may not have created a connection yet.
                if (connection != null) {
                    connection.disconnect();
                }
            } catch (Exception e) {
                ShardedJedisSentinelPoolExt.logger.error("Caught exception while shutting down: " + e.getMessage(), e);
                throw new CcspRuntimeException(e);
            }
        }
    }

    /** {@link JedisPubSub} base for subscribers that override only the callbacks they need. */
    protected class JedisPubSubAdapter extends JedisPubSub {
        protected JedisPubSubAdapter() {
        }
    }

    /** commons-pool2 factory that creates, validates and destroys {@link ShardedJedis} instances. */
    protected static class ShardedJedisFactory implements PooledObjectFactory<ShardedJedis> {
        private List<JedisShardInfo> shards;
        private Hashing algo;
        private Pattern keyTagPattern;

        public ShardedJedisFactory(List<JedisShardInfo> shards, Hashing algo, Pattern keyTagPattern) {
            this.shards = shards;
            this.algo = algo;
            this.keyTagPattern = keyTagPattern;
        }

        @Override
        public PooledObject<ShardedJedis> makeObject() throws Exception {
            ShardedJedis jedis = new ShardedJedis(this.shards, this.algo, this.keyTagPattern);
            return new DefaultPooledObject<>(jedis);
        }

        /** Sends QUIT to and disconnects every shard; failures are logged and do not stop the loop. */
        @Override
        public void destroyObject(PooledObject<ShardedJedis> pooledShardedJedis) throws Exception {
            ShardedJedis shardedJedis = pooledShardedJedis.getObject();
            for (Jedis jedis : shardedJedis.getAllShards()) {
                try {
                    jedis.quit();
                } catch (Exception e) {
                    ShardedJedisSentinelPoolExt.logger.error(e.getMessage(), e);
                }
                try {
                    jedis.disconnect();
                } catch (Exception e) {
                    ShardedJedisSentinelPoolExt.logger.error(e.getMessage(), e);
                }
            }
        }

        /** Returns {@code true} only if every shard answers {@code PING} with {@code PONG}. */
        @Override
        public boolean validateObject(PooledObject<ShardedJedis> pooledShardedJedis) {
            try {
                ShardedJedis jedis = pooledShardedJedis.getObject();
                for (Jedis shard : jedis.getAllShards()) {
                    if (!"PONG".equals(shard.ping())) {
                        return false;
                    }
                }
                return true;
            } catch (Exception e) {
                ShardedJedisSentinelPoolExt.logger.error(e.getMessage(), e);
                return false;
            }
        }

        @Override
        public void activateObject(PooledObject<ShardedJedis> p) throws Exception {
        }

        @Override
        public void passivateObject(PooledObject<ShardedJedis> p) throws Exception {
        }
    }
}
上一篇:node.js学习资料


下一篇:linux查看硬件信息及驱动设备相关整理