来吧,自己动手撸一个分布式ID生成器组件(下)

具体实现:


package com.qiyu.tech.id.builder.service.impl;
import com.qiyu.tech.id.builder.bean.IdBuilderPO;
import com.qiyu.tech.id.builder.bean.LocalSeqId;
import com.qiyu.tech.id.builder.dao.IdBuilderMapper;
import com.qiyu.tech.id.builder.service.IdBuilderService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import javax.management.RuntimeErrorException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import static com.qiyu.tech.id.builder.constants.IdTypeConstants.*;
/**
 * @Author idea
 * @Date created in 11:18 下午 2020/12/17
 */
@Service
@Slf4j
public class IdBuilderServiceImpl implements IdBuilderService, InitializingBean {
    private static ConcurrentHashMap<Integer, BitSet> bitSetMap = new ConcurrentHashMap<>();
    private static Map<Integer, IdBuilderPO> idBuilderNotSeqMap;
    private static Map<Integer, IdBuilderPO> idBuilderSeqMap;
    private static Map<Integer, LocalSeqId> localSeqMap;
    private static Map<Integer, Boolean> newBuilderMap;
    private final static Object monitor = new Object();
    @Resource
    private IdBuilderMapper idBuilderMapper;
  
    private int idBuilderIndex;
    @Override
    public Long unionId(int code) {
        //考虑到锁升级问题,在高并发场景下使用synchronized要比cas更佳
        synchronized (monitor) {
            IdBuilderPO idBuilderPO = idBuilderNotSeqMap.get(code);
            if (idBuilderPO == null) {
                return null;
            }
            boolean isNew = newBuilderMap.get(code);
            if (isNew) {
                //预防出现id生成器网络中断问题
                IdBuilderPO newIdBuilderPO = this.refreshIdBuilderConfig(idBuilderPO);
                if (newIdBuilderPO == null) {
                    log.error("[unionId] refreshIdBuilderConfig出现异常");
                    return null;
                }
                idBuilderPO.setCurrentThreshold(newIdBuilderPO.getCurrentThreshold());
                newBuilderMap.put(code, false);
            }
            long initNum = idBuilderPO.getCurrentThreshold();
            int step = idBuilderPO.getStep();
            int randomIndex = RandomUtils.nextInt((int) initNum, (int) initNum + step);
            BitSet bitSet = bitSetMap.get(code);
            if (bitSet == null) {
                bitSet = new BitSet();
                bitSetMap.put(code, bitSet);
            }
            Long id;
            int countTime = 0;
            while (true) {
                boolean indexExist = bitSet.get(randomIndex);
                countTime++;
                if (!indexExist) {
                    bitSet.set(randomIndex);
                    id = Long.valueOf(randomIndex);
                    break;
                }
                //如果重试次数大于了空间的0.75则需要重新获取新的id区间 测试之后得出 循环一千万次随机函数,16gb内存条件下,大约耗时在124ms左右
                if (countTime >= step * 0.75) {
                    //扩容需要修改表配置
                    IdBuilderPO newIdBuilderPO = this.updateIdBuilderConfig(idBuilderPO);
                    if (newIdBuilderPO == null) {
                        log.error("重试超过100次没有更新自增id配置成功");
                        return null;
                    }
                    initNum = newIdBuilderPO.getCurrentThreshold();
                    step = newIdBuilderPO.getStep();
                    idBuilderPO.setCurrentThreshold(initNum);
                    bitSet.clear();
                    log.info("[unionId] 扩容IdBuilder,new idBuilderPO is {}",idBuilderPO);
                }
                randomIndex = RandomUtils.nextInt((int) initNum, (int) initNum + step);
            }
            return id;
        }
    }
    @Override
    public Long unionSeqId(int code) {
        synchronized (monitor) {
            LocalSeqId localSeqId = localSeqMap.get(code);
            IdBuilderPO idBuilderPO = idBuilderSeqMap.get(code);
            if (idBuilderPO == null || localSeqId == null) {
                log.error("[unionSeqId] code 参数有误,code is {}", code);
                return null;
            }
            boolean isNew = newBuilderMap.get(code);
            long result = localSeqId.getCurrentId();
            localSeqId.setCurrentId(result + 1);
            if (isNew) {
                //预防出现id生成器网络中断问题
                IdBuilderPO updateIdBuilderPO = this.refreshIdBuilderConfig(idBuilderPO);
                if (updateIdBuilderPO == null) {
                    log.error("[unionSeqId] refreshIdBuilderConfig出现异常");
                    return null;
                }
                newBuilderMap.put(code, false);
                localSeqId.setCurrentId(updateIdBuilderPO.getCurrentThreshold());
                localSeqId.setNextUpdateId(updateIdBuilderPO.getCurrentThreshold() + updateIdBuilderPO.getStep());
            }
            //需要更新本地步长
            if (localSeqId.getCurrentId() >= localSeqId.getNextUpdateId()) {
                IdBuilderPO newIdBuilderPO = this.updateIdBuilderConfig(idBuilderPO);
                if (newIdBuilderPO == null) {
                    log.error("[unionSeqId] updateIdBuilderConfig出现异常");
                    return null;
                }
                idBuilderPO.setCurrentThreshold(newIdBuilderPO.getCurrentThreshold());
                localSeqId.setCurrentId(newIdBuilderPO.getCurrentThreshold());
                localSeqId.setNextUpdateId(newIdBuilderPO.getCurrentThreshold() + newIdBuilderPO.getStep());
                log.info("[unionSeqId] 扩容IdBuilder,new localSeqId is {}",localSeqId);
            }
            return result;
        }
    }
    /**
     * 刷新id生成器的配置
     *
     * @param idBuilderPO
     */
    private IdBuilderPO refreshIdBuilderConfig(IdBuilderPO idBuilderPO) {
        IdBuilderPO updateResult = this.updateIdBuilderConfig(idBuilderPO);
        if (updateResult == null) {
            log.error("更新数据库配置出现异常,idBuilderPO is {}", idBuilderPO);
            throw new RuntimeErrorException(new Error("更新数据库配置出现异常,idBuilderPO is " + idBuilderPO.toString()));
        }
        return updateResult;
    }
    /**
     * 考虑分布式环境下 多个请求同时更新同一行数据的情况
     *
     * @param idBuilderPO
     * @return
     */
    private IdBuilderPO updateIdBuilderConfig(IdBuilderPO idBuilderPO) {
        int updateResult = -1;
        //假设重试过程中出现网络异常,那么使用cas的时候必须要考虑退出情况 极限情况下更新100次
        for (int i = 0; i < 100; i++) {
            IdBuilderPO newIdBuilderPO = idBuilderMapper.selectOneForUpdate(idBuilderPO.getId());
            updateResult = idBuilderMapper.updateCurrentThreshold(newIdBuilderPO.getCurrentThreshold() + newIdBuilderPO.getStep(), newIdBuilderPO.getId(), newIdBuilderPO.getVersion());
            if (updateResult > 0) {
                return newIdBuilderPO;
            }
        }
        return null;
    }
    @Override
    public String unionIdStr(int code) {
        long id = this.unionId(code);
        IdBuilderPO idBuilderPO = idBuilderNotSeqMap.get(code);
        return idBuilderPO.getBizCode() + idBuilderIndex + id;
    }
    @Override
    public String unionSeqIdStr(int code) {
        long id = this.unionSeqId(code);
        IdBuilderPO idBuilderPO = idBuilderSeqMap.get(code);
        return idBuilderPO.getBizCode() + idBuilderIndex + id;
    }
    @Override
    public void afterPropertiesSet() {
        List<IdBuilderPO> idBuilderPOS = idBuilderMapper.selectAll();
        idBuilderNotSeqMap = new ConcurrentHashMap<>(idBuilderPOS.size());
        newBuilderMap = new ConcurrentHashMap<>(idBuilderPOS.size());
        idBuilderSeqMap = new ConcurrentHashMap<>(idBuilderPOS.size());
        localSeqMap = new ConcurrentHashMap<>(0);
        //每次重启到时候,都需要将之前的上一个区间的id全部抛弃,使用新的步长区间
        for (IdBuilderPO idBuilderPO : idBuilderPOS) {
            if (idBuilderPO.getIsSeq() == NEED_SEQ) {
                idBuilderSeqMap.put(idBuilderPO.getId(), idBuilderPO);
                LocalSeqId localSeqId = new LocalSeqId();
                localSeqId.setNextUpdateId(idBuilderPO.getCurrentThreshold() + idBuilderPO.getStep());
                localSeqId.setCurrentId(idBuilderPO.getCurrentThreshold());
                localSeqMap.put(idBuilderPO.getId(), localSeqId);
            } else {
                idBuilderNotSeqMap.put(idBuilderPO.getId(), idBuilderPO);
            }
            newBuilderMap.put(idBuilderPO.getId(), true);
        }
        this.idBuilderIndex= Integer.parseInt(System.getProperty("idBuilder.index"));
    }
}



数据库层面设计:


package com.qiyu.tech.id.builder.dao;
import com.baomidou.mybatisplus.mapper.BaseMapper;
import com.qiyu.tech.id.builder.bean.IdBuilderPO;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
import java.util.List;
/**
 * @Author idea
 * @Date created in 10:17 上午 2020/12/17
 */
@Mapper
public interface IdBuilderMapper extends BaseMapper<IdBuilderPO> {
    @Select("select * from t_id_builder_config")
    List<IdBuilderPO> selectAll();
    @Select("select * from t_id_builder_config where id=#{id} limit 1 for update")
    IdBuilderPO selectOneForUpdate(@Param("id") int id);
    @Update("UPDATE t_id_builder_config set current_threshold=#{currentThreshold},version=version+1 where id=#{id} and version=#{version}")
    Integer updateCurrentThreshold(@Param("currentThreshold") long currentThreshold,@Param("id") int id,@Param("version") int version);
}


这里面我只贴出了部分核心代码,http和rpc访问部分其实大同小异,可以更具自己的需要进行额外定制。


下边我贴出关于controller部分的代码:


package com.qiyu.tech.id.builder.controller;
import com.qiyu.tech.id.builder.service.IdBuilderService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
 * @Author idea
 * @Date created in 4:27 下午 2020/12/17
 */
@RestController
@RequestMapping(value = "id-builder")
public class IdBuilderController {
    @Resource
    private IdBuilderService idBuilderService;
  
    @GetMapping("increase-id")
    public Long increaseId(int code){
        long result = idBuilderService.unionId(code);
        System.out.println(result);
        return result;
    }
    @GetMapping("increase-seq-id")
    public Long increaseSeqId(int code){
        long result = idBuilderService.unionSeqId(code);
        System.out.println(result);
        return result;
    }
    @GetMapping("increase-seq-id-str")
    public String unionSeqIdStr(int code){
        String result = idBuilderService.unionSeqIdStr(code);
        System.out.println(result);
        return result;
    }
    @GetMapping("increase-id-str")
    public String unionIdStr(int code){
        String result = idBuilderService.unionIdStr(code);
        System.out.println(result);
        return result;
    }
}


application.yml配置文件


mybatis-plus:
  configuration:
    map-underscore-to-camel-case: true
server:
  port: 8082
  tomcat:
    max-threads: 500
    max-connections: 5000


注意需要结合实际机器配置nginx的并发线程数目和tomcat的并发访问参数。


启动类:


ps:这里面的db访问配置是采用了自己封装的一个db工具,其实本质和SpringBoot直接配置jdbc是一样的,可以忽略


package com.qiyu.tech.id.builder;
import com.qiyu.datasource.annotation.AppDataSource;
import com.qiyu.datasource.enums.DatasourceConfigEnum;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * @Author idea
 * @Date created in 11:16 下午 2020/12/17
 */
@SpringBootApplication(scanBasePackages = "com.qiyu.*")
@AppDataSource(datasourceType = {DatasourceConfigEnum.PROD_DB},defaultType = DatasourceConfigEnum.PROD_DB)
public class IdBuilderApplication {
    public static void main(String[] args) {
        SpringApplication.run(IdBuilderApplication.class,args);
        System.out.println("========== IdBuilderApplication started! =========");
    }
}


测试环节:


通过将服务打包部署在机器上边,同时运行多个服务,通过nginx配置负载均衡,请求到不通的机器上边。


下边是我自己进行压测的一些相关配置参数:


来吧,自己动手撸一个分布式ID生成器组件(下)


压测启动后,后台控制台会打印相关系列参数:


来吧,自己动手撸一个分布式ID生成器组件(下)


当我们需要扩增机器的时候,新加的机器不会对原有发号令机器的id产生影响,可以支持较好的扩容。


每次拉取的本地id段应该设计在多次较好?


这里我们先将本地id段简称为segment。


按照一些过往经验的参考,通常是希望id发号器能够经量减少对于MySQL的访问次数,同时也需要结合实际部门的运维能力进行把控。


假设说我们MySQL是采用了1主2从的方式搭建,当某一从节点挂了,切换新的从节点时候需要消耗大约1分钟时长,那么我们的segment至少需要设计为高峰期QPS * 60 * 1 * 4 ,期间考需要额外考虑一些其他因素,例如网络新的节点切换之后带来的一些网络抖动问题等等,这能够保证即使MySQL出现了故障,本地的segment也可以暂时支撑一段时间。


设计待完善点


该系统的设计不足点在于,当本地id即将用光的时候需要进行数据库查询,因此这个关键点会拖慢系统的响应时长,所以这里可以采用异步更新配置拉取id的思路进行完善。也就是说当本地id列表剩余只有15%可以使用的时候,便可以进行开启一个异步线程去拉取id列表了。


来吧,自己动手撸一个分布式ID生成器组件(下)


上一篇:IDC:安全性、价格和低复杂性是企业采用SD-WAN的主要动因


下一篇:如何用ABAP代码的方式在短时间内批量生成大量订单数据用于性能测试