分布式协调-Zookeeper使用(Watcher、Session、Curator、Acl)
前面说到zk可以为shardingSphere当做动态配置的一个中间件,然后聊了一下zk的大体介绍,本篇咱们聊聊他的一些常见的特性,并且对其进行相关阐释,同时使用Curator作为Demo。本篇会聊到:
- 【State】:zk上每个节点除了存储了节点数据,同时也存储了一些节点的状态信息。我们会分析一下。
- 【Watcher(发布订阅)】:在shardSphere使用zk作为分布式配置的那一篇,我们在zk上手动修改了配置,然后ShardingSphere就可以感知到,其底层就是基于这个特性来做的。
- 【Session】:客户端和session建立连接的流程。
- 【Acl】(权限控制):因为不是谁都有权限对zk上的node进行操作的,一旦操作不当,系统就可能宕机。
- 【Curator】:对zk的api封装的一个客户端框架。
Stat(每个节点的状态和信息)
czxid 表示该数据节点被创建时的事务ID mzxid 表示该节点最后一次被更新时的事务ID ctime 表示节点被创建的时间 mtime 表示该节点最后一次被更新的时间 version 数据节点的版本号,这里其实是一种乐观锁。我们每次修改一个节点数据的时候,节点的version就会增加。那每个客户端在修改节点的时候,带一个version,当他们传递的version和当前version不一致的时候,就修改失败。 cversion 子结点的版本号 aversion 节点的ACL版本号 ephemeralOwner 创建该临时节点的会话的sessionID。如果该节点是持久节点,那么这个属性值为0 dataLength 数据内容的长度 numChildren 当前节点的子节点个数 pzxid 表示该节点的子节点里最后一次被修改时的事务ID。 【version 】:像上面解释的一样,现在我们的版本seq的版本是2,我们修改时带上版本是1就无法修改。
Watcher
我们的shardingSphere使用zk作为配置中心,当在zk上修改了配置后,shardingSphere就能感知到,就是通过watcher做的,zk实际上是和我们的客户端建立连接,并且主动通知客户端有数据修改了。我们这里举个例子。
对某个节点建立一个监听:所有命令带w的都是可以进行监听的
比如:我们在get的时候对某个节点进行监听,那么当其他客户端对我们get的这个数据进行操作的时候,我们对这个节点监听的节点就会收到消息。
现在我们使用客户端1对seq这个节点get的时候进行监听
然后使用客户端2对这个节点的值进行修改
这个时候节点一就能收到被监听节点的修改信息
问题是当我们再次这个数据进行修改的时候,修改的信息并没有被监听到,这也就是说,这种方式只是一次性监听。那如何进行每次修改都被监听到呢?
- 循环监听:我们看到,当收到信息时候,我们是可以知道哪个节点被修改了,那我们就可以拿到这个节点再次进行监听。
- addwatch:我们发现它命令中有一个addwatch,这个就是实现持续监听的方法。
持久化监听里面提供了两种方式:
【PERSISTENT】:持久化订阅,针对当前节点的修改和删除事件,以及当前节点的子节点的删除和新增事件 【PERSISTENT_RECURSIVE】:持久化递归订阅,在PERSISTENT的基础上,增加了子节点修改的事件触发,以及子节点的子节点的数据变化都会触发相关事件(满足递归订阅特性) 那了解了这个特性就知道,我们的shardingSphere指定是对rules(存储他的配置文件的及节点)进行了addwather的监听,这样当我们修改了rules的数据,监听这个节点的那些个shardingSphere就收到了信息啦。
Session(当客户端连接zkServer的时候,是一个异步的状态.)
- 客户端向Zookeeper Server发起连接请求,此时状态为CONNECTING
- 当连接建立好之后,Session状态转化为CONNECTED,此时可以进行数据的IO操作
- 如果Client和Server的连接出现丢失,则Client又会变成CONNECTING状态
- 如果会话过期或者主动关闭连接时,此时连接状态为CLOSE
- 如果是身份验证失败,直接结束
Curator(使用java操作zk)
pom
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>5.2.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>5.2.0</version> </dependency>View Code以下的代码,就是对zk进行操作的一个案例,包含了增删改查。curator提供了两种方式,同步和异步的操作。
【增删改查同步操作】
// 启动以及连接zk private CuratorOperationExample(){ curatorFramework= CuratorFrameworkFactory .builder() .connectionTimeoutMs(20000) .connectString("192.168.43.3:2181") //读写分离(zookeeper-server) .retryPolicy(new ExponentialBackoffRetry(1000,3)) .sessionTimeoutMs(15000) .build(); curatorFramework.start(); //启动 } // 对节点进行操作 private void nodeCRUD() throws Exception { System.out.println("开始针对节点的CRUD操作"); //这个要给节点中存储的数据 String value="Hello World"; //创建一个节点 String node=curatorFramework.create() .creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .forPath("/node",value.getBytes()); System.out.println("节点创建成功:"+node); //存储状态信息的对象 Stat stat=new Stat(); //获取节点的value byte[] data=curatorFramework.getData().storingStatIn(stat).forPath(node); System.out.println("节点value值:"+new String(data)); //这里使用先查询,然后修改,因为可能有多个客户端同时连接。可能存在锁的问题。这里查询出来version然后再进行操作 stat=curatorFramework.setData() .withVersion(stat.getVersion()) .forPath(node,"Update Date Result".getBytes()); String result=new String(curatorFramework.getData().forPath(node)); System.out.println("修改节点之后的数据:"+result); System.out.println("开始删除节点"); curatorFramework.delete().forPath(node); Stat existStat=curatorFramework.checkExists().forPath(node); if(existStat==null){ System.out.println("节点删除成功"); } }View Code【异步增操作】
// 创建一个节点的异步方式,其他的操作都有对应的api, // 这里的CountDownLatch是为了不让线程直接跑下去, // 要是直接跑下去的话,就看不到创建的节点了,因为是异步的,而当执行到获取节点的时候,可能还没有创建好,只是为了看见这个节点内容而已,没有任何作用。 public void asyncCRUD() throws Exception { CountDownLatch countDownLatch=new CountDownLatch(1); // ZK会回调BackgroundCallback里面的方法进行回调 String node=curatorFramework.create().withMode(CreateMode.PERSISTENT) .inBackground((session,event)->{ System.out.println(Thread.currentThread().getName()+":执行创建节点:"+event.getPath()); countDownLatch.countDown(); //触发回调,递减计数器 }).forPath("/async-node"); countDownLatch.await(); }View Code【Acl操作:digest】digest为例 因为他是对于每次会话的授权,那我们在建立连接的时候就要加上,【.authorization("digest","glen:glen".getBytes())】
//创酱一个acl节点 //scheme 是digest id是glen:glen 权限是ala private void aclOperation() throws Exception { Id id=new Id("digest", DigestAuthenticationProvider.generateDigest("glen:glen")); List<ACL> acls=new ArrayList<>(); acls.add(new ACL(ZooDefs.Perms.ALL,id)); String node=curatorFramework.create().creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .withACL(acls,false).forPath("/curator-auth","Auth".getBytes()); System.out.println("创建带有权限节点:"+node); System.out.println("数据查询结果:"+new String(curatorFramework.getData().forPath(node))); }View Code【一次性监听】
// 一次性监听 public void normalWatcher() throws Exception { CuratorWatcher curatorWatcher=new CuratorWatcher() { @Override public void process(WatchedEvent watchedEvent) throws Exception { System.out.println("监听到的事件"+watchedEvent.toString()); //循环设置监听 curatorFramework.checkExists().usingWatcher(this).forPath(watchedEvent.getPath()); } }; // 创酱一个节点 String node=curatorFramework.create().forPath("/watcher","Watcher String".getBytes()); System.out.println("节点创建成功:"+node); //设置一次普通的watcher监听 String data=new String(curatorFramework.getData().usingWatcher(curatorWatcher).forPath(node)); System.out.println("设置监听并获取节点数据:"+data); //第一次操作才会触发监听,而第二次不会。所以上面在回调方法中我们设置了一个循环监听。 curatorFramework.setData().forPath(node,"change data 0".getBytes()); Thread.sleep(1000); curatorFramework.setData().forPath(node,"change data 1".getBytes()); }View Code【持续化监听】
//持久化监听 //node :要监听的节点名称 private void persisWatcher(String node){ CuratorCache curatorCache=CuratorCache. //实例;去监听的节点;操作类型(单节点缓存,对数据进行压缩,关闭或不清理缓存) build(curatorFramework,node, CuratorCache.Options.SINGLE_NODE_CACHE); //这里可以设置对于事件的监听类型 CuratorCacheListener listener=CuratorCacheListener .builder() //这里是我们自己写的一个类,他会调用我们类的方法,这个实现了他的CuratorCacheListener接口 .forAll(new ZookeeperWatcherListener()) .build(); //把事件监听添加进去 curatorCache.listenable().addListener(listener); curatorCache.start(); }View Code【测试】
//这里对节点进行操作,看看是不是会别检测到事件 private void operation(String node) throws Exception { curatorFramework.create().forPath(node); curatorFramework.setData().forPath(node,"hello".getBytes()); curatorFramework.delete().forPath(node); } public static void main(String[] args) throws Exception { ZookeeperWatchExample zookeeperWatchExample=new ZookeeperWatchExample(); String node="/persis-node"; zookeeperWatchExample.persisWatcher(node); zookeeperWatchExample.operation(node); //让main方法等待 System.in.read(); }View Code
ACL权限控制
zk针对节点提供了权限的控制,这是因为要规避有人不小心删除了某个节点,而导致整个系统出现问题的情况。他和linux的权限控制相似。
他的权限标志符是这样的:【scheme:id:perm】
比如我们获取seq节点,他的scheme就是world(全部都能访问),id就是所有人,permission就是增、删、改、查、管理
- Scheme(权限模式),标识授权策略,即表示通过什么样子的方式去控制权限。
- 【world】:默认方式,相当于全部都能访问。
- 【auth】:代表已经认证通过的用户(cli中可以通过addauth digest user:pwd 来添加当前上下文中的授权用户)
- 【digest】:即用户名:密码这种方式认证
- 【ip】:通过ip地址来做权限控制。
- ID(授权对象):比如说我们的scheme是ip,那这里就填写ip,如果是digest,那就填写用户名和密码
- Permission:授予的权限 (c) create . (d)delete (r)read (w)write (a)admin
【world】通过get和set【acl】命令去修改一个节点的权限,一个world的例子。
【auth】 对登录用户进行授权,授权后可以操作授权的节点,而退出客户端后,再次进入就需要再次授权了。前面的glen是用户名 后面的glen是密码
当退出后,我们就无法操作atuh这个节点,必须再次进行授权了。