在雪花算法自定义解决时钟回拨问题一文中,对雪花算法的时钟回拨解决思路进行了说明,由于顺序号保存在内存中,每次启动都是从初始值开始,在特定场景下,比如停止服务后进行了时钟回拨,在理论上,还是可能出现序列号重复的情况。
这里将序列号持久化到本地磁盘文件中,这样下次启动时,首先会读取之前保存的持久化文件,获取序列号,而不是直接从固定值(比如1)开始,这样就算回拨了时间,只要顺序号还是持续增加的,就不会出现序列号相同的情况。
package com.demo.server.config;
import cn.hutool.core.net.NetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.ClassPathResource;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
public class CustomSnowFlake {
private static Logger log = LoggerFactory.getLogger(CustomSnowFlake.class);
// 2020-01-01 00:00:00 对应的秒
private final static long beginTs = 1577808000L;
// 顺序号最大值(大约419万)
private final static long maxSequence = 4194300L;
// 最大时钟回拨(秒),4194秒,大约69分钟
private final static long maxTimeback = 4194L;
private long lastTs = 0L;
private long processId;
private int processIdBits = 10;
private long sequence = 1L;
private int sequenceBits = 22;
public CustomSnowFlake() throws IOException {
String ipAddr = NetUtil.getLocalhostStr();
log.info("当前机器的ipAddr:" + ipAddr);
Long workerId = NetUtil.ipv4ToLong(ipAddr);
workerId = workerId % 1024;
log.info("当前机器的workId:" + workerId);
this.processId = workerId;
}
/**
* 根据已知的workerId和流水号进行初始化
* @param workerId
* @param sequence
*/
public CustomSnowFlake(Long workerId, Long sequence) {
this.processId = workerId;
this.sequence = sequence;
}
/**
*
* @param processId
*/
public CustomSnowFlake(long processId) {
if (processId > ((1 << processIdBits) - 1)) {
throw new RuntimeException("进程ID超出范围,设置位数" + processIdBits + ",最大" + ((1 << processIdBits) - 1));
}
this.processId = processId;
}
/**
* 文件不存在时,不需要报错
*/
public void restoreCheckPoint() {
String fileName = "snowflake.properties";
ClassPathResource resource = new ClassPathResource(fileName);
try {
String confPath = resource.getURI().getPath();
File confFile = new File(confPath);
InputStream inputStream = new FileInputStream(confFile);
byte buf[] = new byte[(int) confFile.length()];
int read = inputStream.read(buf);
inputStream.close();
String confInfo = new String(buf);
if (confInfo.startsWith("sequence=")) {
String splits[] = confInfo.split("=");
String sequence = splits[1];
this.sequence = Long.parseLong(sequence);
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 文件不存在时,不影响正常使用
* 只是不能持久化顺序号
*/
public void saveCheckPoint() {
String fileName = "snowflake.properties";
ClassPathResource resource = new ClassPathResource(fileName);
try {
String confPath = resource.getURI().getPath();
System.out.println("confPath = " + confPath);
File confFile = new File(confPath);
FileOutputStream outputStream = new FileOutputStream(confFile);
String configInfo = "sequence=" + sequence;
outputStream.write(configInfo.getBytes(StandardCharsets.UTF_8));
outputStream.close();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 获取当前时间(秒为单位)
* @return
*/
protected long timeGen() {
return System.currentTimeMillis() / 1000;
}
/**
* 生成一个Id
* @return
*/
public synchronized long nextId() {
// 获取当前时间(秒为单位)
long ts = timeGen();
// 刚刚生成的时间戳比上次的时间戳还小,出错
long tempDiff = lastTs - ts;
if (tempDiff >= maxTimeback) {
log.warn("时钟回拨超过4194秒,存在Id重复风险");
}
sequence = sequence + 1;
if(sequence >= maxSequence)
{
sequence = 1;
}
// 更新lastTs时间戳
lastTs = ts;
long timeDiff = ts - beginTs;
return (timeDiff << (processIdBits + sequenceBits)) | (processId << sequenceBits) | sequence;
}
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
CustomSnowFlake ig = new CustomSnowFlake();
ig.restoreCheckPoint();
for (int i = 0; i < 10; i++) {
System.out.println(ig.nextId());
ig.saveCheckPoint();
Thread.sleep(1000);
}
}
}
根据情况,还可以进行一定的优化,比如不一定每次生成id都进行持久化,而是当生成id次数达到一定数量或者间隔一定时间(比如3秒或5秒)以后,再进行持久化,这样就可以提高效率。当然这样做的结果,就可能有部分序列号没有成功保存的情况。因为序列号中的时间因子是每秒编号,所有程序重新启动后,往往时间就和之前的不相同了。
对于并发量小的情况,每次生成id都进行持久化,对性能和效率的影响也不大,对并发量大的情况,就需要考虑采用间隔一定数量或时间的方式来进行优化了。