Zookeeper实战(开发重点)

1、分布式安装部署

1.集群规划

在hadoop101、hadoop102和hadoop103三个节点上部署Zookeeper。

2.配置服务器编号

(1)在/export/software/zookeeper/这个目录下创建zkData

[root@hadoop101 zookeeper]# mkdir -p zkData

(2)在/export/software/zookeeper/zkData目录下创建一个myid的文件

[root@hadoop101 zookeeper]# cd zkData

[root@hadoop101 zkData]# touch myid

添加myid文件,注意一定要在linux里面创建,在notepad++里面很可能乱码

(3)编辑myid文件

[root@hadoop101 zkData]# vi myid

在文件中添加与server对应的编号:

1

注意要和zoo.cfg中对应的编码配对起来。

(4)编辑日志存放的目录

[root@hadoop101 zookeeper]# cd bin

[root@hadoop101 zookeeper]# vim zkEnv.sh

注意里面的ZOO_LOG_DIR="."表示在哪里启动zookeeper,就把日志写在哪里,可以找个地方存放日志,修改为:

if [ "x${ZOO_LOG_DIR}" = "x" ]

then

    ZOO_LOG_DIR="/export/servers/zookeeper/logs"

fi

(5)拷贝配置好的zookeeper到其他机器上

[root@hadoop101 software]# scp -r zookeeper/ hadoop102:$PWD

[root@hadoop101 software]# scp -r zookeeper/ hadoop103:$PWD

并分别在hadoop102、hadoop103上修改myid文件中内容为2、3

[root@hadoop102 zkData]# echo 2 > myid

[root@hadoop103 zkData]# echo 3 > myid

3.集群操作

分别启动Zookeeper

[root@hadoop101 zookeeper]# bin/zkServer.sh start

注意:此时查看状态时有可能会出现Error

此时jps是可以看到QuorumPeerMain的。

注意前面提到zookeeper要超过一半以上节点存活时才能正常启动。只有一台不行,至少2台。因此启动第二台服务器。

[root@hadoop102 zookeeper]# bin/zkServer.sh start

此时,在第一台上看status,就正常了。

启动第三台服务器。

[root@hadoop103 zookeeper]# bin/zkServer.sh start

在三台机器上都查看状态

[root@hadoop102 zookeeper]# bin/zkServer.sh status

[root@hadoop103 zookeeper]# bin/zkServer.sh status

注意这里选举了第2台服务器作为leader。

​​​​​​​2、客户端命令行操作

命令基本语法

功能描述

help

显示所有操作命令

ls path [watch]

使用 ls 命令来查看当前znode中所包含的内容

ls2 path [watch]

查看当前节点数据并能看到更新次数等数据

create

普通创建

-s  含有序列

-e  临时(重启或者超时消失)

get path [watch]

获得节点的值

set

设置节点的具体值

stat

查看节点状态

delete

删除节点

rmr

递归删除节点

总结的就是4个思想:增删改查。

1.启动客户端

[root@hadoop101 zookeeper]# bin/zkCli.sh

如果出现如下信息:

......,closing socket connection and attempting recinnect

则要注意需要超过一半以上节点存活时才能正常启动。

2.显示所有操作命令

[zk: localhost:2181(CONNECTED) 1] help

3.查看当前znode中所包含的内容

[zk: localhost:2181(CONNECTED) 0] ls /

如果能看到这个效果,就表示正常了。

目前根目录只有一个子节点zookeeper。

也可以查看该子节点下面的内容。

另外后面可以加上watch。

        Zookeeper有两部分。一部分是文件系统,一部分是通知机制。通知的前提是先注册,一旦数据发生变化,只通知注册的人。

[zk: localhost:2181(CONNECTED) 2] ls / watch

首先展示根目录的子节点,然后看着根目录的子节点。与之前不一样的是,此时已经注册进根目录的子节点的观察者列表。

现在,假设根目录发生了变化。Zookeeper就会通知我。

启动另一个终端,也启动Client。

该终端在根目录创建子节点ttt。

[zk: localhost:2181(CONNECTED) 0] create /ttt 123

留意第一个终端出现了WatchedEvent,观察到了事件。

4.查看当前节点详细数据

[zk: localhost:2181(CONNECTED) 1] ls2 /

[zookeeper]

cZxid = 0x0

ctime = Thu Jan 01 08:00:00 CST 1970

mZxid = 0x0

mtime = Thu Jan 01 08:00:00 CST 1970

pZxid = 0x0

cversion = -1

dataVersion = 0

aclVersion = 0

ephemeralOwner = 0x0

dataLength = 0

numChildren = 1

5.分别创建2个普通节点

[zk: localhost:2181(CONNECTED) 1] create /tt1234 abcdef

        注意create后面需要跟两个参数。第一个是节点的名字,第二个是节点存储的内容。

另外可以跟-s和-e的参数。

-s表示带序号、序列的节点。

如果把上面那句话再写一次。

[zk: localhost:2181(CONNECTED) 2] create /tt1234 abcdef

会提示Node already exists。

此时,加上-s。

[zk: localhost:2181(CONNECTED) 2] create -s /tt1234 abcdef

        此时创建成功了,而且后面加上了一个序号。这就是带序号的节点。这个序号是全局递增的。比如多写几个create -s。

         注意如果改变了路径

[zk: localhost:2181(CONNECTED) 8] create -s /pppp abcdef

        说明这个序号是全局递增的。

        系统会把节点名字和内部序号做了拼接。

-e表示临时节点。

-e是临时的,不带-e则是永久的。

[zk: localhost:2181(CONNECTED) 10] create -e /xxx abcdef

        在第二个终端会话中创建了临时节点,在第一个终端会话中进行查询。

ls /

        目前是能看到该节点的。

        此时,在第二个终端中退出

[zk: localhost:2181(CONNECTED) 11] quit

        在第一个终端中再次查询。

ls /

可以看到,只有临时节点消失了。

所以,临时节点就是:创建临时节点的会话中断后,该节点就消失了。

[zk: localhost:2181(CONNECTED) 0] create -s -e /pppp asdf

这个就表示创建一个有序的临时节点。

6.获得节点的值

[zk: localhost:2181(CONNECTED) 5] get /ttt

其中第一行就是刚才设置的数据。

get watch可以看节点的变化。

[zk: localhost:2181(CONNECTED) 5] get /ttt watch

此时,在第二个终端中设置节点的数值

[zk: localhost:2181(CONNECTED) 0] set /ttt 2345

注意此时第一个终端中出现了WatchedEvent。

所以,ls watch和get watch这两个看到的事件类型是不一样的。

ls watch看到的类型是NodeChildrenChanged。

get watch看到的类型是NodeDataChanged。

另外,在第二个终端中改一下数值

[zk: localhost:2181(CONNECTED) 1] set /ttt 23456

        但留意第一个终端中是没有反应的。注意这个注册列表只会监控一次。

每个节点都会有一个监听者的列表。 通知一个,就划去一个,通知完了列表就空了。下次再想看,只能再注册。

7.修改节点数据值

[zk: localhost:2181(CONNECTED) 2] set /ttt 23456

如上。

8.查看节点状态

[zk: localhost:2181(CONNECTED) 3] stat /ttt

这些就是节点的状态。见后面关于stat结构体的介绍。

9.删除节点

[zk: localhost:2181(CONNECTED) 8] delete /ttt

注意delete删除的节点不能有子节点,如果有子节点,只能使用rmr。

10.递归删除节点

[zk: localhost:2181(CONNECTED) 9] rmr /ttt

11.节点的值变化监听

(1)在hadoop103主机上注册监听/sanguo节点数据变化

[zk: localhost:2181(CONNECTED) 26] [zk: localhost:2181(CONNECTED) 8] get /sanguo watch

(2)在hadoop102主机上修改/sanguo节点的数据

[zk: localhost:2181(CONNECTED) 1] set /sanguo "xisi"

(3)观察hadoop103主机收到数据变化的监听

WATCHER::

WatchedEvent state:SyncConnected type:NodeDataChanged path:/sanguo

12.节点的子节点变化监听(路径变化)

(1)在hadoop103主机上注册监听/sanguo节点的子节点变化

[zk: localhost:2181(CONNECTED) 1] ls /sanguo watch

[aa0000000001, server101]

(2)在hadoop102主机/sanguo节点上创建子节点

[zk: localhost:2181(CONNECTED) 2] create /sanguo/jin "simayi"

Created /sanguo/jin

(3)观察hadoop103主机收到子节点变化的监听

WATCHER::

WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/sanguo

​​​​​​​3、API应用

①环境搭建

(1).创建一个Maven工程zookeeper1

(2).添加pom文件

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"

         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zhang</groupId>

    <artifactId>zookeeper1</artifactId>

    <version>1.0-SNAPSHOT</version>

    <dependencies>

        <dependency>

            <groupId>junit</groupId>

            <artifactId>junit</artifactId>

            <version>RELEASE</version>

        </dependency>

        <dependency>

            <groupId>org.apache.logging.log4j</groupId>

            <artifactId>log4j-core</artifactId>

            <version>2.8.2</version>

        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->

        <dependency>

            <groupId>org.apache.zookeeper</groupId>

            <artifactId>zookeeper</artifactId>

            <version>3.4.10</version>

        </dependency>

    </dependencies>

</project>

(3).拷贝log4j.properties文件到项目根目录

需要在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”,在文件中填入。

log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender

log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

log4j.appender.logfile=org.apache.log4j.FileAppender

log4j.appender.logfile.File=target/spring.log

log4j.appender.logfile.layout=org.apache.log4j.PatternLayout

log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

②创建ZooKeeper客户端

在main下创建包com.zhang.zookeeper,创建类ZkClient。

public class ZkClient {

}

创建zookeeper对象,并注册回调函数。

private static final String CONNECT_STRING =

 "hadoop101:2181,hadoop102:2181,hadoop103:2181";

private static final int SESSION_TIMEOUT = 2000;

private ZooKeeper zkClient = null;

@Before

public void init() throws Exception {

zkClient = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, new Watcher() {

@Override

public void process(WatchedEvent event) {

// 收到事件通知后的回调函数(用户的业务逻辑)

System.out.println(event.getType() + "--" + event.getPath());

// 再次启动监听

try {

zkClient.getChildren("/", true);

} catch (Exception e) {

e.printStackTrace();

}

}

});

}

注意:

ZooKeeper构造方法的三个参数:

参数1:connectString。连接字符串。这里是hadoop101:2181,hadoop102:2181,hadoop103:2181。如果Windows中没有写hosts,则该字符串中需要改为IP地址。

参数2:sessionTimeout。SESSION_TIMEOUT表示主机客户端短了以后多久,主机认为客户端已经失联了。一旦失联了,名下所有临时节点都会被主机删除。

参数3:watcher。Zookeeper是两部分,文件系统加通知机制。通知机制生效的前提是需要在它的观察者名单上。如果这个客户端注册以后,默认的通知的回调函数就是这个watcher中的process()。

当后面的任何方法执行时,都会先执行@Before修饰的方法init()。创建ZooKeeper对象。然后执行zkClient相关方法时(比如zkClient.create()、zkClient.getChildren()等),都会触发该回调函数process()的调用。

③ 获取子节点并监听节点变化

// 获取子节点

@Test

public void ls() throws Exception {

List<String> children = zkClient.getChildren("/", true);

System.out.println("======================");

for (String child : children) {

System.out.println(child);

}

System.out.println("======================");

// 延时阻塞

Thread.sleep(Long.MAX_VALUE);

}

运行效果大致如下  

        注意客户端和服务端的通信属于异步通信,肯定涉及到多线程。单线程肯定不能实现异步通信。回调函数是运行在子线程的,一旦主线程结束了,子线程就死亡了(守护线程?)。

运行该程序,发现主线程阻塞了。此时在zookeeper中执行

[root@hadoop101 zookeeper]# bin/zkCli.sh

[zk: localhost:2181(CONNECTED) 0] rmr /xxx  

可以留意回调函数被调用了。

④ 创建子节点

// 创建子节点

@Test

public void create() throws Exception {

// 参数1:要创建的节点的路径; 参数2:节点数据 ; 参数3:节点权限 ;参数4:节点的类型

String nodeCreated = zkClient.create("/zhang", "yiheng".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

System.out.println(nodeCreated);

}

参数3:定义哪些主机、哪些IP可以访问现在创建的这个节点。OPEN_ACL_UNSAFE特点是谁都可以访问,没有权限限制。

参数4:节点类型。实际上就是create命令中的-s和-e的参数。

  1. PERSISTENT。不带-s和-e参数。永久节点。
  2. EPHEMERAL_SEQUENTIAL。带-s和-e参数。带序号序列的临时节点。
  3. EPHEMERAL。带-e参数。临时节点。
  4. PERSISTENT_SEQUENTIAL。带-s参数。带序号序列的永久节点。

该函数返回创建的路径。

运行程序,结果如下:

现在在Linux中查看。

注意,如果是创建EPHEMERAL相关的节点,则需要加上延时阻塞的功能,不能在Linux中是看不到该节点的。

⑤判断Znode是否存在

// 判断znode是否存在

@Test

public void exist() throws Exception {

Stat stat = zkClient.exists("/eclipse", false);

System.out.println(stat == null ? "not exist" : "exist");

}

假如存在,则返回stat类型。

PS. 程序中需要导的包如下:

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;

import java.util.List;

注意:运行上面的程序,需要确保在虚拟机上开启zookeeper

[root@hadoop101 zookeeper]# bin/zkServer.sh start

[root@hadoop102 zookeeper]# bin/zkServer.sh start

[root@hadoop103 zookeeper]# bin/zkServer.sh start

可尝试自己实现其他方法:

get()

    @Test
    public void get() throws Exception {
        byte[] data = zkClient.getData("/tt1234", true, new Stat());
        String str = new String(data);
        System.out.println(str);
    }

注意:Stat是节点的状态信息。

set()

    @Test
    public void set() throws Exception {
        Stat stat = zkClient.setData("/tt1234", "defabc".getBytes(), 0);
        System.out.println(stat.getDataLength());
    }

运行结果如下:

注意version参数不能乱设,否则会出现BadVersion的错误。可以先通过

stat /tt1234

        命令查看版本号。假如要改的数据跟这个版本不一样,修改失败。这是因为这个程序是跟别的程序并发写的,为了产生误删的杯具,需要确定删的跟看到的是同一个东西。实际中一般可以考虑先获取stat对象,再getVersion()得到具体的版本,而不是写一个固定值。

delete()

    @Test
    public void delete() throws Exception {
        Stat stat = zkClient.exists("/tt1234", false);
        if (stat != null) {
            zkClient.delete("/tt1234", stat.getVersion());
        }
    }

以上zookeeper的API基本上就够了。

        观察一个节点,只能观察一次。以后有变化了,也看不到。现在希望循环注册节点,一旦有变化了就通知(而不是只通知一次)。

先注册一个节点

[zk: localhost:2181(CONNECTED) 0] create /a 123

编写代码

    public void register() throws Exception {
        byte[] data = zkClient.getData("/a", new Watcher(){

            public void process(WatchedEvent event) {
                try {
                    register();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, null);

        System.out.println(new String(data));
    }

    @Test
    public void testRegister() throws Exception {
        try {
            register();
            Thread.sleep(Long.MAX_VALUE);
        } catch (Exception e) {
            e.printStackTrace();
        }  
    }

在通知的时候,再注册一遍。以此来实现循环调用。

运行程序,获取该节点123

现在,修改该节点的值

[zk: localhost:2181(CONNECTED) 1] set /a 1234

留意Linux终端运行命令的通知,Java终端也打印出更新的值。

可以自行运行下面的案例。

​​​​​​​4、箭头服务器节点动态上下线案例(扩展)

1.需求

某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。

2.需求分析,如图所示

3.具体实现

(1)先在集群上创建/servers节点

[zk: localhost:2181(CONNECTED) 10] create /servers "servers"

Created /servers

(2)服务器端向Zookeeper注册代码

package com.zhang.zkcase;

import java.io.IOException;

import org.apache.zookeeper.CreateMode;

import org.apache.zookeeper.WatchedEvent;

import org.apache.zookeeper.Watcher;

import org.apache.zookeeper.ZooKeeper;

import org.apache.zookeeper.ZooDefs.Ids;

public class DistributeServer {

private static String connectString = "hadoop101:2181,hadoop102:2181,hadoop103:2181";

private static int sessionTimeout = 2000;

private ZooKeeper zk = null;

private String parentNode = "/servers";

// 创建到zk的客户端连接

public void getConnect() throws IOException{

zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {

@Override

public void process(WatchedEvent event) {

}

});

}

// 注册服务器

public void registServer(String hostname) throws Exception{

String create = zk.create(parentNode + "/server", hostname.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

System.out.println(hostname +" is online "+ create);

}

// 业务功能

public void business(String hostname) throws Exception{

System.out.println(hostname+" is working ...");

Thread.sleep(Long.MAX_VALUE);

}

public static void main(String[] args) throws Exception {

// 1获取zk连接

DistributeServer server = new DistributeServer();

server.getConnect();

// 2 利用zk连接注册服务器信息

server.registServer(args[0]);

// 3 启动业务功能

server.business(args[0]);

}

}

(3)客户端代码

package com.zhang.zkcase;

import java.io.IOException;

import java.util.ArrayList;

import java.util.List;

import org.apache.zookeeper.WatchedEvent;

import org.apache.zookeeper.Watcher;

import org.apache.zookeeper.ZooKeeper;

public class DistributeClient {

private static String connectString = "hadoop101:2181,hadoop102:2181,hadoop103:2181";

private static int sessionTimeout = 2000;

private ZooKeeper zk = null;

private String parentNode = "/servers";

// 创建到zk的客户端连接

public void getConnect() throws IOException {

zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {

@Override

public void process(WatchedEvent event) {

// 再次启动监听

try {

getServerList();

} catch (Exception e) {

e.printStackTrace();

}

}

});

}

// 获取服务器列表信息

public void getServerList() throws Exception {

// 1获取服务器子节点信息,并且对父节点进行监听

List<String> children = zk.getChildren(parentNode, true);

        // 2存储服务器信息列表

ArrayList<String> servers = new ArrayList<String>();

        // 3遍历所有节点,获取节点中的主机名称信息

for (String child : children) {

byte[] data = zk.getData(parentNode + "/" + child, false, null);

servers.add(new String(data));

}

        // 4打印服务器列表信息

System.out.println(servers);

}

// 业务功能

public void business() throws Exception{

System.out.println("client is working ...");

Thread.sleep(Long.MAX_VALUE);

}

public static void main(String[] args) throws Exception {

// 1获取zk连接

DistributeClient client = new DistributeClient();

client.getConnect();

// 2获取servers的子节点信息,从中获取服务器信息列表

client.getServerList();

// 3业务进程启动

client.business();

}

}

上一篇:linux 利用crontab添加定时任务


下一篇:vue如何跨域/vue如何配置反向代理(vue)?