zookeeper是如何保证创建的节点是唯一的?

目录

一、问题背景

二、zookeeper源码级保证原子性

三、java源码级CAS

四、汇编级别CAS

五、操作系统(处理器)级别的CAS


一、问题背景

zookeeper是一个分布式协调服务,可以保证数据的一致性。由于所有的写请求都会被Follower节点转发到Leader节点执行,创建节点的请求也是一样的,所以只会由Leader节点创建新的节点,然后把数据同步到其他Follower节点。那么,它是如何保证创建的节点是唯一的呢?

二、zookeeper源码级保证原子性

zookeeper创建节点是由DataTree的createNode方法来执行的。

public String createNode(String path, byte data[], List<ACL> acl,long ephemeralOwner, int parentCVersion, long zxid, long time)
            throws KeeperException.NoNodeException,
            KeeperException.NodeExistsException {
        int lastSlash = path.lastIndexOf('/');
        String parentName = path.substring(0, lastSlash);
        String childName = path.substring(lastSlash + 1);
        StatPersisted stat = new StatPersisted();
        stat.setCtime(time);
        stat.setMtime(time);
        stat.setCzxid(zxid);
        stat.setMzxid(zxid);
        stat.setPzxid(zxid);
        stat.setVersion(0);
        stat.setAversion(0);
        stat.setEphemeralOwner(ephemeralOwner);
        DataNode parent = nodes.get(parentName);
        if (parent == null) {
            throw new KeeperException.NoNodeException();
        }
        synchronized (parent) {
            Set<String> children = parent.getChildren();
            if (children != null) {
                if (children.contains(childName)) {
                    throw new KeeperException.NodeExistsException();
                }
            }
            
            if (parentCVersion == -1) {
                parentCVersion = parent.stat.getCversion();
                parentCVersion++;
            }    
            parent.stat.setCversion(parentCVersion);
            parent.stat.setPzxid(zxid);
            Long longval = convertAcls(acl);
            DataNode child = new DataNode(parent, data, longval, stat);
            parent.addChild(childName);
            nodes.put(path, child);
            if (ephemeralOwner != 0) {
                HashSet<String> list = ephemerals.get(ephemeralOwner);
                if (list == null) {
                    list = new HashSet<String>();
                    ephemerals.put(ephemeralOwner, list);
                }
                synchronized (list) {
                    list.add(path);
                }
            }
        }
        // now check if its one of the zookeeper node child
        if (parentName.startsWith(quotaZookeeper)) {
            // now check if its the limit node
            if (Quotas.limitNode.equals(childName)) {
                // this is the limit node
                // get the parent and add it to the trie
                pTrie.addPath(parentName.substring(quotaZookeeper.length()));
            }
            if (Quotas.statNode.equals(childName)) {
                updateQuotaForPath(parentName
                        .substring(quotaZookeeper.length()));
            }
        }
        // also check to update the quotas for this node
        String lastPrefix;
        if((lastPrefix = getMaxPrefixWithQuota(path)) != null) {
            // ok we have some match and need to update
            updateCount(lastPrefix, 1);
            updateBytes(lastPrefix, data == null ? 0 : data.length);
        }
        dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
        childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
                Event.EventType.NodeChildrenChanged);
        return path;
    }

createNode方法逻辑:

首先,获取到所要创建的新节点的name和其父节点的parentName,创建一个StatPersisted对象来存储新节点的一些状态属性信息;

其次,从名为nodes的ConcurrentHashMap(用于存放所有的节点,key是path字符串,value是DataNode对象)中获取到父节点,然后用synchronized锁住父节点,避免其他请求出现并发问题;

最后,在获取到父节点之后,会将“新节点的路径”添加到“父节点的children的Set集合”中,同时将新节点放到nodes的ConcurrentHashMap中。

三、java源码级CAS

到此为止,所有的工作交给了ConcurrentHashMap的put方法。现在来看一下ConcurrentHashMap的put方法是如何保证只创建一个节点的。
这里简单了解一下ConcurrentHashMap有关的知识。ConcurrentHashMap是在HashMap的基础上实现的,HashMap与ConcurrentHashMap的区别是:ConcurrentHashMap在并发的情况下保证了线程安全,两者的底层数据结构都是一样的,但是在不同的JDK版本中具体实现是不一样的,在JDK7及以前是数组+链表实现的,在JDK8以后是数组+链表+红黑树(链表长度到达8以后,自动扩展为红黑树)实现的。

ConcurrentHashMap的put方法在JDK8中是通过CAS+Synchronized取代Segment+ReentrantLock来实现的,这是一个很重要的知识点,这里暂时不展开来说。ConcurrentHashMap的put方法是先利用自旋锁+Synchronized来table操作数组,然后再用CAS的方式写入到内存中去。
 

public V put(K key, V value) {
        return putVal(key, value, false);
 }
final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }

put方法中的CAS操作是调用UnSafe类的方法来实现的,UnSafe看不到源码,需要下载Openjdk的源码来看。看了UnSafe源码之后,我们会发现它的方法都是native方法,这是因为Java不能直接访问操作系统底层,是通过本地方法来访问,而Unsafe类的native方法提供了硬件级别的原子操作,native方法是通过c++代码来实现的。

static inline bool compareAndSwap (volatile jint *addr, jint old, jint new_val)
 {
  jboolean result = false;
  spinlock lock;
   if ((result = (*addr == old)))
     *addr = new_val;
   return result;
 }

四、汇编级别CAS

以下是几个和原子操作有关的汇编指令(所涉及的汇编知识不详细解释,只需要知道通过汇编语言可以实现原子操作即可):

指令     作用
XADD 先交换两个操作数的值,再进行算术加法操作。多处理器安全,在80486及以上CPU中支持。
CMPXCHG 比较交换指令,第一操作数先和AL/AX/EAX比较,如果相等ZF置1,第二操作数赋给第一操作数,否则ZF清0,第一操作数赋给AL/AX/EAX。多处理器安全,在80486及以上CPU中支持。
XCHG 交换两个操作数,其中至少有一个是寄存器寻址.其他寄存器和标志位不受影响。
LOCK

这是一个指令前缀,在所对应的指令操作期间使此指令的目标操作数指定的存储区域锁定,以得到保护。

当今,几乎100%CPU都支持这些指令。因此,用标准C和C++可以写出一系列几乎可以跨平台的原子操作函数。

五、操作系统(处理器)级别的CAS

操作系统底层(处理器)是如何保证原子操作的呢?
首先,处理器能自动保证基本的内存操作是原子性的。当一个处理器读取一个字节时,其他处理器不能访问这个字节的内存地址。但是,对于复杂的内存操作,处理器是不能自动保证其原子性的,比如跨总线宽度、跨多个缓存行和跨页表的访问。但是,处理器提供了总线锁定和缓存锁定两个机制,来保证复杂内存操作的原子性。

总线锁定机制:通过总线锁保证原子性。如果多个处理器同时对共享变量进行读改写操作(i++就是经典的读改写操作),那么共享变量就会被多个处理器同时进行操作。这样,读改写操作就不是原子的,操作完之后共享变量的值会和期望的不一致。最简单的就是i++问题,当两个处理器CPU1和CPU2处理变量i的时候,想要保证读改写共享变量的操作是原子的,就必须保证CPU1读改写共享变量的时候,CPU2不能操作“缓存了该共享变量内存地址的缓存”。处理器使用总线锁就是来解决这个问题的。所谓总线锁,就是使用处理器提供的一个LOCLK#信号,当一个处理器在总线上输出此信号时,其他处理器的请求将被阻塞住,那么该处理器可以独占共享内存。

缓存锁定机制:通过缓存锁定来保证原子性。在同一时刻,只需保证对某个内存地址的操作是原子性即可,但总线锁定把CPU和内存之间的通信锁住了,这使得在锁定期间,其它处理器不能操作其他内存地址的数据,所以总线锁定的开销比较大,目前,处理器在某些场合下使用“缓存锁定”代替“总线锁定”来进行优化。频繁使用的内存会缓存在处理器的L1、L2和L3高速缓存里,原子操作就可以直接在处理器内部的缓存中进行,并不需要声明总线锁。所谓“缓存锁定”,是指如果共享内存被缓存在处理器的缓存行中,并且在Lock操作期间被锁定,那么当它执行锁操作回写到内存时,处理器不在总线上声言LOCK#信号,而是修改内部的内存地址,并允许它的缓存一致性机制来保证操作的原子性。因为缓存一致性机制会阻止同时修改由两个以上处理器缓存的内存区域数据,当其他处理器回写已被锁定的缓存行的数据时,会使缓存行无效。

针对以上两个机制,我们可以通过“Intel处理器提供的很多Lock前缀的指令”来实现。例如,位测试和修改指令:BTS、BTR、BTC;交换指令XADD、CMPXCHG,以及其他一些操作数和逻辑指令。被这些指令操作的内存区域就会加锁,导致其他处理器不能同时访问它。

到此为止,我们探究了“zookeeper创建的节点是唯一的”原理。有些知识点没有展开讲,可能是没有必要,或者是本人能力有限的原因,欢迎大家指正批评!
 

上一篇:stat命令的实现


下一篇:SpringBoot项目运行正常但是打包报错[ERROR] Failed to execute goal org.springframework.boot:spring-boot-maven-plug