分布式协调-Zookeeper(手写配置中心&动态刷新)

分布式协调-Zookeeper(手写配置中心&动态刷新)

前面我们分析了SpringBoot加载environment的源码, 并且也聊了Zookeeper的基本使用以及特性,  这里我想对他们两个进行一个结合,实现配置中心。因为前面我们在聊ShardingSphere使用它做了配置的自动更新,我想知道它是怎么做的。后面我就在它的特性中聊到了他的watcher机制。今天把这些混合一下,手写一个配置中心,配合zk以及SpringBoot中的自动装配以及它的environment对象解析过程实现。而且,现在随着微服务节点的增多,动态配置就显得比较重要了。 下面的代码分为两步

  • 手写配置中心
  • 动态刷新

手写配置中心

在SpringBoot加载配置文件的源码中聊到,它里面的所有配置文件都会加载到一个environment对象中。通过@Value和注入environment对象之后就可以获取相关属性值。

并且我们可以对environment进行扩展,我们可以实现【EnvironmentPostProcessor】接口,在environment对象加载前做一些事情,大概流程为:

  • 通过我们的文件名称加载使用流的形式加载文件,
  • 然后把文件包装成environment对象中存储的对象,
  • 然后把我们实现了这个接口的类给它进行自动装配。
分布式协调-Zookeeper(手写配置中心&动态刷新)
public class CustomEnvironmentPostProcessor implements EnvironmentPostProcessor {
    private final Properties properties=new Properties();
    //我们要加载的文件名称
    private String propertiesFile="custom.properties";

    @Override
    public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {
        //读取文件,并且变成一个resource对象
        Resource resource=new ClassPathResource(propertiesFile);

        //动态给它塞在environment中,在后续就可以拿到了
        environment.getPropertySources().addLast(loadProperties(resource));
    }

    //把文件以流的形式读取到propert中,并且包装成一个对象进行返回,这个对象是environment中需要的对象类型
    private PropertySource<?> loadProperties(Resource resource){
        if(!resource.exists()){
            throw new RuntimeException("file not exist");
        }
        try {
            //custom.properties
            properties.load(resource.getInputStream());
            return new PropertiesPropertySource(resource.getFilename(),properties);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
View Code

那既然这样,我们是不是就可以把这些配置文件中的属性,放在zk上呢,当项目启动的时候,我们自动加载这些属性,并且给它塞在environment中呢?

首先,我们在zk上写上我们的配置文件

分布式协调-Zookeeper(手写配置中心&动态刷新)

 

  • 编写一个类,这个类会在SpringBoot中的refresh(也就是容器初始前进行调用),因为这个类实现了【ApplicationContextInitializer】,并且对这个类进行自动装配,在这个类中对所有实现了下面的这个接口的实现类进行加载,并且获取他们返回的PropertySource对象,然后把这些放在environment中
    • //这个接口会在spring初始前面进行调用
      public class ZookeeperApplicationContextInitializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
          
          private final List<PropertySourceLocator> propertySourceLocators;
      
      
          //加载所有实现我们自定义加载配置文件的类
          public ZookeeperApplicationContextInitializer() {
              ClassLoader classLoader= ClassUtils.getDefaultClassLoader();
              propertySourceLocators=new ArrayList<>(SpringFactoriesLoader
                      .loadFactories(PropertySourceLocator.class,classLoader));
              System.out.println("加载所有的自定义配置类到一个list中");
          }
      
          @Override
          public void initialize(ConfigurableApplicationContext applicationContext) {
              //获取environment对象
              ConfigurableEnvironment environment=applicationContext.getEnvironment();
              //我们所有的property对存储在这个集合汇总
              MutablePropertySources mutablePropertySources=environment.getPropertySources();
              //循环所有我们自己加载配置文件的类
              for(PropertySourceLocator locator:this.propertySourceLocators){
                  //执行他们的默认方法,在默认方法中调用了他们的加载配置文件的方法,并且返回他们包装好的属性
                 Collection<PropertySource<?>> sources=locator.locateCollection(environment,applicationContext);
                 if(sources==null||sources.size()==0){
                     continue;
                 }
                 //循环把属性放再environment中
                 for (PropertySource<?> p:sources){
                     mutablePropertySources.addLast(p);
                 }
              }
          }
      }
  • 编写一个接口,所有实现了这个接口的类,都可以对他们想要交给environment对象的配置文件进行加载,并且包装成一个PropertySource集合进行返回。当然这个接口对应的实现类和接口本身都要进行自动装配,key是接口的全限定名,value是实现类的名称。
    • public interface PropertySourceLocator {
      
          // 对配置文件进行加载
          PropertySource<?> locate(Environment environment, ConfigurableApplicationContext applicationContext);
      
         default Collection<PropertySource<?>> locateCollection(Environment environment, ConfigurableApplicationContext applicationContext){
              return locateCollections(this,environment,applicationContext);
          }
      
          //收集属性源列表
          static Collection<PropertySource<?>> locateCollections(PropertySourceLocator locator,Environment environment, ConfigurableApplicationContext applicationContext) {
             // 外部会调用我们的locateCollection方法,locateCollection会调用当前方法,把加载到的配置文件包装成开一个放在environment中的对象
              PropertySource<?> propertySource=locator.locate(environment,applicationContext);
              return propertySource==null?Collections.emptyList():Collections.singletonList(propertySource);
          }
      
      }

实现了接口的zk配置文件获取类

    •   
      public class ZookeeperPropertySourceLocator implements PropertySourceLocator{
          private final CuratorFramework curatorFramework;
         //这里配置文件下的子节点 private final String DATA_NODE="/data"; // 连接zk public ZookeeperPropertySourceLocator() { curatorFramework= CuratorFrameworkFactory.builder() .connectString("192.168.43.3:2181") .sessionTimeoutMs(20000) .connectionTimeoutMs(20000) .retryPolicy(new ExponentialBackoffRetry(1000,3))
         //这里是保存我们配置文件的节点 .namespace("config") .build(); curatorFramework.start(); } // 加载配置文件 @Override public PropertySource<?> locate(Environment environment, ConfigurableApplicationContext applicationContext) { //加载远程Zookeeper的配置保存到一个PropertySource System.out.println("开始加载外部化配置"); //这里Spring中提供的一种PropertySource类型,因为在environment中放的都是这个类型 CompositePropertySource composite=new CompositePropertySource("configService"); try { //这里是我们从zk上获取的文件 Map<String,Object> dataMap=getRemoteEnvironment(); //给这个PropertySource起一个名称 MapPropertySource mapPropertySource=new MapPropertySource("configService",dataMap); composite.addPropertySource(mapPropertySource); } catch (Exception e) { e.printStackTrace(); } //并且返回 return composite; } // 从远程获取配置信息 private Map<String,Object> getRemoteEnvironment() throws Exception { //从data节点下面获取的配置信息 String data=new String (curatorFramework.getData().forPath(DATA_NODE)); //支持JSON格式 ObjectMapper objectMapper=new ObjectMapper(); return objectMapper.readValue(data,Map.class); } }

       

整体流程:,Spring容器初始化之前,会调到我们实现了它这个接口【ApplicationContextInitializer】中的initialize方法,这个方法中对实现了我们所有加载environment的类的locateCollection进行执行,并且把返回的environment所要的PropertySource塞入environment中,这样当我们获取某个属性的时候就能从environment中获取了。

测试:我们看源码发现Banner是在整体初始化前面打印的,而这句话是在banner前面打印的,也就是说在整体初始化前就执行了我类中的方法。

分布式协调-Zookeeper(手写配置中心&动态刷新)

 

分布式协调-Zookeeper(手写配置中心&动态刷新)

我们现在的配置文件中是没有数据的,但是我们现在通过@Value注解依然可以获取到数据,那就是说,他已经加载到了zk上的配置文件,但是当修改的时候还是没有动态刷新,下面我们对他进行动态刷新。

分布式协调-Zookeeper(手写配置中心&动态刷新)

动态刷新配置

流程粗粒度:

  • 整体使用watcher和Spring中的事件进行操作,我们使用watcher监控存储配置文件的节点,当节点变化通知我们,我们去发送一个Spring事件去通知我们的事件操作类(ConfigurationPropertiesRebinder),在里面对有@Value注解的类中的属性进行反射赋值。

流程细粒度:

  • 前面我们在初始获取zk上的数据的时候就注册一个事件(NodeDataChangeCuratorCacheListener),这个事件是监控zk上存储配置文件变化的事件,一旦变化,zk就会来调用我们NodeDataChangeCuratorCacheListener中的event方法,它也会把监控的节点的数据传递过来,就是我们新修改的配置文件。
  • 我们把获取到的新的配置文件变成map的形式,然后把environment中的存储zk上配置文件的value数值进行替换,这样就对配置文件进行了动态替换,然而这里并没有对bean中的属性进行重新赋值。
  • 那这个时候我们就去发送一个SpringBoot的事件,在SpringBoot收到事件后对我们存储所有@Value的属性的Map循环遍历,并且拿到environment中的内容反射赋值到这些属性中,这个时候我们就可以得到动态的数值了。
  • 这个存储拥有@Value的数值的map是在我们Spring中bean被加载后我们实现BeanPostProcessor中的postProcessBeforeInitialization方法进行收集的(我们写一个自定义注解,所有拥有我们自定义注解,并且有@Value的注解的类,我们都要进行扫描获取属性。)

【代码】

  • 在我们上面Spring加载前的 ZookeeperPropertySourceLocator 中的locat方法中注册一个事件
    • // 使用watcher机制,当节点变化的时候,zk会调用我们的事件监听类NodeDataChangeCuratorCacheListener并且执行里面的event方法
          // 然后这个event中去
          private void addListener(Environment environment, ConfigurableApplicationContext applicationContext){
              NodeDataChangeCuratorCacheListener ndc=new NodeDataChangeCuratorCacheListener(environment,applicationContext);
              CuratorCache curatorCache=CuratorCache.build(curatorFramework,DATA_NODE,CuratorCache.Options.SINGLE_NODE_CACHE);
              CuratorCacheListener listener=CuratorCacheListener
                      .builder()
                      .forChanges(ndc).build();
              curatorCache.listenable().addListener(listener);
              curatorCache.start();
          }
  • 事件中替换environment中的配置,并且注册一个事件同时Spring进行反射赋值
    • public class NodeDataChangeCuratorCacheListener implements CuratorCacheListenerBuilder.ChangeListener {
      
          private Environment environment;
          private ConfigurableApplicationContext applicationContext;
      
          public NodeDataChangeCuratorCacheListener(Environment environment, ConfigurableApplicationContext applicationContext) {
              this.environment = environment;
              this.applicationContext = applicationContext;
          }
      
          @Override
          public void event(ChildData oldNode, ChildData node) {
              System.out.println("收到数据变更事件");
              String resultData=new String (node.getData());
              ObjectMapper objectMapper=new ObjectMapper();
              try {
                  // 这就是zk上的配置文件,我们把这些配置文件变成map的形式
                  Map<String,Object> map=objectMapper.readValue(resultData, Map.class);
                  // environment对象
                  ConfigurableEnvironment cfe=(ConfigurableEnvironment)this.environment;
                  MapPropertySource mapPropertySource=new MapPropertySource("configService",map);
                  //替换里面存储配置文件的节点
                  cfe.getPropertySources().replace("configService",mapPropertySource);
                  //发布一个变更事件,这个最终会去调用我们ConfigurationPropertiesRebinder这个类的onApplicationEvent方法,
                  // 然后反射去对有@Value注解的字段进行赋值。从而达到动态刷新配置的效果
                  applicationContext.publishEvent(new EnvironmentChangeEvent(this));
                  System.out.println("数据更新完成");
              } catch (JsonProcessingException e) {
                  e.printStackTrace();
              }
      
          }
      }
  • 注册事件,以及反射赋值
    • //定义一个事件
      public class EnvironmentChangeEvent extends ApplicationEvent {
      
          EnvironmentChangeEvent(Object source) {
              super(source);
          }
      }
      @Component
      public class ConfigurationPropertiesRebinder implements ApplicationListener<EnvironmentChangeEvent> {
      
          private ConfigurationPropertiesBeans beans;
      
          private Environment environment;
      
          public ConfigurationPropertiesRebinder(ConfigurationPropertiesBeans beans, Environment environment) {
              this.beans=beans;
              this.environment=environment;
          }
      
          @Override
          public void onApplicationEvent(EnvironmentChangeEvent event) {
              //使用watcher机制对zk上面的存储配置文件的节点进行监控,当配置文件变化,就会触发这个代码
              System.out.println("收到environment变更事件");
              rebind();
          }
          public void rebind(){
              //拿到存储了有@Value属性的map,并且对Value(也就是存储了那些有@Value属性的字段和类对应关系的类)进行遍历,并且反射赋值
              this.beans.getFieldMapper().forEach((k,v)->{
                  v.forEach(f->f.resetValue(environment));
              });
          }
      }
  • 反射赋值方法。
    • public class FieldPair {
      
          private PropertyPlaceholderHelper propertyPlaceholderHelper=
                  new PropertyPlaceholderHelper("${","}",":",true);
      
          private Object bean;
          private Field field;
          private String value;
      
          public FieldPair(Object bean, Field field, String value) {
              this.bean = bean;
              this.field = field;
              this.value = value;
          }
      
      
          //对字段进行反射赋值
          public void resetValue(Environment environment){
              boolean access=field.isAccessible();
              if(!access){
                  field.setAccessible(true);
              }
              //
              String resetValue=propertyPlaceholderHelper.replacePlaceholders(value,environment::getProperty);
              try {
                  //反射修改bean的属性值
                  field.set(bean,resetValue);
              } catch (IllegalAccessException e) {
                  e.printStackTrace();
              }
          }
      }
  • 收集注解的bean执行后操作。收集@Value中的属性,并且维护成一个个FieldPair对象,并且存储在map中,之后要对这些对象中的属性反射赋值。
    • @Component
      public class ConfigurationPropertiesBeans implements BeanPostProcessor {
      
          // 存储所有@Value数值以及相关的bean
          private Map<String,List<FieldPair>> fieldMapper=new HashMap<>();
      
          //把类上有我们自定义注解RefreshScope的类拿到,然后循环类中的字段,
          // 如果字段中有@Value的注解,把@Value中的属性进行解析,并且存储在一个map中,
          // key就是被解析的@Value中的属性值,value我们自定义的一个对象,这个对象中维护了bean,属性名,以及@Value后面的属性值
          @Override
          public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
              //这里就是Spring中的所有bean
              Class clz=bean.getClass();
              // 判断这个类上是否有我们的自定义注解
              if(clz.isAnnotationPresent(RefreshScope.class)){
                  // 拿到类中的字段
                  for(Field field:clz.getDeclaredFields()){
                      // 看字段上时候否@Value的注解
                      Value value=field.getAnnotation(Value.class);
                      if(value==null){
                          continue;
                      }
                      //拿到@Value后面的属性并且分割出里面的核心key,因为可能是多个所以返回一个数组
                      List<String> keyList=getPropertyKey(value.value(),0);
                      for (String key:keyList){
                          //如果key对应的value为空,则新创建一个list,
                          // 然后给里面添加数据,key为我们的@Value中存储的字段,
                          // value是我们自己的一个实体类,类中维护了bean,字段名,以及@Value后的属性名
                          fieldMapper.computeIfAbsent(key,k->new ArrayList())
                                  .add(new FieldPair(bean,field,value.value()));
                      }
                  }
              }
              return bean;
          }
          //对@Value中的属性值进行解析,并且封装成一个list
          private List<String> getPropertyKey(String value,int begin){
              int start=value.indexOf("${",begin)+2;
              if(start<2){
                  return new ArrayList<>();
              }
              int middle=value.indexOf(":",start);
              int end=value.indexOf("}",start);
              String key;
              if(middle>0&&middle<end){
                  key=value.substring(start,middle);
              }else{
                  key=value.substring(start,end);
              }
              List<String> keys=getPropertyKey(value,end);
              keys.add(key);
              return keys;
          }
      
          public Map<String,List<FieldPair>> getFieldMapper(){
              return fieldMapper;
          }
      }
  • 自定义标注注解,这个注解你可以标记到任何你想获取动态配置的类中,上面的代码将会对它进行扫描
    • @Target({ElementType.TYPE,ElementType.METHOD})
      @Retention(RetentionPolicy.RUNTIME)
      @Documented
      public @interface RefreshScope {
      }
  • 至此,动态刷新注册中心完成
  • 以后我们想要扩展的话,直接实现PropertySourceLocator接口下的方法就行,并且把实现接口的类让SpringBoot去装配就行。流程为,我们在Spring初始之前,就加载了实现PropertySourceLocator的所有类,并循环执行了他们包装PropertySource的方法,然后把这些对象都放在了environment中了。相当于我们PropertySourceLocator变成了自动装配的key了。
  • 分布式协调-Zookeeper(手写配置中心&动态刷新) 

 分布式协调-Zookeeper(手写配置中心&动态刷新)

 

上一篇:The NetHack Learning Environment


下一篇:Android——外部存储+动态权限