前言
zk作为分布式服务框架,作为统一命令服务,状态同步服务,集群管理,分布式应用配置管理。通常在服务注册和服务发现,分布式锁,配置管理(1),队列管理(2)。
背景
在我们公司已经需要拆分微服务,因为历史数据表设计问题,导致表中有一个json的字符串的字段,需要修改这段json.但是因为多服务问题,可能会出现多个服务对这个json的字符串处理。会出现分布式数据问题。综上所诉我们这边需要一个分布式锁的中间件来出来,当一个服务在修改时,另一个服务无法修改。从redis,zk中选择zk作为分布式锁中间件。
知识准备
- 架构图
- 永久节点(PERSISTENT)
永久在zk创建节点。客户端即使宕机无法连接,也不会消失。
- 永久有序节点(PERSISTENT_SEQUENTIAL)
下面增加一个节点,可以重复执行,父节点必须以/结尾) 这样会在 /pp/0000000000 /pp/0000000001这种方式不断生成子节点
- 临时节点(EPHEMERAL)
临时在zk创建节点,客户端宕机取消链接就会消失。并且不会有子节点。
- 临时有序节点(EPHEMERAL_SEQUENTIAL)
参考文档
zk集群
集群设立为2n+1,所有节点信息必须先配置在每个服务的zoo.cfg文件中,如果需要添加新的集群follower节点,需要修改配置信息。
选举leader
zk选举leader。选举算法,就是集群中哪个机器处理的数据越新(通过ZXID来比较,ZXID越大,数据越新),其越有可能被选中。选主:第一当启动zk时,或者当主宕机。选主时集群之间相互信息称为投票。ID和ZXID. ID是zk的id,zxid是事物id。首次选举都是选举自己,将自己的id和zxid广播出去。循环等待:每次收取的外部节点Vote信息将自己内部的信息进行pk,选取zxid大的,如果相同则选取id大的。
信息同步
zk之间的信息同步,集群中节点角色确定后,leader会重新加载本地快照及日志文件,以此作为基准数据,再结合各个learner的本地提交数据,leader再确定需要给具体learner回滚哪些数据及同步哪些数据;
数据同步:因为zxid是事物id,最新事物需要更新节点到其他节点中。提出阶段和提交阶段:leader生成提议并广播给followers,收到半数以上的ACK后,再广播commit消息,同时将事务操作应用到内存中。follower收到提议后先将事务写到本地事务日志,然后反馈ACK,等接到leader的commit消息时,才会将事务操作应用到内存中。
CAP理论著名的CAP理论指出,一个分布式系统不可能同时满足C(一致性)、A(可用性)和P(分区容错性)。由于分区容错性在是分布式系统中必须要保证的,因此我们只能在A和C之间进行权衡。在此Zookeeper保证的是CP, 而Eureka则是AP。zk是保证CP的,因为当master宕机时,zk会重新选举,选举时间很长。30-120秒。且选举时整个集群不能使用。且在云部署环境下,因为网络问题导致整个集群失去master的情况很频繁,所以在漫长的选举时间导致这个集群不能使用是不能容忍的。
zk搭建
利用上面的zk安装包,安装到指定的服务器文件内,进入zookeeper文件夹内,建立一个data文件夹作为zk的数据文件,一个log文件夹作为日志文件,进入conf文件夹内 找到zoo_sample.cfg 复制一份成zoo.cfg。cp zoo_sample.cfg zoo.cfg.然后修改配置文件 vim zoo.cfg。dataDir=/root/zookeeper-3.4.10/datadataLogDir=/root/zookeeper-3.4.10/log
将 zookeeper 的根目录设置到系统环境变量 PATH 中.
sudo vi /etc/profile
在打开的 profile 文件末尾追加如下配置:
export ZOOKEEPER_HOME=/root/zookeeper-3.4.10/
export PATH=$ZOOKEEPER_HOME/bin:$PATH
export PATH
刷新 source
source /etc/profile
启动zk,进入bin文件内,zkServer.sh start,查看zk的状态zkServer.sh status.注意:必须先安装jdk ,然后显示mode standalone 就安装成功了。
zk集群搭建
奇数搭建 因为有个过半原则。zk的容错机制:集群中只要有过半的机器是正常工作的,那么整个集群对外就是可用的。也就是说如果有2个zookeeper,那么只要有1个死了 zookeeper就不能用了,因为1没有过半,所以2个zookeeper的死亡容忍度为0;同理,要是有3个zookeeper,一个死了,还剩下2个正常的,过半了,所以3个zookeeper的容忍度为1;同理你多列举几个:2->0;3->1;4->1;5->2;6->2会发现一个规律,2n和2n-1的容忍度是一样的,都是n-1。
zk实现分布式锁
当很多进程需要访问共享资源时,我们可以通过zk来实现分布式锁。主要步骤是:1.建立一个节点,假如名为:lock 。节点类型为持久节点(PERSISTENT)2.每当进程需要访问共享资源时,会调用分布式锁的lock()或tryLock()方法获得锁,这个时候会在第一步创建的lock节点下建立相应的顺序子节点,节点类型为临时顺序节点(EPHEMERAL_SEQUENTIAL),通过组成特定的名字name+lock+顺序号。3.在建立子节点后,对lock下面的所有以name开头的子节点进行排序,判断刚刚建立的子节点顺序号是否是最小的节点,假如是最小节点,则获得该锁对资源进行访问。4.假如不是该节点,就获得该节点的上一顺序节点,并给该节点是否存在注册监听事件。同时在这里阻塞。等待监听事件的发生,获得锁控制权。5.当调用完共享资源后,调用unlock()方法,关闭zk,进而可以引发监听事件,释放该锁。实现的分布式锁是严格的按照顺序访问的并发锁。
zk客户端
Curator zk的客户端
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* <p>zk客戶端Curator使用</p>
* @Author: dl.zhang
* @Date: 2019/7/29
*/
@Configuration
@RefreshScope
public class CuratorConfiguration {
@Value("${curator.retryCount}")
private int retryCount;
@Value("${curator.elapsedTimeMs}")
private int elapsedTimeMs;
@Value("${curator.connectString}")
private String connectString;
@Value("${curator.sessionTimeoutMs}")
private int sessionTimeoutMs;
@Value("${curator.connectionTimeoutMs}")
private int connectionTimeoutMs;
@Value( value = "${zk.notice.lock}")
private String noticeLock;
@Value(value = "${zk.notice.children.lock}")
private String noticeChildrenLock;
@Value( value = "${zk.namespace.notice.lock}")
private String namespaceLock;
@Bean(initMethod = "start")
public CuratorFramework curatorFramework() {
return CuratorFrameworkFactory.newClient(
connectString,
sessionTimeoutMs,
connectionTimeoutMs,
new RetryNTimes(retryCount, elapsedTimeMs));
}
public String getNoticeLock() {
return noticeLock;
}
public void setNoticeLock(String noticeLock) {
this.noticeLock = noticeLock;
}
public String getNoticeChildrenLock() {
return noticeChildrenLock;
}
public void setNoticeChildrenLock(String noticeChildrenLock) {
this.noticeChildrenLock = noticeChildrenLock;
}
public String getNamespaceLock() {
return namespaceLock;
}
public void setNamespaceLock(String namespaceLock) {
this.namespaceLock = namespaceLock;
}
}
zk 客户端服务类
curator提供一下特性
自动化的连接管理: 重新建立到ZooKeeper的连接和重试机制存在一些潜在的错误case。 Curator帮助你处理这些事情
简化了原生的ZooKeeper的方法,事件等.提供了一个现代的流式接口
CuratorFramework框架简介
他是通过CuratorFrameworkFactory以工厂模式和builder模式创建CuratorFramework实例。CuratorFramework是线程安全。
工厂方法newClient()提供了一个简单方式创建实例。 而Builder提供了更多的参数控制。一旦你创建了一个CuratorFramework实例,你必须调用它的start()启动,在应用退出时调用close()方法关闭.
// 折是使用newClient方式
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
// 这是使用build方式
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().connectString(connectionString)
.retryPolicy(retryPolicy)
.connectionTimeoutMs(connectionTimeoutMs)
.sessionTimeoutMs(sessionTimeoutMs)
// etc. etc.
.build();
CuratorFramework方式示例
方法名 | 描述 |
---|---|
create() | 开始创建操作, 可以调用额外的方法(比如方式mode 或者后台执行background) 并在最后调用forPath()指定要操作的ZNode |
delete() | 开始删除操作. 可以调用额外的方法(版本或者后台处理version or background)并在最后调用forPath()指定要操作的ZNode |
checkExists() | 开始检查ZNode是否存在的操作. 可以调用额外的方法(监控或者后台处理)并在最后调用forPath()指定要操作的ZNode |
getData() | 开始获得ZNode节点数据的操作. 可以调用额外的方法(监控、后台处理或者获取状态watch, background or get stat) 并在最后调用forPath()指定要操作的ZNode |
setData() | 开始设置ZNode节点数据的操作. 可以调用额外的方法(版本或者后台处理) 并在最后调用forPath()指定要操作的ZNode |
getChildren() | 开始获得ZNode的子节点列表。 以调用额外的方法(监控、后台处理或者获取状态watch, background or get stat) 并在最后调用forPath()指定要操作的ZNode |
inTransaction() | 开始是原子ZooKeeper事务. 可以复合create, setData, check, and/or delete 等操作然后调用commit()作为一个原子操作提交 |
CuratorFramework代码示例
import com.fungo.system.config.CuratorConfiguration;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.CountDownLatch;
/**
* <p>zk客户端基础服务类</p>
*
* @Author: dl.zhang
* @Date: 2019/7/29
*/
@Component
public class DistributedLockByCurator implements InitializingBean {
private static final Logger logger = LoggerFactory.getLogger(DistributedLockByCurator.class);
private CountDownLatch countDownLatch = new CountDownLatch(1);
@Autowired
private CuratorFramework curatorFramework;
@Autowired
private CuratorConfiguration curatorConfiguration;
/**
* 获取分布式锁
*/
public void acquireDistributedLock(String path) {
String keyPath = "/" + curatorConfiguration.getNoticeLock() + "/"+curatorConfiguration.getNoticeChildrenLock() + path;
while (true) {
try {
curatorFramework
.create()
.creatingParentsIfNeeded()
.withMode( CreateMode.EPHEMERAL)
.withACL( ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(keyPath);
logger.info("systyem success to acquire lock for path:{}", keyPath);
break;
} catch (Exception e) {
logger.info("systyem failed to acquire lock for path:{}", keyPath);
logger.info("systyem while try again .......");
try {
if (countDownLatch.getCount() <= 0) {
countDownLatch = new CountDownLatch(1);
}
countDownLatch.await();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
}
/**
* 释放分布式锁
*/
public boolean releaseDistributedLock(String path) {
try {
String keyPath = "/" + curatorConfiguration.getNoticeLock() + "/"+curatorConfiguration.getNoticeChildrenLock() + path;
if (curatorFramework.checkExists().forPath(keyPath) != null) {
curatorFramework.delete().forPath(keyPath);
}
} catch (Exception e) {
logger.error("systyem failed to release lock");
return false;
}
return true;
}
/**
* 创建 watcher 事件
*/
private void addWatcher(String path) throws Exception {
String keyPath;
if (path.equals(curatorConfiguration.getNoticeLock())) {
keyPath = "/" + path;
} else {
keyPath = "/" + curatorConfiguration.getNoticeLock() + "/" + path;
}
final PathChildrenCache cache = new PathChildrenCache(curatorFramework, keyPath, false);
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
cache.getListenable().addListener((client, event) -> {
if (event.getType().equals( PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
String oldPath = event.getData().getPath();
logger.info("监听事件 systyem success to release lock for path:{}", oldPath);
if (oldPath.contains(path)) {
//释放计数器,让当前的请求获取锁
countDownLatch.countDown();
}
}
});
}
//创建父节点,并创建永久节点
@Override
public void afterPropertiesSet() {
curatorFramework = curatorFramework.usingNamespace(curatorConfiguration.getNamespaceLock());
String path = "/" + curatorConfiguration.getNoticeLock();
try {
if (curatorFramework.checkExists().forPath(path) == null) {
curatorFramework.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(path);
}
addWatcher(curatorConfiguration.getNoticeLock());
logger.info("systyem root path 的 watcher 事件创建成功");
} catch (Exception e) {
logger.error("systyem connect zookeeper fail,please check the log >> {}", e.getMessage(), e);
}
}
}
// 开启分布式锁
distributedLockByCurator.acquireDistributedLock( s.getMemberId() );
// 关闭分布式锁
distributedLockByCurator.releaseDistributedLock( s.getMemberId() );
备注
- (1) 配置管理 : 程序分散部署在多台机器上,要逐个改变配置就变得困难。好吧,现在把这些配置全部放到zookeeper上去,保存在 Zookeeper 的某个目录节点中,然后所有相关应用程序对这个目录节点进行监听,一旦配置信息发生变化,每个应用程序就会收到 Zookeeper 的通知,然后从 Zookeeper 获取新的配置信息应用到系统中就好。
- (2) 队列管理: 两种类型的队列.1、 同步队列,当一个队列的成员都聚齐时,这个队列才可用,否则一直等待所有成员到达。2、队列按照 FIFO 方式进行入队和出队操作。第一类,在约定目录下创建临时目录节点,监听节点数目是否是我们要求的数目。第二类,和分布式锁服务中的控制时序场景基本原理一致,入列有编号,出列按编号。