zookeeper安装步骤
百度搜索:zookeeper
进入后点击下载:
进入到下载的页面
英文:
中文:
进入版本列表:
进入后复制该链接, 在linux执行wget下载:
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz
1、安装jdk:略
2、解压:
tar -zxvf zookeeper-3.4..tar.gz
重命名:
mv zookeeper-3.4. zookeeper
移动文件:
mv zookeeper /usr/local/
配置环境变量:
export ZOOKEEPER_HOME=/usr/local/zookeeper
#当前系统通过yum安装的Open-jdk,经过测试可以不配置环境变量
#export JAVA_HOME=/usr/java/jdk1.8.0_181 export PATH=$PATH:$ZOOKEEPER_HOME/bin
#export PATH=$PATH:$ZOOKEEPER_HOME/bin:$JAVA_HOME\bin
刷新配置:
source /etc/profile
2.配置文件zoo.cfg ,安装集群的话需要配置
初次使用Zookeeper,需要将%ZK_HOME%/conf目录下的zoo_sample.cfg文件重命名为zoo.cfg,并且按照如下代码进行简单配置即可:
tickTime=
dataDir=/var/lib/zookeeper/
clientPort=
initLimit=
syncLimit=
server.=IP1:2888:3888
server.=IP2::
server.=IP3::
linux客户端操作节点数据:
执行 ckCli.sh命令进入客户端:
根据dataVersion版本号识别数据的新增和修改状态
curator客户端是阿帕奇的开源项目:
Curator框架使用链式编程风格,易读性更强,使用工厂方法创建连接对象。
1.使用CuratorFrameworkFactory的两个静态工厂方法(参数不同)来实现
参数1:connectString,连接信息
参数2:RetryPolicy,重试连接策略,有四种实现
ExponentialBackoffRetry、RetryNTimes、RetryOneTimes、RetryUntilElapsed
参数3:sessionTimeoutMs会话超时时间,默认为60s
参数4:connectionTimeoutMs连接超时时间,默认为15s
注意:对于retryPolicy策略通过一个接口来让用户自定义实现
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.data.Stat; public class CuratorBase { /** zookeeper地址 */
static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181";
/** session超时时间 */
static final int SESSION_OUTTIME = ;//ms public static void main(String[] args) throws Exception { //1 重试策略:初试时间为1s 重试10次
RetryPolicy retryPolicy = new ExponentialBackoffRetry(, );
//2 通过工厂创建连接
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(CONNECT_ADDR)
.sessionTimeoutMs(SESSION_OUTTIME)
.retryPolicy(retryPolicy)
// .namespace("super")
.build();
//3 开启连接
cf.start(); // System.out.println(States.CONNECTED);
// System.out.println(cf.getState()); // 新加、删除
/**
//4 建立节点 指定节点类型(不加withMode默认为持久类型节点)、路径、数据内容
cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c1","c1内容".getBytes());
//5 删除节点
cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super");
*/ // 读取、修改
/**
//创建节点
// cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c1","c1内容".getBytes());
// cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c2","c2内容".getBytes());
//读取节点
// String ret1 = new String(cf.getData().forPath("/super/c2"));
// System.out.println(ret1);
//修改节点
// cf.setData().forPath("/super/c2", "修改c2内容".getBytes());
// String ret2 = new String(cf.getData().forPath("/super/c2"));
// System.out.println(ret2);
*/ // 绑定回调函数
/**
ExecutorService pool = Executors.newCachedThreadPool();
cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
.inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework cf, CuratorEvent ce) throws Exception {
System.out.println("code:" + ce.getResultCode());
System.out.println("type:" + ce.getType());
System.out.println("线程为:" + Thread.currentThread().getName());
}
}, pool)
.forPath("/super/c3","c3内容".getBytes());
Thread.sleep(Integer.MAX_VALUE);
*/ // 读取子节点getChildren方法 和 判断节点是否存在checkExists方法
/**
List<String> list = cf.getChildren().forPath("/super");
for(String p : list){
System.out.println(p);
} Stat stat = cf.checkExists().forPath("/super/c3");
System.out.println(stat); Thread.sleep(2000);
cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super");
*/
//cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super");
}
}
重试策略: