Linux(CentOS)中常用软件安装,使用及异常——Zookeeper, Kafka

本文主要是为了记录在工作中遇到的常用软件的安装过程,方便以后遇到相同情形时可以快速的查阅。主要讲述了zookeeper, kafka的安装。
本文的操作系统采用的是CentOS,可以采用shell命令查阅:lsb_release -a.
Linux(CentOS)中常用软件安装,使用及异常——Zookeeper, Kafka


Zookeeper

Zookeeper的安装与配置

可以在http://zookeeper.apache.org/这里下载需要的安装包。这里采用的是zookeeper-3.4.6.tar.gz安装包。
1 首先将安装包解压:

[root@hidden util]# tar -zxvf zookeeper-3.4.6.tar.gz
[root@hidden util]# cd zookeeper-3.4.6
[root@hidden zookeeper-3.4.6]#ls
bin dist-maven  LICENSE.txt src
build.xml       docs        NOTICE.txt  zookeeper-3.4.6.jar
CHANGES.txt ivysettings.xml     README-packaging.txt    zookeeper-3.4.6.jar.asc
conf        ivy.xml README.txt  zookeeper-3.4.6.jar,md5
contrib lib recipes zookeeper-3.4.6.jar.sha1

2 修改系统的环境变量

[root@hidden zookeeper-3.4.6]#vim /etc/profile
#ZooKeeper配置
export ZOOKEEPER_INSTALL=/root/util/zookeeper-3.4.6
export PATH=$PATH:$ZOOKEEPER_INSTALL/bin
[root@hidden zookeeper-3.4.6]# source /etc/profile

3 创建Zookeeper的配置文件

[root@hidden zookeeper-3.4.6]#cd conf
[root@hidden conf]# cp zoo_example.cfg zoo.cfg

4 配置zoo.cfg

tickTime=2000 #ZooKeeper服务器心跳时间,单位为ms
initLimit=10 #投票选举心leader的初始化时间
syncLimit=5 #leader与follower心跳检测最大容忍时间,响应超过syncLimit*tickTime,leader认为follower死掉,从服务器列表中删除follower
clientPort=2181 #端口
dataDir=/tmp/ZooKeeper/data #数据目录
dataLogDir=/tmp/ZooKeeper/log #日志目录

5 创建配置中的相应目录

cd /tmp
mkdir ZooKeeper
cd Zookeeper
mkdir log
mkdir data

或者可以试一下

mkdir -p /tmp/ZooKeeper/data
mkdir -p /tmp/ZooKeeper/log

6 启动ZooKeeper

cd /usr/ZooKeeper/bin
./zkServer.sh start

7 可以使用ZooKeeper自带的客户端工具来查看ZooKeeper的节点建立情况(bin/zkCli.sh)
[图片]

ZooKeeper API使用简介

ZooKeeper实现了一个层次命名空间的数据模型,也可以认为它就是一个小型的、精简的文件系统。它的每个节点称为znode, znode除了本身能够包含一部分数据之外,还能够拥有子节点,当节点上的数据发生变化,或者其子节点发生变化,基于watcher机制,会发出相应的通知给订阅其状态变化的客户端。
首先,实例化一个ZooKeeper对象,指定其三个参数。url为ZooKeeper服务器的地址。sessionTimeOut为会话的超时时间,ZooKeeper的会话超时时间的长度由客户端来确定,但是ZooKeeper的Server端会有两个配置,minSessionTimeout和maxSessionTimeout,minSessionTimeout的值默认为2倍的tickTime, maxSessionTimeout的值默认为20倍的tickTime,单位都是ms. tickTime也是服务端的一个配置项,是Server内部控制时间逻辑的最小时间单位,如果客户端发来的sessionTimeout超过minSessionTimeout~maxSessionTimeout这个范围,Server会自动取minSessionTimeout或者maxSessionTimeout作为sessionTimeout, 然后为这个Client新建一个session对象。最后一个参数为默认的watcher.如果包含boolean watch的读方法中传入true, 则将默认的watcher注册为所关注时间的watcher,如果传入false,则不注册任何watcher,这里暂且定为空。

ZooKeeper zooKeeper = new ZooKeeper(url,sesssionTimeout,null);//注意将安装目录下的zookeeper的jar包拷贝出来导入你的project的libs中;zookeeper的默认端口号为2181

1 创建节点
通过ZooKeeper的API新增一个znode节点,节点在被创建时,需要制定节点的路径(此处为/root)包含的字节数据,访问权限(如果不想设置,则制定为Ids.OPEN_ACL_UNSAFE),以及创建的节点类型,节点的类型如表所示:

节点类型 解释
CreateMode.PRESISTENT 持久节点,该节点在客户端断开连接后不会删除
CreateMode.EPHEMERAL 临时节点,该节点在客户端断开连接后删除
CreateMode.PERSISTENT_SEQUENTIAL 持久节点,该节点在客户端断开后不会删除,并将在其名下附加一个单调递增数
CreateMode.EPHEMERAL_SEQUENTIAL 临时节点,该节点在客户端断开后删除,并将在其名下附加一个单调递增数

创建节点实例:

//创建/root节点,其包含的数据为“root data",访问权限为开放,所有人均可以访问,创建模式为持久化节点
zooKeeper.create("/root","root data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

2 删除节点
当不需要某个节点,或者某个节点上的信息已经失效时,使用delete方法可以将该节点删除,删除时需要制定节点的版本号version,如果设置为-1,则匹配所有的版本,ZooKeeper会比较删除的节点版本是否和服务器上的版本一致,如果不一致则抛出异常。
删除节点实例:

zooKeeper.delete("/root",-1);

3 设置和获取节点内容
如果想在已有节点中保存数据,可以通过ZooKeeper的setData方法,将数据保存到节点上,也可以通过ZooKeeper的getData方法来获取该节点保存的数据,一个znode中最多能够保存1MB的数据。
设置和获取节点内容的实例。

//设置/root节点的数据,版本号为-1,如果匹配不到相应的节点,会抛出异常
zooKeeper.setData("/root","hello".getBytes(),-1);
//取得/root节点的数据,并反回其stat
Stat stat = new Stat();
byte[] data = zooKeeper.getData("/root",false,stat);

setData方法设置/root节点的数据为“hello”,getData方法取得root节点上保存的节点数据,false表示不使用默认的watcher,第三个参数为Stat, 表示节点的状态,是一个传出的参数,将会返回该节点当前的状态信息。
4 添加子节点
ZooKeeper支持在已有的节点下添加子节点,同样也是用使用create()方法,但是父节点必须存在,否则会跑出异常。
创建子节点的实例:

zooKeeper.create("/root","root data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zooKeeper.create("/root/child","child data".getBytes(),Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

5 判断节点是否存在
当进行系统初始化时,或者当前需要给一个节点创建子节点时,通常需要判断系统中的一些节点是否存在。

Stat stat = zooKeeper.exists("/root/child1",false);
if(stat==null){
System.out.println("节点不存在");
}else{
System.out.println("节点存在");
}

判断/root/child1节点是否存在,如果存在,返回stat不为null,否则为null.
6 watcher的实现
当节点的状态发生变化,通过watcher机制,可以让客户端得到通知,watcher需要实现org.apache.ZooKeeper.Watcher接口。节点的状态变化主要包含如表所示的几种情况。

节点状态变化 解释
EventType.NodeDeleted 删除节点
EventType.NodeChildrenChanged 修改节点的子节点
EventType.NodeCreated 创建节点
EventType.NodeDataChanged 修改节点数据

watcher的实现实例:

public class ZKWatcher implements Watcher{
@Override public void process(WacthedEvent event){
if(event.getType()==EventType.NodeDeleted)
//del
if(event.getType()==EventType.NodeChildrenChanged )
//del
if(event.getType()==EventType.NodeCreated)
//del
if(event.getType()==EventType.NodeDataChanged  )
//del
}
}

需要注意的是,ZooKeeper的watcher是一次性的,也就是说,每次在处理完状态变化时间之后,需要重新注册watcher,这一点很让人抓狂。这个特性也使得在处理时间和重新加上watcher这段时间发生的节点状态变化将无法被感知。

异常

1 ZooKeeer常常发生下面两种系统异常:
org.apache.ZooKeeper.KeeperException.ConnectionLossException,客户端与其中的一台服务器socket连接出现异常,连接丢失;
org.apache.ZooKeeper.KeeperException.SessionExpiredException, 客户端的session已经超过sessionTimeout,未进行任何操作。
ConnectionLossException异常可以通过重试进行处理,客户端会根据初始化ZooKeeper时传递的服务列表,自动尝试下一个服务端节点,而在这段时间内,服务端节点变更的事件就会丢失。
SessionExpiredException异常不能通过重试解决,需要应用重新创建一个新的客户端(new ZooKeeper()),这时所有的watcher和EPHEMERAL节点都将失效。
一般情况下,不采用原生态的API进行客户端操作,而是采用zkClient或者Curator进行操作,这个会在后续的文章中讲述。
2 异常

2016-08-04 20:12:54 -[INFO] - [Opening socket connection to server 10.101.139.86/10.101.139.86:2181. Will not attempt to authenticate using SASL (unknown error)] - [org.apache.zookeeper.ClientCnxn$SendThread:975]
2016-08-04 20:12:55 -[WARN] - [Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect] - [org.apache.zookeeper.ClientCnxn$SendThread:1102]
java.net.ConnectException: Connection refused: no further information
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source)
    at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)

原因:
1. 网络不通,端口不可用等;
2. zookeeper服务程序未正常启动;
3. 防火墙未关闭;


Kafka

Kafka的安装与配置

Kafka的下载页面:http://kafka.apache.org/downloads.html。下载之后解压:

[root@hidden util]# tar -zxvf kafka_2.8.9-0.8.1.1
[root@hidden util]# cd kafka_2.8.9-0.8.1.1
[root@hidden util]# ls
bin config  libs    LICENSE logs NOTICE

/bin 启动和停止命令等
/config 配置文件
/libs 类库

启动

在Kafka启动之前需要先启动ZooKeeper:

bin/zookeeper-server-start.sh config/zookeeper.properties

之后启动Kafka Server:

bin/kafka-server-start.sh config/server.properties

停止kafka:

bin/kafka-server-stop.sh

停止zookeeper

bin/zookeeper-server-stop.sh

测试

运行producer

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

运行consumer

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

在producer端输入字符串并回车,查看consumer端是否显示。

关于kafka的介绍以及在java中如何操作kafka会在之后的博文中展开介绍。


参考资料
1. 《大型分布式网站架构设计与实践》陈康贤著。
2. Zookeeper 安装和配置
3. Kafka 安装和测试

上一篇:《c# 实现p2p文件分享与传输系统》 一、 模型


下一篇:灿芯半导体落户合肥 打造集成电路产业园