Zookeeper curator框架

1. 简介

  • curator框架在zookeeper原生API接口上进行了包装,解决了很多zooKeeper客户端非常底层的细节开发。
  • 提供zooKeeper各种应用场景(比如:分布式锁服务、集群领导选举、 共享计数器、缓存机制、分布式队列等)的抽象封装,实现了Fluent风格的API接口,是最好用,最流行的zookeeper的客户端。

1.1 原生zookeeperAPI的不足

  • 连接对象异步创建,需要开发人员自行编码等待
  • 连接没有自动重连超时机制
  • watcher一次注册生效一次
  • 不支持递归创建树形节点

1.2 curator特点

  • 解决session会话超时重连
  • watcher反复注册
  • 简化开发api
  • 遵循Fluent风格的API
  • 提供了分布式锁服务、共享计数器、缓存机制等机制

1.3 依赖

<dependencies>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>2.12.0</version>
    </dependency>
    <!-- 封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式Barrier -->
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>2.12.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.10</version>
        <type>jar</type>
    </dependency>
</dependencies>

2. 连接与关闭

  • 采用了工厂设计模式和建造者设计模式。通过输入一些连接信息,可以获取到一个连接Zookeeper服务器的客户端。
      public static void main(String[] args) {
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);  // 表示间隔1s,最多尝试重连3次
        CuratorFramework client = CuratorFrameworkFactory
                                              .builder()
                                              .connectString("192.168.233.133:2181,192.168.233.131:2181,192.168.233.132:2181")
                                              .sessionTimeoutMs(5000)
                                              .retryPolicy(retryPolicy)
                                              .namespace("create")
                                              .build();
        client.start();  // 开启客户端
        log.info(client.isStarted());
        client.close();  // 关闭客户端
      }
    

    connectString:用于设置地址及端口号;
    sessionTimeoutMs:用于设置超时时间;
    retryPolicy:用于设置重连策略
    namespace:表示根节点路径,可以没有

2.1 测试模版

  • 因此,可以写一个测试模板,在开始之前打开客户端,在结束之后关闭客户端。
    public class CreateTest {
      private final static Logger log = Logger.getLogger(ConnectTest.class);
      private String connectString = "192.168.233.133:2181,192.168.233.131:2181,192.168.233.132:2181";
      CuratorFramework client;
      Integer sessionTimeoutMs = 5000;
      Integer baseSleepTimeMs = 1000;
      Integer maxRetries = 3;
      String namespace = "create";
    
    
      @Before
      public void before() {
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
        client = CuratorFrameworkFactory
                .builder()
                .connectString(connectString)
                .sessionTimeoutMs(sessionTimeoutMs)
                .retryPolicy(retryPolicy)
                .namespace(namespace)
                .build();
        client.start();
        log.info("客户端已开启");
      }
    	
      @After
      public void after() {
        client.close();
        log.info("客户端已关闭");
      }
    }
    

3. 新增节点

3.1 案例一:简单创建

  @Test
  public void testCreate() throws Exception {
    client.create()
            .withMode(CreateMode.PERSISTENT)
            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
            .forPath("/node", "data".getBytes());
    log.info("create结束");
  }

3.2 案例二:自定义权限创建

  @Test
  public void testCreate2() throws Exception {
    Id ip = new Id("ip", "192.168.233.133");
    List<ACL> acl = Collections.singletonList(new ACL(ZooDefs.Perms.ALL, ip));
    client.create()
            .withMode(CreateMode.PERSISTENT)
            .withACL(acl)
            .forPath("/node1", "data".getBytes());
    log.info("create结束");
  }

3.3 案例三:递归创建节点

  • .creatingParentsIfNeeded()实现,可以递归创建节点
      @Test
      public void testCreate3() throws Exception {
        //  递归创建节点
        client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                .forPath("/node2/node33", "data".getBytes());
        log.info("create结束");
      }
    

3.4 案例四:异步方法创建节点

  • 在此说明一下,方法接收到的第一个参数curatorFramework实际上就是客户端;curatorFramework保存了一些查询的结果。
      @Test
      public void testCreate4() throws Exception {
        //  异步方式创建节点
        client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                .inBackground(new BackgroundCallback() {
                  @Override
                  public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                    log.info(curatorFramework == client);  // true
                    log.info("getResultCode(): " + curatorEvent.getResultCode());  // 0表示创建成功
                    log.info("getType(): " + curatorEvent.getType().toString());  // 获取操作类型 CREATE
                    log.info("getPath(): " + curatorEvent.getPath());   // 获取节点路径
                  }
                })
                .forPath("/node2/node38", "data".getBytes());
        log.info("create结束");
      }
    

4. 更新节点

4.1 案例一:更新一个节点

  @Test
  public void testSet() throws Exception {
    client.setData()
            .forPath("/node", "set".getBytes());
    log.info("设置完成");
  }

4.2 案例二:带版本更新一个节点

  @Test
  public void testSet2() throws Exception {
    client.setData()
            .withVersion(1)  // 带有版本号
            .forPath("/node", "12".getBytes());
    log.info("设置完成");
  }

4.3 案例三:带回调方法更新一个节点

  @Test
  public void testSet3() throws Exception {
    client.setData()
            .inBackground(new BackgroundCallback() {
              @Override
              public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                log.info(curatorEvent.getResultCode());  // 0
                log.info(curatorEvent.getType());  // SET_DATA
                log.info(curatorEvent.getPath());  // /node
                log.info(curatorEvent.getStat().toString());  // 21474836489,21474836542,1620040487612,1620042328488,4,0,0,0,3,0,21474836489
              }
            })
            .forPath("/node", "432".getBytes());
    log.info("设置完成");
  }

5. 删除节点

5.1 案例一:删除一个节点

  @Test
  public void testDelete() throws Exception {
    client.delete()
            .forPath("/node");
    log.info("删除结束");
  }

5.2 案例二:递归删除节点

  @Test
  public void testDelete1() throws Exception {
    client.delete()
            .deletingChildrenIfNeeded()
            .forPath("/node2");
    log.info("删除结束");
  }

5.3 案例三:带回调方法删除一个节点

  @Test
  public void testDelete3() throws Exception {
    client.delete()
            .deletingChildrenIfNeeded()
            .inBackground(new BackgroundCallback() {
              @Override
              public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                log.info(curatorEvent.getType());  // DELETE
                log.info(curatorEvent.getPath());  // /node1
              }
            })
            .forPath("/node1");
    log.info("删除结束");
  }

6. 查看节点

6.1 案例一:查看一个节点

  @Test
  public void testGet() throws Exception {
    byte[] data = client.getData()
            .forPath("/node2");
    log.info(new String(data));
  }

6.2 案例二:查看节点的值和状态

  @Test
  public void testGet2() throws Exception {
    Stat stat = new Stat();
    byte[] data = client.getData()
            .storingStatIn(stat)
            .forPath("/node2");
    log.info(new String(data));
    log.info(stat.getVersion());
  }

6.3 案例三:带回调方法查看一个节点

  @Test
  public void testGet3() throws Exception {
    client.getData()
            .inBackground(new BackgroundCallback() {
              @Override
              public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                log.info(new String(curatorEvent.getData()));  // 4134134
                log.info(curatorEvent.getStat().toString());  // 21474836566,21474836566,1620042863998,1620042863998,0,0,0,0,7,0,21474836566
                log.info(curatorEvent.getType().toString());  // GET_DATA
              }
            })
            .forPath("/node2");
  }

7. 查看子节点

7.1 案例一:查看一个节点的所有子节点

  @Test
  public void testChildren() throws Exception {
    List<String> children = client.getChildren()
            .forPath("/");
    log.info(children.toString());
  }

7.2 案例二:带回调方法查看一个节点的所有子节点

  @Test
  public void testChildren2() throws Exception {
    client.getChildren()
            .inBackground(new BackgroundCallback() {
              @Override
              public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                log.info(curatorEvent.getPath()); // /
                log.info(curatorEvent.getType().toString());  // CHILDREN
                log.info(curatorEvent.getChildren().toString());  // [node, node2, node3]
              }
            })
            .forPath("/");
  }

8. 检查节点是否存在

8.1 案例一:检查一个节点是否存在

  @Test
  public void testExists() throws Exception {
    Stat stat = client.checkExists()
            .forPath("/node");
    if (stat != null)
      log.info(stat.toString());
    else
      log.info("节点不存在");
  }

8.2 案例二:带回调方法检查一个节点是否存在

  @Test
  public void testExists1() throws Exception {
    client.checkExists()
            .inBackground(new BackgroundCallback() {
              @Override
              public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                log.info(curatorEvent.getType().toString());  // EXISTS
                Stat stat = curatorEvent.getStat();
                if (stat != null)
                  log.info(stat.toString());  // 21474836548,21474836548,1620042534164,1620042534164,0,0,0,0,0,0,21474836548
                else
                  log.info("节点不存在");
              }
            })
            .forPath("/node");
  }

9. Watcher

  • curator提供了两种Watcher(Cache)来监听结点的变化
  • NodeCache : 只是监听某一个特定的节点,监听节点的新增、修改数据、删除。(子节点的新增、删除、修改均不会管)
  • PathChildrenCache : 监控一个ZNode的子节点. 当一个子节点增加、修改数据、删除时, PathCache会改变它的状态, 会包含最新的子节点, 子节点的数据和状态
  • 这个监视器可以多次使用

9.1 案例一:NodeCache

  @Test
  public void testWatch() throws Exception {
    //  观察节点的变化
    NodeCache nodeCache = new NodeCache(client, "/node22");
    nodeCache.start();
    nodeCache.getListenable()
            .addListener(new NodeCacheListener() {
              @Override
              public void nodeChanged() throws Exception {
                ChildData currentData = nodeCache.getCurrentData();
                if (currentData != null) {
                  log.info(currentData.getPath());
                  log.info(new String(currentData.getData()));
                } else {
                  log.info("删除了某个节点");
                }
              }
            });
    Thread.sleep(60000); //睡30s
    nodeCache.close();
  }

9.2 案例二:PathChildrenCache

  @Test
  public void testWatch2() throws Exception {
    //  观察节点的变化
    PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/node22", true);
    pathChildrenCache.start();
    pathChildrenCache.getListenable()
            .addListener(new PathChildrenCacheListener() {
              @Override
              public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                log.info(pathChildrenCacheEvent.getType());  // CHILD_ADDED, CHILD_REMOVED, CHILD_UPDATED
                log.info(pathChildrenCacheEvent.getData().toString());  // ChildData{path='/node22/child', stat=21474836630,21474836630,1620044984259,1620044984259,0,0,0,0,2,0,21474836630, data=[50, 50]}
                log.info(new String(pathChildrenCacheEvent.getData().getData()));
                log.info(pathChildrenCacheEvent.getData().getPath());  // ChildData{path='/node22/child'
                log.info(pathChildrenCacheEvent.getData().getStat().toString());  // 21474836630,21474836630,1620044984259,1620044984259,0,0,0,0,2,0,21474836630
              }
            });
    Thread.sleep(60000); //睡30s
    pathChildrenCache.close();
  }

10. 事务

10.1 案例一:使用事务创建两个节点

  @Test
  public void testTransaction() throws Exception {
    client.inTransaction()
            .create().forPath("/node100", "100".getBytes())
            .and()  // 桥
            .create().forPath("/node101", "101".getBytes())
            .and()  // 桥
            .commit();  // 提交
    log.info("提交成功");
  }

11. 分布式锁

11.1 使用分布式可重入排它锁

  • 排它锁,就是所有人都争抢同一个锁节点/lock,请求的时候,会在/lock内部添加一个顺序节点,当轮到自己的时候,就可以继续执行;否则就阻塞。释放锁的时候,会删除自己增加的顺序节点。(基本实现原理与分布式锁基本一致)
      @Test
      public void testMutex() throws Exception {
        //  排他锁
        InterProcessLock lock = new InterProcessMutex(client, "/lock");
        log.info("等待获取锁对象");
        lock.acquire();
        for (int i = 0; i < 3; i++) {
          Thread.sleep(3000);
          System.out.println(i);
        }
        lock.release();
        log.info("释放锁");
      }
    

11.2 使用读写锁

  • 读锁和写锁是两种类型的锁,但是如果两者争抢同一个锁节点的时候,也会发生一些有趣的事情。
  • 当读锁进入之后,其他的读锁也可以进入;但是写锁只能在外面等;
  • 当写锁进入之后,读写锁都不能进入。
      /**
       * 读锁在运行的时候,写锁不允许工作,在阻塞。
       * 读锁运行的时候,允许另一个读锁也进入读数据
       * 写锁运行时,其他读写锁都不能进入
       * @throws Exception
       */
      @Test
      public void testReadLock() throws Exception {
        InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(client, "/lock");
        InterProcessLock readLock = interProcessReadWriteLock.readLock();
        log.info("等待获取读锁对象");
        readLock.acquire();
        for (int i = 0; i < 10; i++) {
          Thread.sleep(3000);
          System.out.println(i);
        }
        readLock.release();
        log.info("释放锁");
      }
    
      @Test
      public void testWriteLock() throws Exception {
        InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(client, "/lock");
        InterProcessLock writeLock = interProcessReadWriteLock.writeLock();
        log.info("等待获取写锁对象");
        writeLock.acquire();
        for (int i = 0; i < 10; i++) {
          Thread.sleep(3000);
          System.out.println(i);
        }
        writeLock.release();
        log.info("释放锁");
      }
    
上一篇:2021年大数据ZooKeeper(五):ZooKeeper Java API操作


下一篇:Zookeeper学习