本文首要是对美团的分布式ID结构Leaf的原理进行介绍,针对Leaf原项目中的一些issue,对Leaf项目进行功用增强,问题修正及优化改善,改善后的项目地址在这里:
Leaf项目改善计划 https://github.com/NotFound9/Leaf
Leaf原理剖析
Snowflake生成ID的形式
7849276-4d1955394baa3c6d.png
snowflake算法关于ID的位数是上图这样分配的:
1位的符号位+41位时刻戳+10位workID+12位序列号
加起来一共是64个二进制位,正好与Java中的long类型的位数相同。
美团的Leaf结构关于snowflake算法进行了一些位数调整,位数分配是这样:
最大41位时刻差+10位的workID+12位序列化
尽管看美团对Leaf的介绍文章里面说
Leaf-snowflake计划完全沿用snowflake计划的bit位设计,即是“1+41+10+12”的方式组装ID号。
其实看代码里面是没有专门设置符号位的,假如timestamp过大,导致时刻差占用42个二进制位,时刻差的第一位为1时,或许生成的id转换为十进制后会是负数:
//timestampLeftShift是22,workerIdShift是12 long id = ((timestamp - twepoch) << timestampLeftShift) | (workerId << workerIdShift) | sequence;
时刻差是什么?
由于时刻戳是以1970年01月01日00时00分00秒作为开始点,其实咱们一般取的时刻戳其实是开始点到现在的时刻差,假如咱们能确认咱们取的时刻都是某个时刻点以后的时刻,那么可以将时刻戳的开始点改成这个时刻点,Leaf项目中,假如不设置开始时刻,默许是2010年11月4日09:42:54,这样可以使得支撑的最大时刻增长,Leaf结构的支撑最大时刻是开始点之后的69年。
workID怎样分配?
Leaf运用Zookeeper作为注册中心,每次机器发动时去Zookeeper特定途径/forever/下读取子节点列表,每个子节点存储了IP:Port及对应的workId,遍历子节点列表,假如存在当时IP:Port对应的workId,就运用节点信息中存储的workId,不存在就创立一个永久有序节点,将序号作为workId,并且将workId信息写入本地缓存文件workerID.properties,供发动时衔接Zookeeper失利,读取运用。由于workId只分配了10个二进制位,所以取值范围是0-1023。
序列号怎样生成?
序列号是12个二进制位,取值范围是0到4095,首要保证同一个leaf服务在同一毫秒内,生成的ID的唯一性。
序列号是生成流程如下:
1.当时时刻戳与上一个ID的时刻戳在同一毫秒内,那么对sequence+1,假如sequence+1超越了4095,那么进行等待,等到下一毫秒到了之后再生成ID。
2.当时时刻戳与上一个ID的时刻戳不在同一毫秒内,取一个100以内的随机数作为序列号。
if (lastTimestamp == timestamp) {
sequence = (sequence + 1) & sequenceMask; if (sequence == 0) { //seq 为0的时分表示是下一毫秒时刻开始对seq做随机 sequence = RANDOM.nextInt(100);
timestamp = tilNextMillis(lastTimestamp);
}
} else { //假如是新的ms开始 sequence = RANDOM.nextInt(100);
}
lastTimestamp = timestamp;
segment生成ID的形式
5e4ff128.png
这种形式需求依靠MySQL,表字段biz_tag代表事务名,max_id代表该事务现在已分配的最大ID值,step代表每次Leaf往数据库恳求时,一次性分配的ID数量。
大致流程便是每个Leaf服务在内存中有两个Segment实例,每个Segement保存一个分段的ID,
一个Segment是当时用于分配ID,有一个value特点保存这个分段已分配的最大ID,以及一个max特点这个分段最大的ID。
别的一个Segement是备用的,当一个Segement用完时,会进行切换,运用另一个Segement进行运用。
当一个Segement的分段ID运用率到达10%时,就会触发另一个Segement去DB获取分段ID,初始化好分段ID供之后运用。
Segment { private AtomicLong value = new AtomicLong(0); private volatile long max; private volatile int step;
}
SegmentBuffer { private String key; private Segment[] segments; //双buffer private volatile int currentPos; //当时的运用的segment的index private volatile boolean nextReady; //下一个segment是否处于可切换状况 private volatile boolean initOk; //是否初始化完成 private final AtomicBoolean threadRunning; //线程是否在运行中 private final ReadWriteLock lock; private volatile int step; private volatile int minStep; private volatile long updateTimestamp;
}
Leaf项目改善
现在Leaf项目存在的问题是
Snowflake生成ID相关:
1.注册中心只支撑Zookeeper
而关于一些小公司或者项目组,其他事务没有运用到Zookeeper的话,为了布置Leaf服务而保护一个Zookeeper集群的代价太大。所以原项目中有issue在问”怎样支撑非Zookeeper的注册中心“,由于一般项目中运用MySQL的概率会大很多,所以增加了运用MySQL作为注册中心,本地装备作为注册中心的功用。
2.潜在的时钟回拨问题
由于发动前,服务器时刻调到了曾经的时刻或者进行了回拨,衔接Zookeeper失利时会运用本地缓存文件workerID.properties中的workerId,而没有校验该ID生成的最大时刻戳,或许会形成ID重复,对这个问题进行了修正。
3.时刻差过大时,生成id为负数
由于缺少对时刻差的校验,当时刻差过大,转换为二进制数后超越41位后,在生成ID时会形成溢出,使得符号位为1,生成id为负数。
Segement生成ID相关:
没有太多问题,首要是依据一些issue对代码进行了功能优化。
详细改善如下:
Snowflake生成ID相关的改善:
1.针对Leaf原项目中的issue#84,增加zk_recycle形式(注册中心为zk,workId循环运用)
2.针对Leaf原项目中的issue#100,增加MySQL形式(注册中心为MySQL)
3.针对Leaf原项目中的issue#100,增加Local形式(注册中心为本地项目装备)
4.针对Leaf原项目中的issue#84,修正发动时时钟回拨的问题
5.针对Leaf原项目中的issue#106,修正时刻差过大,超越41位溢出,导致生成的id负数的问题
Segement生成ID相关的改善:
1.针对Leaf原项目中的issue#68,优化SegmentIDGenImpl.updateCacheFromDb()办法。
2.针对Leaf原项目中的 issue#88,运用位运算&替交换模运算
snowflake算法生成ID的相关改善
Leaf项目本来的注册中心的形式(咱们暂时指令为zk_normal形式)
运用Zookeeper作为注册中心,每次机器发动时去Zookeeper特定途径下读取子节点列表,假如存在当时IP:Port对应的workId,就运用节点信息中存储的workId,不存在就创立一个永久有序节点,将序号作为workId,并且将workId信息写入本地缓存文件workerID.properties,供发动时衔接Zookeeper失利,读取运用。
1.针对Leaf原项目中的issue#84,增加zk_recycle形式(注册中心为zk,workId循环运用)
问题概况:
issue#84:workid是否支撑收回?
SnowflakeService形式中,workid是否支撑收回?分布式环境下,每次重新布置或许就换了一个ip,假如没有收回的话1024个机器标识很快就会消耗完,为什么zk不用暂时节点去存储呢,这样能动态感知服务上下线,对workid进行管理收回?
解决计划:
开发了zk_recycle形式,针对运用snowflake生成分布式ID的技术计划,原本是运用Zookeeper作为注册中心为每个服务依据IP:Port分配一个固定的workId,workId生成范围为0到1023,workId不支撑收回,所以在Leaf的原项目中有人提出了一个issue#84 workid是否支撑收回?,由于当布置Leaf的服务的IP和Port不固守时,假如workId不支撑收回,当workId超越最大值时,会导致生成的分布式ID的重复。所以增加了workId循环运用的形式zk_recycle。
怎么运用zk_recycle形式?
在Leaf/leaf-server/src/main/resources/leaf.properties中增加以下装备
//敞开snowflake服务 leaf.snowflake.enable=true //leaf服务的端口,用于生成workId leaf.snowflake.port= //将snowflake形式设置为zk_recycle,此刻注册中心为Zookeeper,并且workerId可复用 leaf.snowflake.mode=zk_recycle //zookeeper的地址 leaf.snowflake.zk.address=localhost:2181
发动LeafServerApplication,调用/api/snowflake/get/test就可以取得此种形式下生成的分布式ID。
curl domain/api/snowflake/get/test 1256557484213448722
zk_recycle形式完成原理
按照上面的装备在leaf.properties里面进行装备后,
if(mode.equals(SnowflakeMode.ZK_RECYCLE)) {//注册中心为zk,对ip:port分配的workId是课循环利用的形式 String zkAddress = properties.getProperty(Constants.LEAF_SNOWFLAKE_ZK_ADDRESS);
RecyclableZookeeperHolder holder = new RecyclableZookeeperHolder(Utils.getIp(),port,zkAddress);
idGen = new SnowflakeIDGenImpl(holder); if (idGen.init()) {
logger.info("Snowflake Service Init Successfully in mode " + mode);
} else { throw new InitException("Snowflake Service Init Fail");
}
}
此刻SnowflakeIDGenImpl运用的holder是RecyclableZookeeperHolder的实例,workId是可循环利用的,RecyclableZookeeperHolder作业流程如下:
1.首先会在未运用的workId池(zookeeper途径为/snowflake/leaf.name/recycle/notuse/)中生成一切workId。
2.然后每次服务器发动时都是去未运用的workId池取一个新的workId,然后放到正在运用的workId池(zookeeper途径为/snowflake/leaf.name/recycle/inuse/)下,将此workId用于Id生成,并且守时上报时刻戳,更新zookeeper中的节点信息。
3.并且守时检测正在运用的workId池,发现某个workId超越最大时刻没有更新时刻戳的workId,会把它从正在运用的workId池移出,然后放到未运用的workId池中,以供workId循环运用。
4.并且正在运用这个很长时刻没有更新时刻戳的workId的服务器,在发现自己超越最大时刻,还没有上报时刻戳成功后,会停止id生成服务,以防workId被其他服务器循环运用,导致id重复。
2.针对Leaf原项目中的issue#100,增加MySQL形式(注册中心为MySQL)
问题概况:
issue#100:怎么运用非zk的注册中心?
解决计划:
开发了mysql形式,这种形式注册中心为MySQL,针对每个ip:port的workid是固定的。
怎么运用这种mysql形式?
需求先在数据库履行项目中的leaf_workerid_alloc.sql,完成建表,然后在Leaf/leaf-server/src/main/resources/leaf.properties中增加以下装备
//敞开snowflake服务 leaf.snowflake.enable=true //leaf服务的端口,用于生成workId leaf.snowflake.port= //将snowflake形式设置为mysql,此刻注册中心为Zookeeper,workerId为固定分配 leaf.snowflake.mode=mysql //mysql数据库地址 leaf.jdbc.url=
leaf.jdbc.username=
leaf.jdbc.password=
发动LeafServerApplication,调用/api/snowflake/get/test就可以取得此种形式下生成的分布式ID。
curl domain/api/snowflake/get/test 1256557484213448722
完成原理
运用上面的装备后,此刻SnowflakeIDGenImpl运用的holder是SnowflakeMySQLHolder的实例。完成原理与Leaf原项目默许的形式,运用Zookeeper作为注册中心,每个ip:port的workid是固定的完成原理相似,只是注册,获取workid,及更新时刻戳是与MySQL进行交互,而不是Zookeeper。
if (mode.equals(SnowflakeMode.MYSQL)) {//注册中心为mysql DruidDataSource dataSource = new DruidDataSource();
dataSource.setUrl(properties.getProperty(Constants.LEAF_JDBC_URL));
dataSource.setUsername(properties.getProperty(Constants.LEAF_JDBC_USERNAME));
dataSource.setPassword(properties.getProperty(Constants.LEAF_JDBC_PASSWORD));
dataSource.init(); // Config Dao WorkerIdAllocDao dao = new WorkerIdAllocDaoImpl(dataSource);
SnowflakeMySQLHolder holder = new SnowflakeMySQLHolder(Utils.getIp(), port, dao);
idGen = new SnowflakeIDGenImpl(holder); if (idGen.init()) {
logger.info("Snowflake Service Init Successfully in mode " + mode);
} else { throw new InitException("Snowflake Service Init Fail");
}
}
3.针对Leaf原项目中的issue#100,增加Local形式(注册中心为本地项目装备)
问题概况:
issue#100:怎么运用非zk的注册中心?
解决计划:
开发了local形式,这种形式便是适用于布置Leaf服务的IP和Port根本不会变化的状况,便是在Leaf项目中的装备文件leaf.properties中显式得装备某某IP:某某Port对应哪个workId,每次布置新机器时,将IP:Port的时分在项目中增加这个装备,然后发动时项目会去读取leaf.properties中的装备,读取完写入本地缓存文件workId.json,下次发动时直接读取workId.json,最大时刻戳也每次同步到机器上的缓存文件workId.json中。
怎么运用这种local形式?
在Leaf/leaf-server/src/main/resources/leaf.properties中增加以下装备
//敞开snowflake服务
leaf.snowflake.enable=true
//leaf服务的端口,用于生成workId
leaf.snowflake.port= #注册中心为local的的形式 #leaf.snowflake.mode=local #leaf.snowflake.local.workIdMap= #workIdMap的格式是这样的{"Leaf服务的ip:端口":"固定的workId"},例如:{"10.1.46.33:8080":1,"10.1.46.33:8081":2}
发动LeafServerApplication,调用/api/snowflake/get/test就可以取得此种形式下生成的分布式ID。
curl domain/api/snowflake/get/test 1256557484213448722
4.针对Leaf原项目中的issue#84,修正发动时时钟回拨的问题
问题概况:
issue#84:由于当运用默许的形式(咱们暂时指令为zk_normal形式),注册中心为Zookeeper,workId不行复用,上面介绍了这种形式的作业流程,当Leaf服务发动时,衔接Zookeeper失利,那么会去本机缓存中读取workerID.properties文件,读取workId进行运用,可是由于workerID.properties中只存了workId信息,没有存储前次上报的最大时刻戳,所以没有进行时刻戳判别,所以假如机器的当时时刻被修改到之前,就或许会导致生成的ID重复。
解决计划:
所以增加了更新时刻戳到本地缓存的机制,每次在上报时刻戳时将时刻戳同时写入本机缓存workerID.properties,并且当运用本地缓存workerID.properties中的workId时,对时刻戳进行校验,当时体系时刻戳<缓存中的时刻戳时,才运用这个workerId。
//衔接失利,运用本地workerID.properties中的workerID,并且对时刻戳进行校验。 try {
Properties properties = new Properties();
properties.load(new FileInputStream(new File(PROP_PATH.replace("{port}", port + ""))));
Long maxTimestamp = Long.valueOf(properties.getProperty("maxTimestamp")); if (maxTimestamp!=null && System.currentTimeMillis() <maxtimestamp) {="" <span="" class="hljs-keyword">throw new CheckLastTimeException("init timestamp check error,forever node timestamp gt this node time");
}
workerID = Integer.valueOf(properties.getProperty("workerID"));
LOGGER.warn("START FAILED ,use local node file properties workerID-{}", workerID);
} catch (Exception e1) {
LOGGER.error("Read file error ", e1); return false;
} //守时使命每3s履行一次updateNewData()办法,调用更新updateLocalWorkerID()更新缓存文件workerID.properties void updateNewData(CuratorFramework curator, String path) { try { if (System.currentTimeMillis() < lastUpdateTime) { return;
}
curator.setData().forPath(path, buildData().getBytes());
updateLocalWorkerID(workerID);
lastUpdateTime = System.currentTimeMillis();
} catch (Exception e) {
LOGGER.info("update init data error path is {} error is {}", path, e);
}
}
5.针对Leaf原项目中的issue#106,修正时刻差过大,超越41位溢出,导致生成的id负数的问题
问题概况:
由于Leaf结构是沿用snowflake的位数分配
最大41位时刻差+10位的workID+12位序列化,可是由于snowflake是强制要求第一位为符号位0,不然生成的id转换为十进制后会是复试,可是Leaf项目中没有对时刻差进行校验,当时刻戳过大或者自定义的twepoch设置不当过小,会导致核算得到的时刻差过大,转化为2进制后超越41位,且第一位为1,会导致生成的long类型的id为负数,例如当timestamp = twepoch+2199023255552L时,
此刻在生成id时,timestamp - twepoch会等于2199023255552,2199023255552转换为二进制后是1+41个0,此刻生成的id由于符号位是1,id会是负数-9223372036854775793
long id = ((timestamp - twepoch) << timestampLeftShift) | (workerId << workerIdShift) | sequence;
解决计划:
//一开始将最大的maxTimeStamp核算好 this.maxTimeStamp = ~(-1L << timeStampBits) + twepoch; //然后生成ID时进行校验 if (timestamp>maxTimeStamp) { throw new OverMaxTimeStampException("current timestamp is over maxTimeStamp, the generate id will be negative");
}
针对Segement生成分布式ID相关的改善
1.针对Leaf原项目中的issue#68,优化SegmentIDGenImpl.updateCacheFromDb()办法
针对issue#68里面的优化计划,对Segement Buffer的缓存数据与DB数据同步的作业流程进行了进一步优化,首要是对
对SegmentIDGenImpl.updateCacheFromDb()办法进行了优化。
原计划作业流程:
1.遍历cacheTags,将dbTags的副本insertTagsSet中存在的元素移除,使得insertTagsSet只要db新增的tag
2.遍历insertTagsSet,将这些新增的元素增加到cache中
3.遍历dbTags,将cacheTags的副本removeTagsSet中存在的元素移除,使得removeTagsSet只要cache中过期的tag
4.遍历removeTagsSet,将过期的元素移除cache
这种计划需求经历四次循环,运用两个HashSet别离存储db中新增的tag,cache中过期的tag,
并且为了筛选出新增的tag,过期的tag,对每个现在运用的tag有两次删去操作,
原有计划代码如下:
ListdbTags = dao.getAllTags(); if (dbTags == null || dbTags.isEmpty()) { return;
}
ListcacheTags = new ArrayList(cache.keySet());
SetinsertTagsSet = new HashSet<>(dbTags);
SetremoveTagsSet = new HashSet<>(cacheTags); //db中新加的tags灌进cache for(int i = 0; i < cacheTags.size(); i++){
String tmp = cacheTags.get(i); if(insertTagsSet.contains(tmp)){
insertTagsSet.remove(tmp);
}
} for (String tag : insertTagsSet) {
SegmentBuffer buffer = new SegmentBuffer();
buffer.setKey(tag);
Segment segment = buffer.getCurrent();
segment.setValue(new AtomicLong(0));
segment.setMax(0);
segment.setStep(0);
cache.put(tag, buffer);
logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer);
} //cache中已失效的tags从cache删去 for(int i = 0; i < dbTags.size(); i++){
String tmp = dbTags.get(i); if(removeTagsSet.contains(tmp)){
removeTagsSet.remove(tmp);
}
} for (String tag : removeTagsSet) {
cache.remove(tag);
logger.info("Remove tag {} from IdCache", tag);
}
实际上咱们并不需求这些中间过程,现计划作业流程:
只需求遍历dbTags,判别cache中是否存在这个key,不存在便是新增元素,进行新增。
遍历cacheTags,判别dbSet中是否存在这个key,不存在便是过期元素,进行删去。
现有计划代码:
ListdbTags = dao.getAllTags(); if (dbTags == null || dbTags.isEmpty()) { return;
} //将dbTags中新加的tag增加cache,经过遍历dbTags,判别是否在cache中存在,不存在就增加到cache for (String dbTag : dbTags) { if (cache.containsKey(dbTag)==false) {
SegmentBuffer buffer = new SegmentBuffer();
buffer.setKey(dbTag);
Segment segment = buffer.getCurrent();
segment.setValue(new AtomicLong(0));
segment.setMax(0);
segment.setStep(0);
cache.put(dbTag, buffer);
logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", dbTag, buffer);
}
}
ListcacheTags = new ArrayList(cache.keySet());
SetdbTagSet = new HashSet<>(dbTags); //将cache中已失效的tag从cache删去,经过遍历cacheTags,判别是否在dbTagSet中存在,不存在说明过期,直接删去 for (String cacheTag : cacheTags) { if (dbTagSet.contains(cacheTag) == false) {
cache.remove(cacheTag);
logger.info("Remove tag {} from IdCache", cacheTag);
}
}
两个计划比照:
- 空间复杂度
比较原计划需求运用两个HashSet,这种计划的只需求运用一个hashSet,空间复杂度会低一些。 - 时刻复杂度
总遍历次数会比本来的少,时刻复杂度更低,由于判别是新增,过期的状况就直接处理了,不需求后续再独自遍历,
并且不需求对cache和dbtag的交集进行删去操作,由于本来计划为了取得新增的元素,是将dbSet的副本中现有元素进行删去得到。 - 代码可读性
原计划是4个for循环,一共35行代码,现计划是2个for循环,一共25行代码,更加简洁易懂。
2.针对Leaf原项目中的issue#88,运用位运算&替交换模运算
这个更新是针对这个issue#88 提出的问题,运用位运算&来代替取模运算%,履行效率更高。
原代码:
public int nextPos() { return (currentPos + 1) % 2;
}
现代码:
public int nextPos() { return (currentPos + 1) & 1;
}
本文来自:https://www.jmwww.net/a/13201.html 转载必须保留