分布式唯一ID生成(二): leaf-雪花算法

号段模式的ID很接近严格递增,如果在订单场景,可以根据ID猜到一天的订单量。此时就可以用雪花算法模式

leaf在每一位的分配和标准snowflake一致:
在这里插入图片描述

  • 最高位符号位为0

  • 接下来41位:毫秒级时间戳

    • 存储当前时间距离2010年某一天的差值
  • 接下来10位:workerId

  • 最后12位:每一毫秒内的序列号

每到新的毫秒时,每一毫秒内的序列号不是从0开始,而是从100以内的一个随机数开始

为啥这么设计?试想如果每一秒都从0开始,在qps低的情况下,每一毫秒只产生1个id,那么最末尾永远是0。如果对ID取模分表,就会永远在第0号表,造成数据分布不均


怎么设置workerId

对于workerID的分配,当服务集群数量较小的情况下,完全可以手动配置。如果服务规模较大,动手配置成本太高。于是leaf用zookeeper 自动获取workerId,流程如下:

  1. 以自己的 ip:port为key,去zk建立持久顺序节点,以zk生成的 自增序号为workerId

    1. 创建的节点最后两级路径为:/forever/ip:port-序列号
  2. 如果zk中已经有自己ip:port的节点,就 复用 其workerId

    1. 怎么判断?拉取/forever下所有节点,每个节点的格式为ipport-序列号,判断每个节点中-前面的ipport是不是等于自己,如果等于取-后面的序列号作为workerId
    2. 只有leaf server会创建zk节点,因此节点数量可控
    3. 为啥可以复用?不会在同一时刻,有相同ip:port的两个实例,因此复用一定不会发生冲突

这种workerId分配策略能保证唯一性吗?能

  • 如果 ip:port 不同,在zk中一定是两个不同的序列号,因此不会冲突
  • 一个集群中不可能同时存在ip:port相同的两个机器

每个leaf server的ip:port最好手动指定,或者部署在ip不会变化的环境中

高可用:workerId会存到本地文件,这样遇到极端情况:leaf server服务重启,且zk也宕机时,也不影响使用
在这里插入图片描述


解决时钟回拨

雪花算法严格依赖时间,如果发生了时钟回拨,就可能导致ID重复,因此需要监测是否发生了时钟回拨并处理,在服务启动和运行时都会检测

服务启动时检测时间是否回退:

  • leaf server运行时,每隔3s会上自己的当前时间到zk节点中
  • 启动时,校验当前时间不能小于 zk中最近一次上报的时间

官方文档还提到如果是第一次启动,还会和其他leaf server校准时间。但源码中没找到这部分,应该是不需要做这个校准,已删除

运行时检测时间是否回退:

  • 全局维护了上次获取ID时的时间戳:lastTimestamp

  • 如果当前时间 now < lastTimestamp,说明发生了时钟回拨

    • 回拨了超过5ms,返回报错
    • 回拨了5ms内,sleep一会,直到赶上上次时间

如果zk宕机导致定时上报没有成功,同时又发生了时钟回拨,且leaf server宕机。此时leaf server启动时可能产生和之前重复的ID。因此需要做好监控告警,zk的高可用

如果3s内没上报,leaf server宕机了,然后时钟回退了2s,此时根据zk的时间检测不出来发生了时钟回退,也会造成ID重复。解决方法就是等一段时间才重启机器,保证等待的时间比回拨的时间长就行


源码走读

初始化:

public boolean init() {
        try {
            CuratorFramework curator = createWithOptions(connectionString, new RetryUntilElapsed(1000, 4), 10000, 6000);
            curator.start();
            Stat stat = curator.checkExists().forPath(PATH_FOREVER);
            if (stat == null) {
                //不存在根节点,机器第一次启动,创建/snowflake/ip:port-000000000,并上传数据
                zk_AddressNode = createNode(curator);
                //worker id 默认是0,存到本地文件
                updateLocalWorkerID(workerID);
                //每3s上报本机时间给forever节点
                ScheduledUploadData(curator, zk_AddressNode);
                return true;
            } else {
                Map<String, Integer> nodeMap = Maps.newHashMap();//ip:port->00001
                Map<String, String> realNode = Maps.newHashMap();//ip:port->(ipport-000001)
                //存在根节点,先检查是否有属于自己的节点
                List<String> keys = curator.getChildren().forPath(PATH_FOREVER);
                for (String key : keys) {
                    String[] nodeKey = key.split("-");
                    realNode.put(nodeKey[0], key);
                    nodeMap.put(nodeKey[0], Integer.parseInt(nodeKey[1]));
                }
                Integer workerid = nodeMap.get(listenAddress);
                if (workerid != null) {
                    //有自己的节点,zk_AddressNode=ip:port
                    zk_AddressNode = PATH_FOREVER + "/" + realNode.get(listenAddress);
                    workerID = workerid;//启动worder时使用会使用
                    // 当前时间不能小于 zk中最近一次上报的时间
                    if (!checkInitTimeStamp(curator, zk_AddressNode)) {
                        throw new CheckLastTimeException("init timestamp check error,forever node timestamp gt this node time");
                    }
                    // 每3s上报时间
                    doService(curator);
                    // 将workerId写到本地
                    updateLocalWorkerID(workerID);
                    
                } else {
                    //新启动的节点,创建持久节点 ,不用check时间
                    String newNode = createNode(curator);
                    zk_AddressNode = newNode;
                    String[] nodeKey = newNode.split("-");
                    workerID = Integer.parseInt(nodeKey[1]);
                    doService(curator);
                    updateLocalWorkerID(workerID);
                    
                }
            }
        } catch (Exception e) {
           // zk不可用,从本地文件加载workerId
            try {
                Properties properties = new Properties();
                properties.load(new FileInputStream(new File(PROP_PATH.replace("{port}", port + ""))));
                workerID = Integer.valueOf(properties.getProperty("workerID"));
               
            } catch (Exception e1) {
                return false;
            }
        }
        return true;
    }

获取ID:

public synchronized Result get(String key) {
        long timestamp = timeGen();

        // 发生了时钟回拨
        if (timestamp < lastTimestamp) {
            long offset = lastTimestamp - timestamp;
            // 回拨了5ms内,sleep一会
            if (offset <= 5) {
                try {
                    wait(offset << 1);
                    timestamp = timeGen();
                    if (timestamp < lastTimestamp) {
                        return new Result(-1, Status.EXCEPTION);
                    }
                } catch (InterruptedException e) {
                    return new Result(-2, Status.EXCEPTION);
                }
              // 回拨了超过5ms,返回报错
            } else {
                return new Result(-3, Status.EXCEPTION);
            }
        }

        // 和上次在同一毫秒
        if (lastTimestamp == timestamp) {
            sequence = (sequence + 1) & sequenceMask;
            if (sequence == 0) {
                //seq 为0表示是当前ms已经超过4096个ID了
                // 需要sleep一会,下一毫秒时间开始对seq做随机
                sequence = RANDOM.nextInt(100);
                
                timestamp = tilNextMillis(lastTimestamp);
            }
        } else {
            //如果是新的ms, 对seq做随机
            sequence = RANDOM.nextInt(100);
        }
        lastTimestamp = timestamp;
        long id = ((timestamp - twepoch) << timestampLeftShift) | (workerId << workerIdShift) | sequence;
        return new Result(id, Status.SUCCESS);

    }

上一篇:谐振式DCDC设计与参数计算 - 笔记汇聚


下一篇:人工智能理论之opencv图像预处理、数据库、GUI布局的综合应用(图像预处理版块)-总结