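1. Overview
In our project, activity content is written to Redis when it is created. Wherever that content is needed, it is read back from Redis and used in calculations.
Activity data is updated infrequently and is small in volume. Since the project is normally deployed on multiple machines with multiple instances, is there an alternative to Redis?
Guava's LoadingCache is a local, in-process cache and fits when the data set is small (caching a large number of key-value pairs locally would exhaust the machine's memory).
But how do multiple machines and instances stay in sync? ZooKeeper comes to mind....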
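The idea above can be sketched with only the JDK, as a stand-in for Guava's LoadingCache: a value is loaded on first access and kept in local memory until a change notification invalidates it. The class name `LocalActivityCache`, the loader function, and the method `invalidateAll()` are hypothetical names chosen for this illustration; the ZooKeeper notification is represented simply by a call to `invalidateAll()`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical JDK-only stand-in for a Guava LoadingCache.
final class LocalActivityCache<K, V> {
    private final ConcurrentMap<K, V> entries = new ConcurrentHashMap<>();
    // Stands in for the DB query; assumed to return a non-null value.
    private final Function<K, V> loader;

    LocalActivityCache(Function<K, V> loader) {
        this.loader = loader;
    }

    // Returns the cached value, loading it on the first access for this key.
    V get(K key) {
        return entries.computeIfAbsent(key, loader);
    }

    // Called when a change notification arrives; the next get() reloads.
    void invalidateAll() {
        entries.clear();
    }
}
```

Each instance would hold its own copy of such a cache, so the only thing that has to travel between machines is the "something changed" signal, not the data itself.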
2. Code
2.1 Simulating multiple instances
package TestZK;

import java.nio.charset.StandardCharsets;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * @Author: rocky
 * @Date: Created in 2018/5/20.
 */
public class InstanceOne {
    private static final String ADDRESS = "xxx:2181";
    private static final String PATH = "/strategy";
    private static CuratorFramework client;

    static {
        // Retry up to 3 times, starting with a 1000 ms base sleep.
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.builder()
                .connectString(ADDRESS)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                // .namespace(BASE)
                .build();
        client.start();
    }

    public static void main(String[] args) throws Exception {
        startCache();
        // Keep the process alive so the listener keeps receiving events.
        Thread.sleep(Integer.MAX_VALUE);
    }

    private static void startCache() throws Exception {
        PathChildrenCache childrenCache = new PathChildrenCache(client, PATH, true);
        // Register the listener before starting the cache so early events are not missed.
        childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
                // Connection-state events carry no node data.
                if (event.getData() == null) {
                    return;
                }
                byte[] data = event.getData().getData();
                System.out.println("catch that: the path of changed node " + event.getData().getPath()
                        + ", the data of changed node is "
                        + (data == null ? "" : new String(data, StandardCharsets.UTF_8)));
                // load data to loading cache (guava)
                doSomething();
            }
        });
        childrenCache.start();
    }

    // load data to loading cache
    private static void doSomething() {
        // LoadingCache<String, Map<Integer, DriverIntegral>> driverIntegralCache = CacheBuilder.newBuilder().maximumSize(500)
        //         .expireAfterWrite(5, TimeUnit.MINUTES).build(new CacheLoader<String, Map<Integer, DriverIntegral>>() {
        //             @Override
        //             public Map<Integer, DriverIntegral> load(String key) throws Exception {
        //                 Map<Integer, DriverIntegral> integerDriverIntegralMap = ..;
        //                 logger.info("guava load ::driverIntegralMap" + (integerDriverIntegralMap != null ? integerDriverIntegralMap.toString() : ""));
        //                 return integerDriverIntegralMap;
        //             }
        //         });
        //
        // Map<Integer, DriverIntegral> driverIntegralMap = driverIntegralCache.get(INTEGRAL);
    }
}

package TestZK;

import java.nio.charset.StandardCharsets;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * @Author: rocky
 * @Date: Created in 2018/5/20.
 */
public class InstanceTwo {
    private static final String ADDRESS = "xxx:2181";
    private static final String PATH = "/strategy";
    private static CuratorFramework client;

    static {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.builder()
                .connectString(ADDRESS)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                // .namespace(BASE)
                .build();
        client.start();
    }

    public static void main(String[] args) throws Exception {
        startCache();
        Thread.sleep(Integer.MAX_VALUE);
    }

    private static void startCache() throws Exception {
        PathChildrenCache childrenCache = new PathChildrenCache(client, PATH, true);
        // Register the listener before starting the cache so early events are not missed.
        childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
                // Connection-state events carry no node data.
                if (event.getData() == null) {
                    return;
                }
                byte[] data = event.getData().getData();
                System.out.println("catch that: the path of changed node " + event.getData().getPath()
                        + ", the data of changed node is "
                        + (data == null ? "" : new String(data, StandardCharsets.UTF_8)));
            }
        });
        childrenCache.start();
    }
}

package TestZK;

import java.nio.charset.StandardCharsets;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * @Author: rocky
 * @Date: Created in 2018/5/20.
 */
public class InstanceThree {
    private static final String ADDRESS = "xxx:2181";
    private static final String PATH = "/strategy";
    private static CuratorFramework client;

    static {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.builder()
                .connectString(ADDRESS)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                // .namespace(BASE)
                .build();
        client.start();
    }

    public static void main(String[] args) throws Exception {
        startCache();
        Thread.sleep(Integer.MAX_VALUE);
    }

    private static void startCache() throws Exception {
        PathChildrenCache childrenCache = new PathChildrenCache(client, PATH, true);
        // Register the listener before starting the cache so early events are not missed.
        childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
                // Connection-state events carry no node data.
                if (event.getData() == null) {
                    return;
                }
                byte[] data = event.getData().getData();
                System.out.println("catch that: the path of changed node is " + event.getData().getPath()
                        + ", the data of changed node is "
                        + (data == null ? "" : new String(data, StandardCharsets.UTF_8)));
            }
        });
        childrenCache.start();
    }
}
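Each instance listens on the child nodes in ZooKeeper. When a child changes (created, updated, or deleted), the instance reloads the activity data from the DB and stores it in the loading cache; code that needs activity data reads it from that cache.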
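A minimal sketch of that reload-on-change flow, written against the JDK only so it runs without a ZooKeeper server. The enum lists a subset of the constant names in Curator's `PathChildrenCacheEvent.Type`; the class `ActivityReloader` and its `Supplier` (standing in for the DB query) are hypothetical and not part of the code above.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// A subset of the constant names found in Curator's PathChildrenCacheEvent.Type.
enum ChangeType { CHILD_ADDED, CHILD_UPDATED, CHILD_REMOVED, CONNECTION_LOST }

// Hypothetical holder for the locally cached activity data.
final class ActivityReloader {
    private final Supplier<Map<String, String>> dbQuery; // stands in for the DB query
    private final AtomicReference<Map<String, String>> snapshot;

    ActivityReloader(Supplier<Map<String, String>> dbQuery) {
        this.dbQuery = dbQuery;
        this.snapshot = new AtomicReference<>(dbQuery.get()); // initial load
    }

    // Reloads on child changes only; returns true if a reload happened.
    boolean onEvent(ChangeType type) {
        switch (type) {
            case CHILD_ADDED:
            case CHILD_UPDATED:
            case CHILD_REMOVED:
                snapshot.set(dbQuery.get());
                return true;
            default:
                // Connection-state events do not mean the data changed.
                return false;
        }
    }

    // Readers always see a complete snapshot, never a half-updated one.
    Map<String, String> activities() {
        return snapshot.get();
    }
}
```

Swapping a whole snapshot through an `AtomicReference` is one possible design; a Guava LoadingCache with `invalidateAll()` in the listener would serve the same purpose.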
2.2 Simulating the client
package TestZK;

import java.nio.charset.StandardCharsets;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * @Author: rocky
 * @Date: Created in 2018/5/20.
 */
public class NodeChangeTest {
    private static final String ADDRESS = "xxx:2181";
    private static final String PATH = "/strategy";
    private static CuratorFramework client;

    static {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.builder()
                .connectString(ADDRESS)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                // .namespace(BASE)
                .build();
        client.start();
    }

    public static void main(String[] args) throws Exception {
        String path_node_1 = PATH + "/node_1";
        // create: the listeners should receive a child-added event
        client.create().creatingParentsIfNeeded().forPath(path_node_1);
        Thread.sleep(2000);
        // change: the listeners should receive a child-updated event
        client.setData().forPath(path_node_1, "yahahaha".getBytes(StandardCharsets.UTF_8));
        Thread.sleep(2000);
        // delete: the listeners should receive a child-removed event
        client.delete().forPath(path_node_1);
        Thread.sleep(Integer.MAX_VALUE);
    }
}
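In practice the client is likely to be an admin backend: an activity is created, modified, or deleted there, and the backend then performs the corresponding operation on the ZooKeeper node. Every instance that observes the node change reacts accordingly (for example, by querying the DB and caching the result).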
[Screenshot: console output]
3. Notes
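A watch registered through ZooKeeper's native API fires only once; the Curator wrapper library used here removes the need to re-register it by hand after every event. Curator also provides NodeCache and TreeCache: NodeCache watches a single node, while TreeCache watches a node together with all of its descendants.
The PathChildrenCache demonstrated in this example monitors only first-level children (direct children only; grandchildren are not monitored). Choose the cache type that matches what you need to watch.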
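The difference in scope between the three cache types can be written down as plain path checks. This is only an illustration of which changes each type reports, not Curator code; the class `WatchScope` and its methods are hypothetical, and paths are assumed to be absolute with no trailing slash.

```java
// Hypothetical helper: would a cache watching `watched` report a change at `changed`?
final class WatchScope {
    private WatchScope() {}

    // NodeCache: only the watched node itself.
    static boolean nodeCacheSees(String watched, String changed) {
        return changed.equals(watched);
    }

    // PathChildrenCache: only direct children of the watched node.
    static boolean pathChildrenCacheSees(String watched, String changed) {
        int lastSlash = changed.lastIndexOf('/');
        if (lastSlash < 0 || changed.equals("/")) {
            return false;
        }
        String parent = lastSlash == 0 ? "/" : changed.substring(0, lastSlash);
        return parent.equals(watched);
    }

    // TreeCache: the watched node and every descendant.
    static boolean treeCacheSees(String watched, String changed) {
        String prefix = watched.equals("/") ? "/" : watched + "/";
        return changed.equals(watched) || changed.startsWith(prefix);
    }
}
```

With the `/strategy` path from the example, a change at `/strategy/node_1` falls within PathChildrenCache and TreeCache scope, while a change at `/strategy/node_1/sub` falls within TreeCache scope only.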
This is a rather crude implementation; feedback is welcome ^_^
Using the ZooKeeper Curator API: www.cnblogs.com/rocky-fang/p/9037509.html
Using the ZooKeeper Java API: http://www.cnblogs.com/rocky-fang/p/9030438.html