spring cloud bus 扩展消息总线方式

spring cloud 本身实现了消息总线机制,机制如图1。spring cloud 本身实现了变量修改/bus/env/bus/refresh两个接口,我们需要扩展一个自己的刷新缓存的接口,来应对业务需求。业务代码是看源码修改出来的,并不代表理解都对。

图1

实现

介绍

一般来说微服务一上需要实现接收用户信息的接口,并且将消息传递给消息总线。其他微服务实现接收消息总线分发信息的服务。为了简化业务所有的微服务都具有接收用户指令和接收消息总线指令的能力。

AbstractBusEndpoint Bus接口实现

这个接口来实现对用户的访问实现,需要继承AbstractBusEndpoint,我们提供了/bus/removecache接口来供用户访问。

@ManagedResource
public class GuavaCacheBusEndpoint extends AbstractBusEndpoint {
    public GuavaCacheBusEndpoint(ApplicationEventPublisher context, String id, BusEndpoint delegate) {
        super(context, id, delegate);
    }

    @RequestMapping(
            value = {"removecache"},
            method = {RequestMethod.POST}
    )
    @ResponseBody
    @ManagedOperation
    public void removecache(@RequestParam Map<String, String> params, @RequestParam(value = "destination",required = false) String destination) {
        this.publish(new GuavaCacheChangeRemoteApplicationEvent(this, this.getInstanceId(), destination, params));
    }
}

RemoteApplicationEvent 被传输的消息(GuavaCacheChangeRemoteApplicationEvent)

这个类需要继承RemoteApplicationEvent牵扯到消息总线序列化等信息,我们可以在其中增加Map来实现带参数传递

@JsonTypeInfo(
        use = JsonTypeInfo.Id.NAME,
        property = "type"
)
@JsonIgnoreProperties({"source"})
public class GuavaCacheChangeRemoteApplicationEvent extends RemoteApplicationEvent {
    private final Map<String, String> values;

    private GuavaCacheChangeRemoteApplicationEvent() {
        this.values = null;
    }

    public GuavaCacheChangeRemoteApplicationEvent(Object source, String originService, String destinationService, Map<String, String> values) {
        super(source, originService, destinationService);
        this.values = values;
    }

    public Map<String, String> getValues(){
        return values;
    }
}

BusAutoConfiguration Bus消息自动注册

实现spring config 配置解析和加载我们的自定义配置,我们定义了spring.cloud.bus.guava.enabledendpoints.spring.cloud.bus.guava.enabled来启用消息总线消息监听和用户访问消息监听。

@Configuration
public class BusGuavaAutoConfiguration extends BusAutoConfiguration {

    public BusGuavaAutoConfiguration(){}

    @Configuration
    protected static class BusGuavaConfiguration {
        protected BusGuavaConfiguration() {}

        @Bean
        @ConditionalOnProperty(
                value = {"spring.cloud.bus.guava.enabled"},
                matchIfMissing = true
        )
        public GuavaCacheChangeListener guavaCacheChangeListener() {
            return new GuavaCacheChangeListener();
        }

        @Configuration
        @ConditionalOnClass({Endpoint.class})
        @ConditionalOnProperty(
                value = {"endpoints.spring.cloud.bus.guava.enabled"},
                matchIfMissing = true
        )
        protected static class GuavaBusEndpointConfiguration {
            protected GuavaBusEndpointConfiguration() {}

            @Bean
            public GuavaCacheBusEndpoint environmentBusEndpoint(ApplicationContext context, BusEndpoint busEndpoint) {
                return new GuavaCacheBusEndpoint(context, context.getId(), busEndpoint);
            }
        }
    }
}

ApplicationListener实现

ApplicationListener来实现监听消息总线的监听器,消息总线返回后会执行这里。

public class GuavaCacheChangeListener implements ApplicationListener<GuavaCacheChangeRemoteApplicationEvent> {
    @Override
    public void onApplicationEvent(GuavaCacheChangeRemoteApplicationEvent guavaCacheChangeRemoteApplicationEvent) {
        Map<String, String> values = guavaCacheChangeRemoteApplicationEvent.getValues();
        //TODO
    }
}

转换器配置

RemoteApplicationEvent需要被spring cloud 内置的BusJacksonMessageConverter转换器扫描到才可以实现转换,在App启动类中增加注解@RemoteApplicationEventScan(basePackages = "你的package地址")

上一篇:Failed to get D-Bus connection: Operation not permitted


下一篇:xcode中如何安装多个版本的模拟器