1、分布式安装部署
1.集群规划
在hadoop101、hadoop102和hadoop103三个节点上部署Zookeeper。
2.配置服务器编号
(1)在/export/software/zookeeper/这个目录下创建zkData
[root@hadoop101 zookeeper]# mkdir -p zkData |
(2)在/export/software/zookeeper/zkData目录下创建一个myid的文件
[root@hadoop101 zookeeper]# cd zkData [root@hadoop101 zkData]# touch myid |
添加myid文件,注意一定要在linux里面创建,在notepad++里面很可能乱码
(3)编辑myid文件
[root@hadoop101 zkData]# vi myid |
在文件中添加与server对应的编号:
1 |
注意要和zoo.cfg中对应的编码配对起来。
(4)编辑日志存放的目录
[root@hadoop101 zookeeper]# cd bin [root@hadoop101 zookeeper]# vim zkEnv.sh |
注意里面的ZOO_LOG_DIR="."表示在哪里启动zookeeper,就把日志写在哪里,可以找个地方存放日志,修改为:
if [ "x${ZOO_LOG_DIR}" = "x" ] then ZOO_LOG_DIR="/export/servers/zookeeper/logs" fi |
(5)拷贝配置好的zookeeper到其他机器上
[root@hadoop101 software]# scp -r zookeeper/ hadoop102:$PWD [root@hadoop101 software]# scp -r zookeeper/ hadoop103:$PWD |
并分别在hadoop102、hadoop103上修改myid文件中内容为2、3
[root@hadoop102 zkData]# echo 2 > myid |
[root@hadoop103 zkData]# echo 3 > myid |
3.集群操作
分别启动Zookeeper
[root@hadoop101 zookeeper]# bin/zkServer.sh start |
注意:此时查看状态时有可能会出现Error
此时jps是可以看到QuorumPeerMain的。
注意前面提到zookeeper要超过一半以上节点存活时才能正常启动。只有一台不行,至少2台。因此启动第二台服务器。
[root@hadoop102 zookeeper]# bin/zkServer.sh start |
此时,在第一台上看status,就正常了。
启动第三台服务器。
[root@hadoop103 zookeeper]# bin/zkServer.sh start |
在三台机器上都查看状态
[root@hadoop102 zookeeper]# bin/zkServer.sh status |
[root@hadoop103 zookeeper]# bin/zkServer.sh status |
注意这里选举了第2台服务器作为leader。
2、客户端命令行操作
命令基本语法 |
功能描述 |
help |
显示所有操作命令 |
ls path [watch] |
使用 ls 命令来查看当前znode中所包含的内容 |
ls2 path [watch] |
查看当前节点数据并能看到更新次数等数据 |
create |
普通创建 -s 含有序列 -e 临时(重启或者超时消失) |
get path [watch] |
获得节点的值 |
set |
设置节点的具体值 |
stat |
查看节点状态 |
delete |
删除节点 |
rmr |
递归删除节点 |
总结的就是4个思想:增删改查。
1.启动客户端
[root@hadoop101 zookeeper]# bin/zkCli.sh |
如果出现如下信息:
......,closing socket connection and attempting recinnect
则要注意需要超过一半以上节点存活时才能正常启动。
2.显示所有操作命令
[zk: localhost:2181(CONNECTED) 1] help |
3.查看当前znode中所包含的内容
[zk: localhost:2181(CONNECTED) 0] ls / |
如果能看到这个效果,就表示正常了。
目前根目录只有一个子节点zookeeper。
也可以查看该子节点下面的内容。
另外后面可以加上watch。
Zookeeper有两部分。一部分是文件系统,一部分是通知机制。通知的前提是先注册,一旦数据发生变化,只通知注册的人。
[zk: localhost:2181(CONNECTED) 2] ls / watch |
首先展示根目录的子节点,然后看着根目录的子节点。与之前不一样的是,此时已经注册进根目录的子节点的观察者列表。
现在,假设根目录发生了变化。Zookeeper就会通知我。
启动另一个终端,也启动Client。
该终端在根目录创建子节点ttt。
[zk: localhost:2181(CONNECTED) 0] create /ttt 123 |
留意第一个终端出现了WatchedEvent,观察到了事件。
4.查看当前节点详细数据
[zk: localhost:2181(CONNECTED) 1] ls2 / [zookeeper] cZxid = 0x0 ctime = Thu Jan 01 08:00:00 CST 1970 mZxid = 0x0 mtime = Thu Jan 01 08:00:00 CST 1970 pZxid = 0x0 cversion = -1 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 0 numChildren = 1 |
5.分别创建2个普通节点
[zk: localhost:2181(CONNECTED) 1] create /tt1234 abcdef |
注意create后面需要跟两个参数。第一个是节点的名字,第二个是节点存储的内容。
另外可以跟-s和-e的参数。
-s表示带序号、序列的节点。
如果把上面那句话再写一次。
[zk: localhost:2181(CONNECTED) 2] create /tt1234 abcdef |
会提示Node already exists。
此时,加上-s。
[zk: localhost:2181(CONNECTED) 2] create -s /tt1234 abcdef |
此时创建成功了,而且后面加上了一个序号。这就是带序号的节点。这个序号是全局递增的。比如多写几个create -s。
注意如果改变了路径
[zk: localhost:2181(CONNECTED) 8] create -s /pppp abcdef |
说明这个序号是全局递增的。
系统会把节点名字和内部序号做了拼接。
-e表示临时节点。
-e是临时的,不带-e则是永久的。
[zk: localhost:2181(CONNECTED) 10] create -e /xxx abcdef |
在第二个终端会话中创建了临时节点,在第一个终端会话中进行查询。
ls / |
目前是能看到该节点的。
此时,在第二个终端中退出
[zk: localhost:2181(CONNECTED) 11] quit |
在第一个终端中再次查询。
ls / |
可以看到,只有临时节点消失了。
所以,临时节点就是:创建临时节点的会话中断后,该节点就消失了。
[zk: localhost:2181(CONNECTED) 0] create -s -e /pppp asdf |
这个就表示创建一个有序的临时节点。
6.获得节点的值
[zk: localhost:2181(CONNECTED) 5] get /ttt |
其中第一行就是刚才设置的数据。
get watch可以看节点的变化。
[zk: localhost:2181(CONNECTED) 5] get /ttt watch |
此时,在第二个终端中设置节点的数值
[zk: localhost:2181(CONNECTED) 0] set /ttt 2345 |
注意此时第一个终端中出现了WatchedEvent。
所以,ls watch和get watch这两个看到的事件类型是不一样的。
ls watch看到的类型是NodeChildrenChanged。
get watch看到的类型是NodeDataChanged。
另外,在第二个终端中改一下数值
[zk: localhost:2181(CONNECTED) 1] set /ttt 23456 |
但留意第一个终端中是没有反应的。注意这个注册列表只会监控一次。
每个节点都会有一个监听者的列表。 通知一个,就划去一个,通知完了列表就空了。下次再想看,只能再注册。
7.修改节点数据值
[zk: localhost:2181(CONNECTED) 2] set /ttt 23456 |
如上。
8.查看节点状态
[zk: localhost:2181(CONNECTED) 3] stat /ttt |
这些就是节点的状态。见后面关于stat结构体的介绍。
9.删除节点
[zk: localhost:2181(CONNECTED) 8] delete /ttt |
注意delete删除的节点不能有子节点,如果有子节点,只能使用rmr。
10.递归删除节点
[zk: localhost:2181(CONNECTED) 9] rmr /ttt |
11.节点的值变化监听
(1)在hadoop103主机上注册监听/sanguo节点数据变化
[zk: localhost:2181(CONNECTED) 26] [zk: localhost:2181(CONNECTED) 8] get /sanguo watch |
(2)在hadoop102主机上修改/sanguo节点的数据
[zk: localhost:2181(CONNECTED) 1] set /sanguo "xisi" |
(3)观察hadoop103主机收到数据变化的监听
WATCHER:: WatchedEvent state:SyncConnected type:NodeDataChanged path:/sanguo |
12.节点的子节点变化监听(路径变化)
(1)在hadoop103主机上注册监听/sanguo节点的子节点变化
[zk: localhost:2181(CONNECTED) 1] ls /sanguo watch [aa0000000001, server101] |
(2)在hadoop102主机/sanguo节点上创建子节点
[zk: localhost:2181(CONNECTED) 2] create /sanguo/jin "simayi" Created /sanguo/jin |
(3)观察hadoop103主机收到子节点变化的监听
WATCHER:: WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/sanguo |
3、API应用
①环境搭建
(1).创建一个Maven工程zookeeper1
(2).添加pom文件
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.zhang</groupId> <artifactId>zookeeper1</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>RELEASE</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.8.2</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper --> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.10</version> </dependency> </dependencies> </project> |
(3).拷贝log4j.properties文件到项目根目录
需要在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”,在文件中填入。
log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n log4j.appender.logfile=org.apache.log4j.FileAppender log4j.appender.logfile.File=target/spring.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n |
②创建ZooKeeper客户端
在main下创建包com.zhang.zookeeper,创建类ZkClient。
public class ZkClient { } |
创建zookeeper对象,并注册回调函数。
private static final String CONNECT_STRING = "hadoop101:2181,hadoop102:2181,hadoop103:2181"; private static final int SESSION_TIMEOUT = 2000; private ZooKeeper zkClient = null; @Before public void init() throws Exception { zkClient = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, new Watcher() { @Override public void process(WatchedEvent event) { // 收到事件通知后的回调函数(用户的业务逻辑) System.out.println(event.getType() + "--" + event.getPath()); // 再次启动监听 try { zkClient.getChildren("/", true); } catch (Exception e) { e.printStackTrace(); } } }); } |
注意:
ZooKeeper构造方法的三个参数:
参数1:connectString。连接字符串。这里是hadoop101:2181,hadoop102:2181,hadoop103:2181。如果Windows中没有写hosts,则该字符串中需要改为IP地址。
参数2:sessionTimeout。SESSION_TIMEOUT表示主机客户端短了以后多久,主机认为客户端已经失联了。一旦失联了,名下所有临时节点都会被主机删除。
参数3:watcher。Zookeeper是两部分,文件系统加通知机制。通知机制生效的前提是需要在它的观察者名单上。如果这个客户端注册以后,默认的通知的回调函数就是这个watcher中的process()。
当后面的任何方法执行时,都会先执行@Before修饰的方法init()。创建ZooKeeper对象。然后执行zkClient相关方法时(比如zkClient.create()、zkClient.getChildren()等),都会触发该回调函数process()的调用。
③ 获取子节点并监听节点变化
// 获取子节点 @Test public void ls() throws Exception { List<String> children = zkClient.getChildren("/", true); System.out.println("======================"); for (String child : children) { System.out.println(child); } System.out.println("======================"); // 延时阻塞 Thread.sleep(Long.MAX_VALUE); } |
运行效果大致如下
注意客户端和服务端的通信属于异步通信,肯定涉及到多线程。单线程肯定不能实现异步通信。回调函数是运行在子线程的,一旦主线程结束了,子线程就死亡了(守护线程?)。
运行该程序,发现主线程阻塞了。此时在zookeeper中执行
[root@hadoop101 zookeeper]# bin/zkCli.sh |
[zk: localhost:2181(CONNECTED) 0] rmr /xxx |
可以留意回调函数被调用了。
④ 创建子节点
// 创建子节点 @Test public void create() throws Exception { // 参数1:要创建的节点的路径; 参数2:节点数据 ; 参数3:节点权限 ;参数4:节点的类型 String nodeCreated = zkClient.create("/zhang", "yiheng".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println(nodeCreated); } |
参数3:定义哪些主机、哪些IP可以访问现在创建的这个节点。OPEN_ACL_UNSAFE特点是谁都可以访问,没有权限限制。
参数4:节点类型。实际上就是create命令中的-s和-e的参数。
- PERSISTENT。不带-s和-e参数。永久节点。
- EPHEMERAL_SEQUENTIAL。带-s和-e参数。带序号序列的临时节点。
- EPHEMERAL。带-e参数。临时节点。
- PERSISTENT_SEQUENTIAL。带-s参数。带序号序列的永久节点。
该函数返回创建的路径。
运行程序,结果如下:
现在在Linux中查看。
注意,如果是创建EPHEMERAL相关的节点,则需要加上延时阻塞的功能,不能在Linux中是看不到该节点的。
⑤判断Znode是否存在
// 判断znode是否存在 @Test public void exist() throws Exception { Stat stat = zkClient.exists("/eclipse", false); System.out.println(stat == null ? "not exist" : "exist"); } |
假如存在,则返回stat类型。
PS. 程序中需要导的包如下:
import org.apache.zookeeper.WatchedEvent; |
注意:运行上面的程序,需要确保在虚拟机上开启zookeeper
[root@hadoop101 zookeeper]# bin/zkServer.sh start |
[root@hadoop102 zookeeper]# bin/zkServer.sh start |
[root@hadoop103 zookeeper]# bin/zkServer.sh start |
可尝试自己实现其他方法:
get()
@Test |
注意:Stat是节点的状态信息。
set()
@Test |
运行结果如下:
注意version参数不能乱设,否则会出现BadVersion的错误。可以先通过
stat /tt1234 |
命令查看版本号。假如要改的数据跟这个版本不一样,修改失败。这是因为这个程序是跟别的程序并发写的,为了产生误删的杯具,需要确定删的跟看到的是同一个东西。实际中一般可以考虑先获取stat对象,再getVersion()得到具体的版本,而不是写一个固定值。
delete()
@Test |
以上zookeeper的API基本上就够了。
观察一个节点,只能观察一次。以后有变化了,也看不到。现在希望循环注册节点,一旦有变化了就通知(而不是只通知一次)。
先注册一个节点
[zk: localhost:2181(CONNECTED) 0] create /a 123 |
编写代码
public void register() throws Exception { |
在通知的时候,再注册一遍。以此来实现循环调用。
运行程序,获取该节点123
现在,修改该节点的值
[zk: localhost:2181(CONNECTED) 1] set /a 1234 |
留意Linux终端运行命令的通知,Java终端也打印出更新的值。
可以自行运行下面的案例。
4、箭头服务器节点动态上下线案例(扩展)
1.需求
某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。
2.需求分析,如图所示
3.具体实现
(1)先在集群上创建/servers节点
[zk: localhost:2181(CONNECTED) 10] create /servers "servers" Created /servers |
(2)服务器端向Zookeeper注册代码
package com.zhang.zkcase; import java.io.IOException; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; public class DistributeServer { private static String connectString = "hadoop101:2181,hadoop102:2181,hadoop103:2181"; private static int sessionTimeout = 2000; private ZooKeeper zk = null; private String parentNode = "/servers"; // 创建到zk的客户端连接 public void getConnect() throws IOException{ zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent event) { } }); } // 注册服务器 public void registServer(String hostname) throws Exception{ String create = zk.create(parentNode + "/server", hostname.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(hostname +" is online "+ create); } // 业务功能 public void business(String hostname) throws Exception{ System.out.println(hostname+" is working ..."); Thread.sleep(Long.MAX_VALUE); } public static void main(String[] args) throws Exception { // 1获取zk连接 DistributeServer server = new DistributeServer(); server.getConnect(); // 2 利用zk连接注册服务器信息 server.registServer(args[0]); // 3 启动业务功能 server.business(args[0]); } } |
(3)客户端代码
package com.zhang.zkcase; import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; public class DistributeClient { private static String connectString = "hadoop101:2181,hadoop102:2181,hadoop103:2181"; private static int sessionTimeout = 2000; private ZooKeeper zk = null; private String parentNode = "/servers"; // 创建到zk的客户端连接 public void getConnect() throws IOException { zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent event) { // 再次启动监听 try { getServerList(); } catch (Exception e) { e.printStackTrace(); } } }); } // 获取服务器列表信息 public void getServerList() throws Exception { // 1获取服务器子节点信息,并且对父节点进行监听 List<String> children = zk.getChildren(parentNode, true); // 2存储服务器信息列表 ArrayList<String> servers = new ArrayList<String>(); // 3遍历所有节点,获取节点中的主机名称信息 for (String child : children) { byte[] data = zk.getData(parentNode + "/" + child, false, null); servers.add(new String(data)); } // 4打印服务器列表信息 System.out.println(servers); } // 业务功能 public void business() throws Exception{ System.out.println("client is working ..."); Thread.sleep(Long.MAX_VALUE); } public static void main(String[] args) throws Exception { // 1获取zk连接 DistributeClient client = new DistributeClient(); client.getConnect(); // 2获取servers的子节点信息,从中获取服务器信息列表 client.getServerList(); // 3业务进程启动 client.business(); } } |