一周一个中间件-zookeeper

前言

zk作为分布式服务框架,作为统一命令服务,状态同步服务,集群管理,分布式应用配置管理。通常在服务注册和服务发现,分布式锁,配置管理(1),队列管理(2)。

背景

在我们公司已经需要拆分微服务,因为历史数据表设计问题,导致表中有一个json的字符串的字段,需要修改这段json.但是因为多服务问题,可能会出现多个服务对这个json的字符串处理。会出现分布式数据问题。综上所诉我们这边需要一个分布式锁的中间件来出来,当一个服务在修改时,另一个服务无法修改。从redis,zk中选择zk作为分布式锁中间件。

知识准备

  • 架构图

一周一个中间件-zookeeper

  • 永久节点(PERSISTENT)

永久在zk创建节点。客户端即使宕机无法连接,也不会消失。

  • 永久有序节点(PERSISTENT_SEQUENTIAL)

下面增加一个节点,可以重复执行,父节点必须以/结尾) 这样会在 /pp/0000000000 /pp/0000000001这种方式不断生成子节点

  • 临时节点(EPHEMERAL)

临时在zk创建节点,客户端宕机取消链接就会消失。并且不会有子节点。

  • 临时有序节点(EPHEMERAL_SEQUENTIAL)

参考文档

zookeeper 下载  提取码:0zez
从PAXOS到ZOOKEEPER分布式一致性原理与实践 提取码:wkjg

zk集群

集群设立为2n+1,所有节点信息必须先配置在每个服务的zoo.cfg文件中,如果需要添加新的集群follower节点,需要修改配置信息。

选举leader

zk选举leader。选举算法,就是集群中哪个机器处理的数据越新(通过ZXID来比较,ZXID越大,数据越新),其越有可能被选中。选主:第一当启动zk时,或者当主宕机。选主时集群之间相互信息称为投票。ID和ZXID. ID是zk的id,zxid是事物id。首次选举都是选举自己,将自己的id和zxid广播出去。循环等待:每次收取的外部节点Vote信息将自己内部的信息进行pk,选取zxid大的,如果相同则选取id大的。

信息同步

zk之间的信息同步,集群中节点角色确定后,leader会重新加载本地快照及日志文件,以此作为基准数据,再结合各个learner的本地提交数据,leader再确定需要给具体learner回滚哪些数据及同步哪些数据;
数据同步:因为zxid是事物id,最新事物需要更新节点到其他节点中。提出阶段和提交阶段:leader生成提议并广播给followers,收到半数以上的ACK后,再广播commit消息,同时将事务操作应用到内存中。follower收到提议后先将事务写到本地事务日志,然后反馈ACK,等接到leader的commit消息时,才会将事务操作应用到内存中。

CAP理论著名的CAP理论指出,一个分布式系统不可能同时满足C(一致性)、A(可用性)和P(分区容错性)。由于分区容错性在是分布式系统中必须要保证的,因此我们只能在A和C之间进行权衡。在此Zookeeper保证的是CP, 而Eureka则是AP。zk是保证CP的,因为当master宕机时,zk会重新选举,选举时间很长。30-120秒。且选举时整个集群不能使用。且在云部署环境下,因为网络问题导致整个集群失去master的情况很频繁,所以在漫长的选举时间导致这个集群不能使用是不能容忍的。

zk搭建

利用上面的zk安装包,安装到指定的服务器文件内,进入zookeeper文件夹内,建立一个data文件夹作为zk的数据文件,一个log文件夹作为日志文件,进入conf文件夹内 找到zoo_sample.cfg 复制一份成zoo.cfg。cp zoo_sample.cfg zoo.cfg.然后修改配置文件 vim zoo.cfg。dataDir=/root/zookeeper-3.4.10/datadataLogDir=/root/zookeeper-3.4.10/log

将 zookeeper 的根目录设置到系统环境变量 PATH 中.

sudo vi /etc/profile
在打开的 profile 文件末尾追加如下配置:
export ZOOKEEPER_HOME=/root/zookeeper-3.4.10/
export PATH=$ZOOKEEPER_HOME/bin:$PATH
export PATH
刷新 source 
source /etc/profile

启动zk,进入bin文件内,zkServer.sh start,查看zk的状态zkServer.sh status.注意:必须先安装jdk ,然后显示mode standalone 就安装成功了。

zk集群搭建

奇数搭建 因为有个过半原则。zk的容错机制:集群中只要有过半的机器是正常工作的,那么整个集群对外就是可用的。也就是说如果有2个zookeeper,那么只要有1个死了 zookeeper就不能用了,因为1没有过半,所以2个zookeeper的死亡容忍度为0;同理,要是有3个zookeeper,一个死了,还剩下2个正常的,过半了,所以3个zookeeper的容忍度为1;同理你多列举几个:2->0;3->1;4->1;5->2;6->2会发现一个规律,2n和2n-1的容忍度是一样的,都是n-1。

zk实现分布式锁

当很多进程需要访问共享资源时,我们可以通过zk来实现分布式锁。主要步骤是:1.建立一个节点,假如名为:lock 。节点类型为持久节点(PERSISTENT)2.每当进程需要访问共享资源时,会调用分布式锁的lock()或tryLock()方法获得锁,这个时候会在第一步创建的lock节点下建立相应的顺序子节点,节点类型为临时顺序节点(EPHEMERAL_SEQUENTIAL),通过组成特定的名字name+lock+顺序号。3.在建立子节点后,对lock下面的所有以name开头的子节点进行排序,判断刚刚建立的子节点顺序号是否是最小的节点,假如是最小节点,则获得该锁对资源进行访问。4.假如不是该节点,就获得该节点的上一顺序节点,并给该节点是否存在注册监听事件。同时在这里阻塞。等待监听事件的发生,获得锁控制权。5.当调用完共享资源后,调用unlock()方法,关闭zk,进而可以引发监听事件,释放该锁。实现的分布式锁是严格的按照顺序访问的并发锁。

zk客户端

Curator zk的客户端

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * <p>zk客戶端Curator使用</p>
 * @Author: dl.zhang
 * @Date: 2019/7/29
 */
@Configuration
@RefreshScope
public class CuratorConfiguration {
    @Value("${curator.retryCount}")
    private int retryCount;
    @Value("${curator.elapsedTimeMs}")
    private int elapsedTimeMs;
    @Value("${curator.connectString}")
    private String connectString;
    @Value("${curator.sessionTimeoutMs}")
    private int sessionTimeoutMs;
    @Value("${curator.connectionTimeoutMs}")
    private int connectionTimeoutMs;
    @Value( value = "${zk.notice.lock}")
    private String  noticeLock;
    @Value(value = "${zk.notice.children.lock}")
    private String noticeChildrenLock;
    @Value( value = "${zk.namespace.notice.lock}")
    private String namespaceLock;
    @Bean(initMethod = "start")
    public CuratorFramework curatorFramework() {
        return CuratorFrameworkFactory.newClient(
                connectString,
                sessionTimeoutMs,
                connectionTimeoutMs,
                new RetryNTimes(retryCount, elapsedTimeMs));
    }
    public String getNoticeLock() {
        return noticeLock;
    }
    public void setNoticeLock(String noticeLock) {
        this.noticeLock = noticeLock;
    }
    public String getNoticeChildrenLock() {
        return noticeChildrenLock;
    }
    public void setNoticeChildrenLock(String noticeChildrenLock) {
        this.noticeChildrenLock = noticeChildrenLock;
    }
    public String getNamespaceLock() {
        return namespaceLock;
    }
    public void setNamespaceLock(String namespaceLock) {
        this.namespaceLock = namespaceLock;
    }
}

zk 客户端服务类
curator提供一下特性
自动化的连接管理: 重新建立到ZooKeeper的连接和重试机制存在一些潜在的错误case。 Curator帮助你处理这些事情
简化了原生的ZooKeeper的方法,事件等.提供了一个现代的流式接口
CuratorFramework框架简介
他是通过CuratorFrameworkFactory以工厂模式和builder模式创建CuratorFramework实例。CuratorFramework是线程安全。
工厂方法newClient()提供了一个简单方式创建实例。 而Builder提供了更多的参数控制。一旦你创建了一个CuratorFramework实例,你必须调用它的start()启动,在应用退出时调用close()方法关闭.

// 折是使用newClient方式
 ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
 CuratorFramework  curatorFramework = CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
// 这是使用build方式
 CuratorFramework  curatorFramework =  CuratorFrameworkFactory.builder().connectString(connectionString)
                .retryPolicy(retryPolicy)
                .connectionTimeoutMs(connectionTimeoutMs)
                .sessionTimeoutMs(sessionTimeoutMs)
                // etc. etc.
                .build();

CuratorFramework方式示例

方法名 描述
create() 开始创建操作, 可以调用额外的方法(比如方式mode 或者后台执行background) 并在最后调用forPath()指定要操作的ZNode
delete() 开始删除操作. 可以调用额外的方法(版本或者后台处理version or background)并在最后调用forPath()指定要操作的ZNode
checkExists() 开始检查ZNode是否存在的操作. 可以调用额外的方法(监控或者后台处理)并在最后调用forPath()指定要操作的ZNode
getData() 开始获得ZNode节点数据的操作. 可以调用额外的方法(监控、后台处理或者获取状态watch, background or get stat) 并在最后调用forPath()指定要操作的ZNode
setData() 开始设置ZNode节点数据的操作. 可以调用额外的方法(版本或者后台处理) 并在最后调用forPath()指定要操作的ZNode
getChildren() 开始获得ZNode的子节点列表。 以调用额外的方法(监控、后台处理或者获取状态watch, background or get stat) 并在最后调用forPath()指定要操作的ZNode
inTransaction() 开始是原子ZooKeeper事务. 可以复合create, setData, check, and/or delete 等操作然后调用commit()作为一个原子操作提交

CuratorFramework代码示例

import com.fungo.system.config.CuratorConfiguration;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.CountDownLatch;

/**
 * <p>zk客户端基础服务类</p>
 *
 * @Author: dl.zhang
 * @Date: 2019/7/29
 */
@Component
public class DistributedLockByCurator implements InitializingBean {

    private static final Logger logger = LoggerFactory.getLogger(DistributedLockByCurator.class);
    private CountDownLatch countDownLatch = new CountDownLatch(1);
    @Autowired
    private CuratorFramework curatorFramework;
    @Autowired
    private CuratorConfiguration curatorConfiguration;

    /**
     * 获取分布式锁
     */
    public void acquireDistributedLock(String path) {
        String keyPath = "/" + curatorConfiguration.getNoticeLock() + "/"+curatorConfiguration.getNoticeChildrenLock() + path;
        while (true) {
            try {
                curatorFramework
                        .create()
                        .creatingParentsIfNeeded()
                        .withMode( CreateMode.EPHEMERAL)
                        .withACL( ZooDefs.Ids.OPEN_ACL_UNSAFE)
                        .forPath(keyPath);
                logger.info("systyem success to acquire lock for path:{}", keyPath);
                break;
            } catch (Exception e) {
                logger.info("systyem failed to acquire lock for path:{}", keyPath);
                logger.info("systyem while try again .......");
                try {
                    if (countDownLatch.getCount() <= 0) {
                        countDownLatch = new CountDownLatch(1);
                    }
                    countDownLatch.await();
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
        }
    }

    /**
     * 释放分布式锁
     */
    public boolean releaseDistributedLock(String path) {
        try {
            String keyPath = "/" + curatorConfiguration.getNoticeLock() + "/"+curatorConfiguration.getNoticeChildrenLock() + path;
            if (curatorFramework.checkExists().forPath(keyPath) != null) {
                curatorFramework.delete().forPath(keyPath);
            }
        } catch (Exception e) {
            logger.error("systyem failed to release lock");
            return false;
        }
        return true;
    }

    /**
     * 创建 watcher 事件
     */
    private void addWatcher(String path) throws Exception {
        String keyPath;
        if (path.equals(curatorConfiguration.getNoticeLock())) {
            keyPath = "/" + path;
        } else {
            keyPath = "/" + curatorConfiguration.getNoticeLock() + "/" + path;
        }
        final PathChildrenCache cache = new PathChildrenCache(curatorFramework, keyPath, false);
        cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        cache.getListenable().addListener((client, event) -> {
            if (event.getType().equals( PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
                String oldPath = event.getData().getPath();
                logger.info("监听事件 systyem success to release lock for path:{}", oldPath);
                if (oldPath.contains(path)) {
                    //释放计数器,让当前的请求获取锁
                    countDownLatch.countDown();
                }
            }
        });
    }

    //创建父节点,并创建永久节点
    @Override
    public void afterPropertiesSet() {
        curatorFramework = curatorFramework.usingNamespace(curatorConfiguration.getNamespaceLock());
        String path = "/" + curatorConfiguration.getNoticeLock();
        try {
            if (curatorFramework.checkExists().forPath(path) == null) {
                curatorFramework.create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT)
                        .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                        .forPath(path);
            }
            addWatcher(curatorConfiguration.getNoticeLock());
            logger.info("systyem root path 的 watcher 事件创建成功");
        } catch (Exception e) {
            logger.error("systyem connect zookeeper fail,please check the log >> {}", e.getMessage(), e);
        }
    }
}
// 开启分布式锁
distributedLockByCurator.acquireDistributedLock( s.getMemberId() );
// 关闭分布式锁
distributedLockByCurator.releaseDistributedLock( s.getMemberId() );

备注

  • (1) 配置管理 : 程序分散部署在多台机器上,要逐个改变配置就变得困难。好吧,现在把这些配置全部放到zookeeper上去,保存在 Zookeeper 的某个目录节点中,然后所有相关应用程序对这个目录节点进行监听,一旦配置信息发生变化,每个应用程序就会收到 Zookeeper 的通知,然后从 Zookeeper 获取新的配置信息应用到系统中就好。
    一周一个中间件-zookeeper
  • (2) 队列管理: 两种类型的队列.1、 同步队列,当一个队列的成员都聚齐时,这个队列才可用,否则一直等待所有成员到达。2、队列按照 FIFO 方式进行入队和出队操作。第一类,在约定目录下创建临时目录节点,监听节点数目是否是我们要求的数目。第二类,和分布式锁服务中的控制时序场景基本原理一致,入列有编号,出列按编号。
上一篇:IO面试题


下一篇:十九、函数参数话