具体实现:
package com.qiyu.tech.id.builder.service.impl; import com.qiyu.tech.id.builder.bean.IdBuilderPO; import com.qiyu.tech.id.builder.bean.LocalSeqId; import com.qiyu.tech.id.builder.dao.IdBuilderMapper; import com.qiyu.tech.id.builder.service.IdBuilderService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomUtils; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import javax.annotation.Resource; import javax.management.RuntimeErrorException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import static com.qiyu.tech.id.builder.constants.IdTypeConstants.*; /** * @Author idea * @Date created in 11:18 下午 2020/12/17 */ @Service @Slf4j public class IdBuilderServiceImpl implements IdBuilderService, InitializingBean { private static ConcurrentHashMap<Integer, BitSet> bitSetMap = new ConcurrentHashMap<>(); private static Map<Integer, IdBuilderPO> idBuilderNotSeqMap; private static Map<Integer, IdBuilderPO> idBuilderSeqMap; private static Map<Integer, LocalSeqId> localSeqMap; private static Map<Integer, Boolean> newBuilderMap; private final static Object monitor = new Object(); @Resource private IdBuilderMapper idBuilderMapper; private int idBuilderIndex; @Override public Long unionId(int code) { //考虑到锁升级问题,在高并发场景下使用synchronized要比cas更佳 synchronized (monitor) { IdBuilderPO idBuilderPO = idBuilderNotSeqMap.get(code); if (idBuilderPO == null) { return null; } boolean isNew = newBuilderMap.get(code); if (isNew) { //预防出现id生成器网络中断问题 IdBuilderPO newIdBuilderPO = this.refreshIdBuilderConfig(idBuilderPO); if (newIdBuilderPO == null) { log.error("[unionId] refreshIdBuilderConfig出现异常"); return null; } idBuilderPO.setCurrentThreshold(newIdBuilderPO.getCurrentThreshold()); newBuilderMap.put(code, false); } long initNum = idBuilderPO.getCurrentThreshold(); int step = idBuilderPO.getStep(); int randomIndex = RandomUtils.nextInt((int) initNum, (int) initNum + step); BitSet bitSet = bitSetMap.get(code); if (bitSet == null) { bitSet = new BitSet(); bitSetMap.put(code, bitSet); } Long id; int countTime = 0; while (true) { boolean indexExist = bitSet.get(randomIndex); countTime++; if (!indexExist) { bitSet.set(randomIndex); id = Long.valueOf(randomIndex); break; } //如果重试次数大于了空间的0.75则需要重新获取新的id区间 测试之后得出 循环一千万次随机函数,16gb内存条件下,大约耗时在124ms左右 if (countTime >= step * 0.75) { //扩容需要修改表配置 IdBuilderPO newIdBuilderPO = this.updateIdBuilderConfig(idBuilderPO); if (newIdBuilderPO == null) { log.error("重试超过100次没有更新自增id配置成功"); return null; } initNum = newIdBuilderPO.getCurrentThreshold(); step = newIdBuilderPO.getStep(); idBuilderPO.setCurrentThreshold(initNum); bitSet.clear(); log.info("[unionId] 扩容IdBuilder,new idBuilderPO is {}",idBuilderPO); } randomIndex = RandomUtils.nextInt((int) initNum, (int) initNum + step); } return id; } } @Override public Long unionSeqId(int code) { synchronized (monitor) { LocalSeqId localSeqId = localSeqMap.get(code); IdBuilderPO idBuilderPO = idBuilderSeqMap.get(code); if (idBuilderPO == null || localSeqId == null) { log.error("[unionSeqId] code 参数有误,code is {}", code); return null; } boolean isNew = newBuilderMap.get(code); long result = localSeqId.getCurrentId(); localSeqId.setCurrentId(result + 1); if (isNew) { //预防出现id生成器网络中断问题 IdBuilderPO updateIdBuilderPO = this.refreshIdBuilderConfig(idBuilderPO); if (updateIdBuilderPO == null) { log.error("[unionSeqId] refreshIdBuilderConfig出现异常"); return null; } newBuilderMap.put(code, false); localSeqId.setCurrentId(updateIdBuilderPO.getCurrentThreshold()); localSeqId.setNextUpdateId(updateIdBuilderPO.getCurrentThreshold() + updateIdBuilderPO.getStep()); } //需要更新本地步长 if (localSeqId.getCurrentId() >= localSeqId.getNextUpdateId()) { IdBuilderPO newIdBuilderPO = this.updateIdBuilderConfig(idBuilderPO); if (newIdBuilderPO == null) { log.error("[unionSeqId] updateIdBuilderConfig出现异常"); return null; } idBuilderPO.setCurrentThreshold(newIdBuilderPO.getCurrentThreshold()); localSeqId.setCurrentId(newIdBuilderPO.getCurrentThreshold()); localSeqId.setNextUpdateId(newIdBuilderPO.getCurrentThreshold() + newIdBuilderPO.getStep()); log.info("[unionSeqId] 扩容IdBuilder,new localSeqId is {}",localSeqId); } return result; } } /** * 刷新id生成器的配置 * * @param idBuilderPO */ private IdBuilderPO refreshIdBuilderConfig(IdBuilderPO idBuilderPO) { IdBuilderPO updateResult = this.updateIdBuilderConfig(idBuilderPO); if (updateResult == null) { log.error("更新数据库配置出现异常,idBuilderPO is {}", idBuilderPO); throw new RuntimeErrorException(new Error("更新数据库配置出现异常,idBuilderPO is " + idBuilderPO.toString())); } return updateResult; } /** * 考虑分布式环境下 多个请求同时更新同一行数据的情况 * * @param idBuilderPO * @return */ private IdBuilderPO updateIdBuilderConfig(IdBuilderPO idBuilderPO) { int updateResult = -1; //假设重试过程中出现网络异常,那么使用cas的时候必须要考虑退出情况 极限情况下更新100次 for (int i = 0; i < 100; i++) { IdBuilderPO newIdBuilderPO = idBuilderMapper.selectOneForUpdate(idBuilderPO.getId()); updateResult = idBuilderMapper.updateCurrentThreshold(newIdBuilderPO.getCurrentThreshold() + newIdBuilderPO.getStep(), newIdBuilderPO.getId(), newIdBuilderPO.getVersion()); if (updateResult > 0) { return newIdBuilderPO; } } return null; } @Override public String unionIdStr(int code) { long id = this.unionId(code); IdBuilderPO idBuilderPO = idBuilderNotSeqMap.get(code); return idBuilderPO.getBizCode() + idBuilderIndex + id; } @Override public String unionSeqIdStr(int code) { long id = this.unionSeqId(code); IdBuilderPO idBuilderPO = idBuilderSeqMap.get(code); return idBuilderPO.getBizCode() + idBuilderIndex + id; } @Override public void afterPropertiesSet() { List<IdBuilderPO> idBuilderPOS = idBuilderMapper.selectAll(); idBuilderNotSeqMap = new ConcurrentHashMap<>(idBuilderPOS.size()); newBuilderMap = new ConcurrentHashMap<>(idBuilderPOS.size()); idBuilderSeqMap = new ConcurrentHashMap<>(idBuilderPOS.size()); localSeqMap = new ConcurrentHashMap<>(0); //每次重启到时候,都需要将之前的上一个区间的id全部抛弃,使用新的步长区间 for (IdBuilderPO idBuilderPO : idBuilderPOS) { if (idBuilderPO.getIsSeq() == NEED_SEQ) { idBuilderSeqMap.put(idBuilderPO.getId(), idBuilderPO); LocalSeqId localSeqId = new LocalSeqId(); localSeqId.setNextUpdateId(idBuilderPO.getCurrentThreshold() + idBuilderPO.getStep()); localSeqId.setCurrentId(idBuilderPO.getCurrentThreshold()); localSeqMap.put(idBuilderPO.getId(), localSeqId); } else { idBuilderNotSeqMap.put(idBuilderPO.getId(), idBuilderPO); } newBuilderMap.put(idBuilderPO.getId(), true); } this.idBuilderIndex= Integer.parseInt(System.getProperty("idBuilder.index")); } }
数据库层面设计:
package com.qiyu.tech.id.builder.dao; import com.baomidou.mybatisplus.mapper.BaseMapper; import com.qiyu.tech.id.builder.bean.IdBuilderPO; import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Select; import org.apache.ibatis.annotations.Update; import java.util.List; /** * @Author idea * @Date created in 10:17 上午 2020/12/17 */ @Mapper public interface IdBuilderMapper extends BaseMapper<IdBuilderPO> { @Select("select * from t_id_builder_config") List<IdBuilderPO> selectAll(); @Select("select * from t_id_builder_config where id=#{id} limit 1 for update") IdBuilderPO selectOneForUpdate(@Param("id") int id); @Update("UPDATE t_id_builder_config set current_threshold=#{currentThreshold},version=version+1 where id=#{id} and version=#{version}") Integer updateCurrentThreshold(@Param("currentThreshold") long currentThreshold,@Param("id") int id,@Param("version") int version); }
这里面我只贴出了部分核心代码,http和rpc访问部分其实大同小异,可以更具自己的需要进行额外定制。
下边我贴出关于controller部分的代码:
package com.qiyu.tech.id.builder.controller; import com.qiyu.tech.id.builder.service.IdBuilderService; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; /** * @Author idea * @Date created in 4:27 下午 2020/12/17 */ @RestController @RequestMapping(value = "id-builder") public class IdBuilderController { @Resource private IdBuilderService idBuilderService; @GetMapping("increase-id") public Long increaseId(int code){ long result = idBuilderService.unionId(code); System.out.println(result); return result; } @GetMapping("increase-seq-id") public Long increaseSeqId(int code){ long result = idBuilderService.unionSeqId(code); System.out.println(result); return result; } @GetMapping("increase-seq-id-str") public String unionSeqIdStr(int code){ String result = idBuilderService.unionSeqIdStr(code); System.out.println(result); return result; } @GetMapping("increase-id-str") public String unionIdStr(int code){ String result = idBuilderService.unionIdStr(code); System.out.println(result); return result; } }
application.yml配置文件
mybatis-plus: configuration: map-underscore-to-camel-case: true server: port: 8082 tomcat: max-threads: 500 max-connections: 5000
注意需要结合实际机器配置nginx的并发线程数目和tomcat的并发访问参数。
启动类:
ps:这里面的db访问配置是采用了自己封装的一个db工具,其实本质和SpringBoot直接配置jdbc是一样的,可以忽略
package com.qiyu.tech.id.builder; import com.qiyu.datasource.annotation.AppDataSource; import com.qiyu.datasource.enums.DatasourceConfigEnum; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; /** * @Author idea * @Date created in 11:16 下午 2020/12/17 */ @SpringBootApplication(scanBasePackages = "com.qiyu.*") @AppDataSource(datasourceType = {DatasourceConfigEnum.PROD_DB},defaultType = DatasourceConfigEnum.PROD_DB) public class IdBuilderApplication { public static void main(String[] args) { SpringApplication.run(IdBuilderApplication.class,args); System.out.println("========== IdBuilderApplication started! ========="); } }
测试环节:
通过将服务打包部署在机器上边,同时运行多个服务,通过nginx配置负载均衡,请求到不通的机器上边。
下边是我自己进行压测的一些相关配置参数:
压测启动后,后台控制台会打印相关系列参数:
当我们需要扩增机器的时候,新加的机器不会对原有发号令机器的id产生影响,可以支持较好的扩容。
每次拉取的本地id段应该设计在多次较好?
这里我们先将本地id段简称为segment。
按照一些过往经验的参考,通常是希望id发号器能够经量减少对于MySQL的访问次数,同时也需要结合实际部门的运维能力进行把控。
假设说我们MySQL是采用了1主2从的方式搭建,当某一从节点挂了,切换新的从节点时候需要消耗大约1分钟时长,那么我们的segment至少需要设计为高峰期QPS * 60 * 1 * 4 ,期间考需要额外考虑一些其他因素,例如网络新的节点切换之后带来的一些网络抖动问题等等,这能够保证即使MySQL出现了故障,本地的segment也可以暂时支撑一段时间。
设计待完善点:
该系统的设计不足点在于,当本地id即将用光的时候需要进行数据库查询,因此这个关键点会拖慢系统的响应时长,所以这里可以采用异步更新配置拉取id的思路进行完善。也就是说当本地id列表剩余只有15%可以使用的时候,便可以进行开启一个异步线程去拉取id列表了。