NameNode 用了什么神秘技术来支撑元数据百万并发读写的

本文大纲

NameNode 用了什么神秘技术来支撑元数据百万并发读写的

一、HDFS 是大数据的基石

我们都知道,HDFS 是大数据存储的基石,所有的离线数据都存储在 HDFS 上,而 NameNode 是存储所有元数据的地方(所谓元数据就是描述数据的数据,比如文件的大小,文件都存储在哪些 DataNode 上,文件在目录树的位置等),所以 NameNode 便成为了 HDFS 最关键的部分。在离线数仓中,会存在很多离线任务,这些离线任务都要往 HDFS 写数据,每次写数据都会经过 NameNode 来保存元数据信息,那么 NameNode 势必会承担非常多的请求。NameNode 作为 HDFS 的核心,肯定自身要保证高可用,数据不能一直在内存中,要写到磁盘里。所以一个关键的问题来了,NameNode 是用了什么神秘的技术,在保证自身高可用的同时,还能承担巨额的读写请求?

二、NameNode 高可用是如何实现的

下面直接来一个 NameNode 高可用的架构图:

NameNode 用了什么神秘技术来支撑元数据百万并发读写的

然后解释下如何保证高可用的:

(1)NameNode 只有一个时候的单点故障问题

如果我们只部署了一个 NameNode,那么这个 NameNode 是有单点故障的问题的。如何解决,再加一个 NameNode 即可;

(2)当有两个 NameNode ,切换时,数据如何保持同步

两个 NameNode 一起工作,某一个 NameNode 挂掉了,另一个 NameNode 接替工作,这件事成立的必要前提是,两个 NameNode 的数据得时时刻刻保持一致。

那么如何保持数据一致,是不是可以在两个 NameNode 之间搞个共享的文件系统?仔细想想也不行,这样的话,单点故障问题就转移到这个文件系统上了。

(3)使用多节点的 JournalNode 作为主备 NameNode 的数据同步介质

这里引入了 JournalNode 集群,JournalNode 的每个节点的数据都是一样的,并且时刻保持一致。并且只要超过半数的节点存活,整个 JournalNode 集群都可以正常提供服务。

所以,一般会使用奇数个节点来搭建。(为什么一般不用偶数个呢?因为 3 个节点构成的集群,可以容忍挂掉一台机器;而 4 个节点构成的集群,也只能容忍挂掉一台机器。同样是只能挂掉一台,为何不选 3 个节点的呢,还能节省资源)。

使用 JournalNode 集群,一个 NameNode 实时的往集群写,另一个 NameNode 也实时的读集群数据,这样两个 NameNode 数据就可以保持一致了。

(4)一个 NameNode 挂掉,另一个 NameNode 如何立马感知并接替工作

首先不能人工参与切换。那如何实时监听呢?

首先要再引入一个关键组件:Zookeeper。当两个 NameNode 同时启动后,他们都会去 Zookeeper 上注册,谁注册成功了,谁就能获得锁,成为 Active 状态的 NameNode。

另外还需要一个组件:ZKFC,它会实时监控 Zookeeper 进程状态,并且会以心跳的方式实时的告诉 Zookeeper NameNode 的状态。如果一个 NameNode 进程挂了,就会把 Zookeeper 上的锁给另外一个 NameNode,让它成为 Active 的来工作。

三、NameNode 如何既高可用,还能高并发

1、双缓冲技术

NameNode 为了实现高可用,首先自己内存里的数据需要写到磁盘,然后还需要往 JournalNode 里写数据。

所以既然要写磁盘,还是往两个地方写磁盘,那必然性能会跟不上的。

所以这里 NameNode 引入了一个技术,也是本篇文章的重点:双缓冲技术。

双缓冲的设计理念如下图:

NameNode 用了什么神秘技术来支撑元数据百万并发读写的

客户端不是直接写磁盘,而是往一个内存结构(Buffer1)里面写数据。当 Buffer1 达到一定阈值后,Buffer 1 和 Buffer 2 交换内存数据。此时 Buffer1 数据为空,Buffer2 开始在后台默默写磁盘。

这样的好处很明显的,前端只需要进行内存写 Buffer1 就行,性能特别高;而 Buffer2 在后台默默的同步日志到磁盘即可。

这样磁盘写,就转化成为了内存写,速度大大提高了。

2、如何实现双缓冲

然而,在真实环境不只一个客户端是这样子的:

NameNode 用了什么神秘技术来支撑元数据百万并发读写的

大数据情况下是 N 个客户端同时并发写的,在高并发的情况下,我们必然要去协调多个线程动作的一致性,比如往 Buffer1 的写动作,Buffer1 与 Buffer2 数据交换的动作,Buffer2 写磁盘的动作。

那么我们该如何实现这样一个巧妙的双缓冲呢?下面的代码是我从 Hadoop 源码里抽离出来的关键实现:

package org.apache.hadoop;

import java.util.LinkedList;

public class FSEditLog2 {
    private long txid=0L;
    private DoubleBuffer editLogBuffer=new DoubleBuffer();
    //是否正在刷写磁盘
    private volatile Boolean isSyncRunning = false;
    private volatile Boolean isWaitSync = false;

    private volatile Long syncMaxTxid = 0L;

    //每个线程都对应自己的一个副本
    private ThreadLocal<Long> localTxid=new ThreadLocal<Long>();

    public void logEdit(String content){//mkdir /a
        synchronized (this){//加锁的目的就是为了事务ID的唯一,而且是递增
            txid++;
            localTxid.set(txid);
            EditLog log = new EditLog(txid, content);
            editLogBuffer.write(log);
        }
        logSync();
    }

    private  void logSync(){
        synchronized (this){
            if(isSyncRunning){ //是否有人正在把数据同步到磁盘上面
                long txid = localTxid.get();
                if(txid <= syncMaxTxid){
                    //直接return,不接着干了?
                    return;
                }
                if(isWaitSync){
                    return;
                }
                isWaitSync = true;

                while(isSyncRunning){
                    try {
                        wait(2000);
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
                isWaitSync = false;
            }

            editLogBuffer.setReadyToSync();
            if(editLogBuffer.syncBuffer.size() > 0) {
                syncMaxTxid = editLogBuffer.getSyncMaxTxid();
            }

            isSyncRunning = true;

        } //释放锁

        editLogBuffer.flush();
        synchronized (this) {
            isSyncRunning = false;
            notify();
        } //释放锁
    }


    /**
     * 把日志抽象成类
     */
    class EditLog{
        //顺序递增
        long txid;
        //操作内容  mkdir /a
        String content;

        //构造函数
        public EditLog(long txid,String content){
            this.txid = txid;
            this.content = content;
        }

        //为了测试方便
        @Override
        public String toString() {
            return "EditLog{" +
                    "txid=" + txid +
                    ", content='" + content + '\'' +
                    '}';
        }
    }


    /**
     * 双缓存方案
     */
    class DoubleBuffer{
        //内存1
        LinkedList<EditLog> currentBuffer = new LinkedList<EditLog>();
        //内存2
        LinkedList<EditLog> syncBuffer= new LinkedList<EditLog>();

        /**
         * 把数据写到当前内存1
         * @param log
         */
        public void write(EditLog log){
            currentBuffer.add(log);
        }

        /**
         * 交换内存
         */
        public void setReadyToSync(){
            LinkedList<EditLog> tmp= currentBuffer;
            currentBuffer = syncBuffer;
            syncBuffer = tmp;
        }

        /**
         * 获取内存2里面的日志的最大的事务编号
         * @return
         */
        public Long getSyncMaxTxid(){
            return syncBuffer.getLast().txid;
        }


        /**
         * 刷写磁盘
          */
        public void flush(){
            for(EditLog log:syncBuffer){
                //把数据写到磁盘上
                System.out.println("存入磁盘日志信息:"+log);
            }

            //把内存2里面的数据要清空
            syncBuffer.clear();
        }
    }
}

主要的业务逻辑就是 40 行,但是真的很巧妙。

1、EditLog

我们先看这个 EditLog 内部类,这是对 EditLog 日志的一个封装,就两个属性 txid 和 content,分别是日志的事务id(保证唯一性)和 内容。

2、DoubleBuffer

再看这个 DoubleBuffer 双缓冲类,很简单,就是在内存里面维护了两个有序的 LinkedList,分别是当前写编辑日志的缓冲和同步到磁盘的缓冲,其中的元素就是 EditLog 类。

write 方法就是把一条编辑日志写到当前缓冲里。

setReadyToSync 方法,就是交换两个缓冲,也是最简单的刚学习 Java 就学习过的两个变量交换值的方法。

getSyncMaxTxid 方法,获得正在同步的那个缓冲区里的最大的事务id。

flush 方法,遍历同步的缓冲的每一条编辑日志,写到磁盘,并最终清空缓冲区内容。

3、主类的一些属性说明

(1)全局的事务id

private long txid=0L;

(2)双缓冲结构

private DoubleBuffer editLogBuffer=new DoubleBuffer();

(3)控制变量

private volatile Boolean isSyncRunning = false; // 是否正在同步数据到磁盘

private volatile Boolean isWaitSync = false; // 是否有线程在等待同步数据到磁盘完成

private volatile Long syncMaxTxid = 0L; // 当前同步的最大日志事务id

private ThreadLocallocalTxid=new ThreadLocal();  // 每个线程的线程副本,用来放本线程当前写入的日志事务id

(4)主逻辑 logEdit 方法

这个方法是对外暴露的方法,客户端往双缓冲写数据就是用的这个方法。

假设当前有一个线程1 进到了 logEdit 方法,首先直接把当前类实例加锁,避免别的线程进来,以此来保证编辑日志事务id的唯一自增性。

// 全局事务递增
txid++;
// 往线程本身的变量里设置事务id值
localTxid.set(txid);
// 构造 EditLog 变量
EditLog log = new EditLog(txid, content);
// 写入当前的 Buffer
editLogBuffer.write(log);

当它执行完了这些之后,释放锁,开始执行 logSync() 方法。此时由于释放了锁,于是很多线程开始拿到锁,进入了这个方法中。

假设有五个线程进来了分别写了一条日志,于是现在双缓冲是这样子的:

NameNode 用了什么神秘技术来支撑元数据百万并发读写的

好,然后线程1 开始进入 logSync 方法,第一步就是使用当前类的实例加了锁,保证只有一个线程进来。

检查 isSyncRunning 变量是否为 true,目前是 false,跳过这个方法。

开始执行这个 editLogBuffer.setReadyToSync(); 方法,于是双缓冲的数据直接被交换了。

NameNode 用了什么神秘技术来支撑元数据百万并发读写的

然后获得了全局最大的id,当前是 5,赋值给了 syncMaxTxid 变量

if(editLogBuffer.syncBuffer.size() > 0) {
    syncMaxTxid = editLogBuffer.getSyncMaxTxid();
}

然后 isSyncRunning = true;  把这个变量置为 true,表示正在同步数据到磁盘。此时释放锁。

然后 线程 1 开始执行数据同步到磁盘的动作:editLogBuffer.flush() ,这个动作肯定耗费的时间比较久,基本是在 ms 级别。

此时我们假设 线程2 争抢到了锁,进入到了 logSync 方法。

// 线程2 判断 是否有人正在把数据同步到磁盘上面,这个值被线程 1 改为 true 了
// 进入到 if 方法
if(isSyncRunning){ 
    // 获得到自己线程的事务id,为 2
    long txid = localTxid.get();
    // 2 是否小于 5 ?小于,直接返回,因为此时 5 已经正在被同步到磁盘了
    if(txid <= syncMaxTxid){
        return;
    }
    if(isWaitSync){
        return;
    }
    isWaitSync = true;

    while(isSyncRunning){
        try {
            wait(2000);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    isWaitSync = false;
}

线程2 由于自身的编辑日志的事务id 小于当前正在同步的最大的事务id,所以直接返回了,然后线程3 ,线程4,线程5 进来都是这样,直接 return 返回。

假设线程6 此时进来,当前双缓冲状态是这样的

NameNode 用了什么神秘技术来支撑元数据百万并发读写的

下面线程 6 干的活,参考下面代码里的注释:

// 线程6 判断是否有人正在把数据同步到磁盘上面,这个值被线程 1 改为 true 了
// 进入到 if 方法
if(isSyncRunning){ 
    // 获得到自己线程的事务id,为 6
    long txid = localTxid.get();
    // 6 是否小于 5 ,不小于继续执行
    if(txid <= syncMaxTxid){
        return;
    }
    // 这个值为 false,继续执行
    if(isWaitSync){
        return;
    }
    // 把 isWaitSync 设置为 true
    isWaitSync = true;

    // 这个值被线程1置为了 true,所以这里在死循环
    while(isSyncRunning){
        try {
            // 等待 2s,wait 会释放锁,同时线程 6 进入睡眠中
            wait(2000);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    isWaitSync = false;
}

可以看到 线程 6 在 while 循环里无限等待数据同步到磁盘完毕。然后由于线程 6 把 isWaitSync 值改为了 true,线程 6 在等待期间释放锁,被其他线程抢到之后,其他线程由于 isWaitSync 为true,直接返回了。

当过了一会儿,线程1 把第二个 Buffer 同步到磁盘完毕后,线程1 会执行这些代码

synchronized (this) {
    isSyncRunning = false;
    notify();
} //释放锁

把 isSyncRunning 变量置为 false,同时调用 notify(),通知线程 6 ,你可以继续参与锁的竞争了。

然后线程6 ,从 wait 中醒来,重新参与锁竞争,继续执行接下来的代码。此时 isSyncRunning 已经为 false,所以它跳出了 while 循环,把 isWaitSync 置为了 false。

然后它开始执行:交换缓冲区,给最大的事务id(此时为6 )赋值,把 isSyncRunning 赋值为 true。

editLogBuffer.setReadyToSync();
if(editLogBuffer.syncBuffer.size() > 0) {
    syncMaxTxid = editLogBuffer.getSyncMaxTxid();
}

isSyncRunning = true;

执行完了之后,释放锁,开始执行Buffer2 的同步。然后所有的线程就按照上面的方式有序的工作。

这段几十行的代码很精炼,值得反复推敲,总结下来如下:

(1)写缓冲到内存 和 同步数据到磁盘分开,互不影响和干扰;

(2)使用 synchronize ,wait 和 notify 来保证多线程有序进行工作;

(3)当在同步数据到磁盘中的时候,其他争抢到锁进来准备同步数据的线程只能等待;

(4)线程使用 ThreadLocal 变量,来记录自身当前的事务id,如果小于当前正在同步的最大事务id,则不同步;

(5)有线程在等待同步数据的时候,其他线程写完 editLog 到内存后直接返回;

四、最后的总结

本文详细探讨了 HDFS 在大数据中基石的地位,以及如何保障 NameNode 高可用的运行。

NameNode 在高可用运行时,同时是如何保证高并发读写操作的。双缓冲在其中起到了核心的作用,把写数据和同步数据到磁盘分离开,互不影响。

同时我们还剥离了一段核心双缓冲的实现代码,仔细分析了实现原理。这短短的几十行代码,可谓综合利用了多线程高并发的知识,耐人寻味。

 

 

NameNode 用了什么神秘技术来支撑元数据百万并发读写的

上一篇:Hadoop系列


下一篇:hdfs基础