zookeeper 使用api 进行节点增删改查及实现简易的配置中心

本文为博主原创,未经允许不得转载:

目录:

  1. 对 zookeeper 节点进行增删改查既配置acl 权限等

  2.使用 zookeeper  实现一个简易的配置中心

  

  1. 对 zookeeper 节点进行增删改查既配置acl 权限等

  apache 提供了对 zookeeper 操作的 api 操作。即引入对应的 jar 包即可进行操作zookeeper。‘

  引入 apache zookeeper 的pom 配置。这里版本请保持与服务端版本一致,不然会有很多兼容性的问题

    <dependency>
             <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.5.8</version>
        </dependency>

  将对 zookeeper 进行操作的 api 使用封装了一个测试类,包含:新增节点,修改节点,删除节点,查询节点,配置节点acl 权限等,可以在其中进行测试:

package com.example.demo.zookeeper;

import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

@Slf4j
public class ZookeeperClientTest {
    // ZooKeeper服务器列表,由英文逗号分开的host:port字符串组成, 每一个都代表一台ZooKeeper机器,
    // 也可以在connectString中设 置客户端连接上ZooKeeper 后的根目录,方法是在host:port字符串之后添加上这个根目录,实现隔离
    private static final String ZK_ADDRESS="112.125.26.68:2181";
    // 会话的超时时间,单位毫秒,在一个会话周期内,ZooKeeper客户端和服务器之间会通过心跳 检测机制来维持会话的有效性,‘
    // 一旦在sessionTimeout时间内没有进行有效 的心跳检测,会话就会失效。
    private static final int SESSION_TIMEOUT = 5000;
    private static ZooKeeper zooKeeper;
    private static final String ZK_NODE="/zk‐node";

    @Before
    public void init() throws IOException, InterruptedException {
        final CountDownLatch countDownLatch=new CountDownLatch(1);
        // watchedEvent 为 Watcher事件通知处理器,该参 数可以设置为null 以表明不需要设置默认的 Watcher处理器
        zooKeeper=new ZooKeeper(ZK_ADDRESS, SESSION_TIMEOUT,watchedEvent -> {
                if (watchedEvent.getState()== Watcher.Event.KeeperState.SyncConnected &&
                        watchedEvent.getType()== Watcher.Event.EventType.None){
                    countDownLatch.countDown();
                    log.info("连接成功!");
                }
        });
        log.info("连接中....");
        countDownLatch.await();
    }

    /**
     * 新增节点数据
     *
     * @throws KeeperException
     * @throws InterruptedException
     */
    @Test
    public void createTest() throws KeeperException, InterruptedException {
        String path = zooKeeper.create(ZK_NODE, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        log.info("created path: {}",path);
    }

    /**
     * 修改节点并删除指定版本节点
     * @throws KeeperException
     * @throws InterruptedException
     */
    @Test
    public void setTest() throws KeeperException, InterruptedException {
         Stat stat = new Stat();
         byte[] data = zooKeeper.getData(ZK_NODE, false, stat);
         log.info("修改前: {}",new String(data));
         zooKeeper.setData(ZK_NODE, "changed!".getBytes(), stat.getVersion());
         // getData 可以查看节点的配置数据
         byte[] dataAfter = zooKeeper.getData(ZK_NODE, false, stat);
         log.info("修改后: {}",new String(dataAfter));
         // 删除指定版本的zookeeper节点
        zooKeeper.delete(ZK_NODE,stat.getVersion());
    }


    /**
     *  用 world 模式创建节点
     *
     * @throws KeeperException
     * @throws InterruptedException
     */
    @Test
    public void createWithAclTest1() throws KeeperException, InterruptedException {

        List<ACL> acLList = new ArrayList<ACL>();
        ACL e = new ACL();
        Id m_ = new Id();
        m_.setId("anyone");
        m_.setScheme("world");

        int perms = ZooDefs.Perms.ADMIN  |  ZooDefs.Perms.READ;
        e.setId(m_);
        e.setPerms(perms);
        acLList.add(e);
        String s = getZooKeeper().create("/zk-node-1", "gj".getBytes(), acLList, CreateMode.PERSISTENT);
        log.info("create path: {}",s);
    }

    public static ZooKeeper getZooKeeper() {
        return zooKeeper;
    }

    /**
     *
     * 用授权模式创建节点
     * @throws KeeperException
     * @throws InterruptedException
     */
    @Test
    public void createWithAclTest2() throws KeeperException, InterruptedException {

        // 对连接添加授权信息
        getZooKeeper().addAuthInfo("digest","u400:p400".getBytes());

        List<ACL> acLList = new ArrayList<ACL>();
        ACL e = new ACL();
        Id m_ = new Id();
        m_.setId("u400:p400");
        m_.setScheme("auth");

        int perms = ZooDefs.Perms.ADMIN  |  ZooDefs.Perms.READ;
        e.setId(m_);
        e.setPerms(perms);
        acLList.add(e);

        String s = getZooKeeper().create("/zk-node-2", "gj".getBytes(), acLList, CreateMode.PERSISTENT);
        log.info("create path: {}",s);
    }


    @Test
    public void createWithAclTest3() throws KeeperException, InterruptedException {
        // 对连接添加授权信息
        getZooKeeper().addAuthInfo("digest","u400:p400".getBytes());
        byte[] data = getZooKeeper().getData("/test", false, null);
        log.info("GET_DATA : {}",new String(data));
    }

}

 

   2.使用 zookeeper  实现一个简易的配置中心

    通过 watcher  监听节点配置的改动,并加载 zookeeper 指定节点的配置

  

package com.example.demo.zookeeper;

import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.*;
import org.apache.zookeeper.server.util.ConfigUtils;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.CountDownLatch;

/**
 * zookeeper 做配置中心
 * 
 */
@Slf4j
public class ZookeeperConfigCenter {

    private final  static  String connectString = "112.125.26.68:2181,112.125.26.68:2182,112.125.26.68:2183,112.125.26.68:2184";

    private static int SESSION_TIMEOUT=5* 1000;

    private static CountDownLatch countDownLatch=new CountDownLatch(1);

    private  static ZooKeeper zookeeper =null;

    private static  Watcher watcher = new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            if (event.getType() == Event.EventType.None
                    && event.getState() == Event.KeeperState.SyncConnected){
                countDownLatch.countDown();
                log.info(" 连接建立");
                // start to watch config
                try {
                    log.info(" 开始监听:{}",ZooDefs.CONFIG_NODE);
                    zookeeper.getConfig(true,null);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }else if( event.getPath()!=null  &&  event.getPath().equals(ZooDefs.CONFIG_NODE)){
                try {
                    byte[] config = zookeeper.getConfig(this, null);
                    String clientConfigStr = ConfigUtils.getClientConfigStr(new String(config));
                    log.info(" 配置发生变更: {}",clientConfigStr);
                    zookeeper.updateServerList(clientConfigStr.split(" ")[1]);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (IOException e) {
                    e.printStackTrace();
                }

            }
        }
    };


    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        zookeeper =   new ZooKeeper(connectString, SESSION_TIMEOUT, watcher);
        countDownLatch.await();
        Scanner scanner =new Scanner(System.in);
        while (true){
            byte[] data = zookeeper.getData("/zookeeper/config", true, null);
            scanner.next();
            log.info("DATA: {}",new String(data));
        }

    }
}

 

  

上一篇:java高版本下各种JNDI Bypass方法复现


下一篇:主线程等待所有的子线程结束之后再执行?(转)