号段模式的ID很接近严格递增,如果在订单场景,可以根据ID猜到一天的订单量。此时就可以用雪花算法模式
leaf在每一位的分配和标准snowflake一致:
-
最高位符号位为0
-
接下来41位:毫秒级时间戳
- 存储当前时间距离2010年某一天的差值
-
接下来10位:workerId
-
最后12位:每一毫秒内的序列号
每到新的毫秒时,每一毫秒内的序列号不是从0开始,而是从100以内的一个随机数开始
为啥这么设计?试想如果每一秒都从0开始,在qps低的情况下,每一毫秒只产生1个id,那么最末尾永远是0。如果对ID取模分表,就会永远在第0号表,造成数据分布不均
怎么设置workerId
对于workerID的分配,当服务集群数量较小的情况下,完全可以手动配置。如果服务规模较大,动手配置成本太高。于是leaf用zookeeper 自动获取workerId
,流程如下:
-
以自己的
ip:port
为key,去zk建立持久顺序节点,以zk生成的自增序号为workerId
- 创建的节点最后两级路径为:
/forever/ip:port-序列号
- 创建的节点最后两级路径为:
-
如果zk中已经有自己ip:port的节点,就
复用
其workerId- 怎么判断?拉取/forever下所有节点,每个节点的格式为
ipport-序列号
,判断每个节点中-
前面的ipport是不是等于自己,如果等于取-
后面的序列号作为workerId - 只有leaf server会创建zk节点,因此节点数量可控
- 为啥可以复用?不会在同一时刻,有相同ip:port的两个实例,因此复用一定不会发生冲突
- 怎么判断?拉取/forever下所有节点,每个节点的格式为
这种workerId分配策略能保证唯一性吗?能
- 如果
ip:port
不同,在zk中一定是两个不同的序列号,因此不会冲突 - 一个集群中不可能同时存在ip:port相同的两个机器
每个leaf server的ip:port最好手动指定,或者部署在ip不会变化的环境中
高可用:workerId会存到本地文件,这样遇到极端情况:leaf server服务重启,且zk也宕机时,也不影响使用
解决时钟回拨
雪花算法严格依赖时间,如果发生了时钟回拨,就可能导致ID重复,因此需要监测是否发生了时钟回拨并处理,在服务启动和运行时都会检测
在服务启动时检测时间是否回退:
- leaf server运行时,每隔3s会上自己的当前时间到zk节点中
- 启动时,校验当前时间不能小于 zk中最近一次上报的时间
官方文档还提到如果是第一次启动,还会和其他leaf server校准时间。但源码中没找到这部分,应该是不需要做这个校准,已删除
在运行时检测时间是否回退:
-
全局维护了上次获取ID时的时间戳:
lastTimestamp
-
如果当前时间
now < lastTimestamp
,说明发生了时钟回拨- 回拨了超过5ms,返回报错
- 回拨了5ms内,sleep一会,直到赶上上次时间
如果zk宕机导致定时上报没有成功,同时又发生了时钟回拨,且leaf server宕机。此时leaf server启动时可能产生和之前重复的ID。因此需要做好监控告警,zk的高可用
如果3s内没上报,leaf server宕机了,然后时钟回退了2s,此时根据zk的时间检测不出来发生了时钟回退,也会造成ID重复。解决方法就是等一段时间才重启机器,保证等待的时间比回拨的时间长就行
源码走读
初始化:
public boolean init() {
try {
CuratorFramework curator = createWithOptions(connectionString, new RetryUntilElapsed(1000, 4), 10000, 6000);
curator.start();
Stat stat = curator.checkExists().forPath(PATH_FOREVER);
if (stat == null) {
//不存在根节点,机器第一次启动,创建/snowflake/ip:port-000000000,并上传数据
zk_AddressNode = createNode(curator);
//worker id 默认是0,存到本地文件
updateLocalWorkerID(workerID);
//每3s上报本机时间给forever节点
ScheduledUploadData(curator, zk_AddressNode);
return true;
} else {
Map<String, Integer> nodeMap = Maps.newHashMap();//ip:port->00001
Map<String, String> realNode = Maps.newHashMap();//ip:port->(ipport-000001)
//存在根节点,先检查是否有属于自己的节点
List<String> keys = curator.getChildren().forPath(PATH_FOREVER);
for (String key : keys) {
String[] nodeKey = key.split("-");
realNode.put(nodeKey[0], key);
nodeMap.put(nodeKey[0], Integer.parseInt(nodeKey[1]));
}
Integer workerid = nodeMap.get(listenAddress);
if (workerid != null) {
//有自己的节点,zk_AddressNode=ip:port
zk_AddressNode = PATH_FOREVER + "/" + realNode.get(listenAddress);
workerID = workerid;//启动worder时使用会使用
// 当前时间不能小于 zk中最近一次上报的时间
if (!checkInitTimeStamp(curator, zk_AddressNode)) {
throw new CheckLastTimeException("init timestamp check error,forever node timestamp gt this node time");
}
// 每3s上报时间
doService(curator);
// 将workerId写到本地
updateLocalWorkerID(workerID);
} else {
//新启动的节点,创建持久节点 ,不用check时间
String newNode = createNode(curator);
zk_AddressNode = newNode;
String[] nodeKey = newNode.split("-");
workerID = Integer.parseInt(nodeKey[1]);
doService(curator);
updateLocalWorkerID(workerID);
}
}
} catch (Exception e) {
// zk不可用,从本地文件加载workerId
try {
Properties properties = new Properties();
properties.load(new FileInputStream(new File(PROP_PATH.replace("{port}", port + ""))));
workerID = Integer.valueOf(properties.getProperty("workerID"));
} catch (Exception e1) {
return false;
}
}
return true;
}
获取ID:
public synchronized Result get(String key) {
long timestamp = timeGen();
// 发生了时钟回拨
if (timestamp < lastTimestamp) {
long offset = lastTimestamp - timestamp;
// 回拨了5ms内,sleep一会
if (offset <= 5) {
try {
wait(offset << 1);
timestamp = timeGen();
if (timestamp < lastTimestamp) {
return new Result(-1, Status.EXCEPTION);
}
} catch (InterruptedException e) {
return new Result(-2, Status.EXCEPTION);
}
// 回拨了超过5ms,返回报错
} else {
return new Result(-3, Status.EXCEPTION);
}
}
// 和上次在同一毫秒
if (lastTimestamp == timestamp) {
sequence = (sequence + 1) & sequenceMask;
if (sequence == 0) {
//seq 为0表示是当前ms已经超过4096个ID了
// 需要sleep一会,下一毫秒时间开始对seq做随机
sequence = RANDOM.nextInt(100);
timestamp = tilNextMillis(lastTimestamp);
}
} else {
//如果是新的ms, 对seq做随机
sequence = RANDOM.nextInt(100);
}
lastTimestamp = timestamp;
long id = ((timestamp - twepoch) << timestampLeftShift) | (workerId << workerIdShift) | sequence;
return new Result(id, Status.SUCCESS);
}