Spring Cloud Message Bus

Spring Cloud Bus

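  • Use case
    Broadcasts application state changes to the relevant nodes of a distributed system. Application nodes do not communicate with each other directly; they are notified through the message bus instead.
  • Default implementation
    AMQP (RabbitMQ)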
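
The use case above can be pictured with a small plain-Java sketch. It is not Spring Cloud Bus code: InMemoryBus is a hypothetical in-memory stand-in for a broker such as RabbitMQ, and the node names are made up for illustration. It only shows that nodes register with the bus and never reference each other.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class BusBroadcastSketch {

    // Wires the given nodes to one bus, broadcasts a change,
    // and returns what each node received, in subscription order.
    static List<String> broadcast(String stateChange, String... nodeNames) {
        InMemoryBus bus = new InMemoryBus();
        List<String> received = new ArrayList<>();
        for (String nodeName : nodeNames) {
            bus.subscribe(change -> received.add(nodeName + " <- " + change));
        }
        bus.publish(stateChange);
        return received;
    }

    public static void main(String[] args) {
        broadcast("refresh", "node-a", "node-b").forEach(System.out::println);
    }
}

// Hypothetical in-memory stand-in for a message broker (assumption, not a Spring class).
class InMemoryBus {

    private final List<Consumer<String>> subscribers = new ArrayList<>();

    // Each node registers a callback with the bus instead of knowing its peers.
    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // A published state change is delivered to every subscribed node.
    void publish(String stateChange) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(stateChange);
        }
    }
}
```

A real broker adds what this sketch leaves out: delivery across processes, queues per node, and acknowledgements.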

Kafka

Review: Spring events and listeners

package com.segumentfault.spring.event;

import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;

/**
 * Spring event demo
 */
public class SpringEventDemo {

    public static void main(String[] args) {
        // Create an annotation-driven Spring application context
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        // Register EventConfiguration with the Spring application context
        context.register(EventConfiguration.class);
        // Start the Spring application context
        context.refresh();
        // AnnotationConfigApplicationContext is also an ApplicationEventPublisher
        ApplicationEventPublisher publisher = context;
        // Publish a MyApplicationEvent
        publisher.publishEvent(new MyApplicationEvent("Hello,World"));
    }

    private static class MyApplicationEvent extends ApplicationEvent {

        public MyApplicationEvent(String message) {
            super(message);
        }
    }

    @Configuration
    public static class EventConfiguration {

        /**
         * Listens for {@link MyApplicationEvent}
         *
         * @param event {@link MyApplicationEvent}
         */
        @EventListener
        public void onEvent(MyApplicationEvent event) {
            System.out.println("Received event: " + event);
        }

    }

}

Spring Cloud Bus

Adapting user-service-client: integrating Spring Cloud Bus over AMQP
1. Add the Maven dependency

        <!-- Integrate Spring Cloud Bus: AMQP -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-amqp</artifactId>
        </dependency>

2. Start the required services
user-service-client depends on:

  • Eureka Server (port 1000)
  • Config Server (port 7070)
  • RabbitMQ (port 5672)

Event propagation

How to find the Application Context ID
Visit http://localhost:8080/beans to check the current Application Context ID:

{
    "context": "user-service-client:8080",
    "parent": "user-service-client",
    "beans": []
}

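Single-instance propagation

POST http://localhost:8080/bus/refresh?destination=user-service-client:8080
Run curl (the URL is quoted so the shell does not interpret the ? character):

curl -X POST "http://localhost:8080/bus/refresh?destination=user-service-client:8080"

Log output: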

INFO 28041 --- [nio-8080-exec-3] o.s.cloud.bus.event.RefreshListener      : Received remote refresh request. Keys refreshed []

Cluster-wide propagation
POST http://localhost:8080/bus/refresh?destination=user-service-client:**
Run curl (the URL is quoted so the shell does not expand ? or **):

curl -X POST "http://localhost:8080/bus/refresh?destination=user-service-client:**"

Log output:

INFO 28041 --- [nio-8080-exec-5] o.s.cloud.bus.event.RefreshListener      : Received remote refresh request. Keys refreshed []

The logs show that in both cases the event is handled by org.springframework.cloud.bus.event.RefreshListener:

public class RefreshListener
        implements ApplicationListener<RefreshRemoteApplicationEvent> {

    private static Log log = LogFactory.getLog(RefreshListener.class);

    private ContextRefresher contextRefresher;

    public RefreshListener(ContextRefresher contextRefresher) {
        this.contextRefresher = contextRefresher;
    }

    @Override
    public void onApplicationEvent(RefreshRemoteApplicationEvent event) {
        Set<String> keys = contextRefresher.refresh();
        log.info("Received remote refresh request. Keys refreshed " + keys);
    }
}

RefreshListener listens for RefreshRemoteApplicationEvent
Custom RefreshRemoteApplicationEvent listener

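import org.springframework.cloud.bus.event.RefreshRemoteApplicationEvent;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;

@Configuration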
public class BusConfiguration {

    @EventListener
    public void onRefreshRemoteApplicationEvent(RefreshRemoteApplicationEvent event) {

        System.out.printf(" Source : %s , originService : %s , destinationService : %s \n",
                event.getSource(),
                event.getOriginService(),
                event.getDestinationService());

    }
}
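
The two destination values used in the refresh requests (user-service-client:8080 and user-service-client:**) differ only in whether the last segment is a wildcard. Spring Cloud Bus does the real matching internally; the sketch below is only a simplified model of the idea, under the assumption that segments are separated by ':' and that "**" matches everything that follows. The class and method names are hypothetical.

```java
class DestinationMatchSketch {

    // Returns true when the service id matches the destination pattern.
    // Simplified rules (assumption): segments are separated by ':',
    // "**" matches every remaining segment, anything else must match exactly.
    static boolean matches(String destination, String serviceId) {
        String[] patternParts = destination.split(":");
        String[] idParts = serviceId.split(":");
        for (int i = 0; i < patternParts.length; i++) {
            if (patternParts[i].equals("**")) {
                return true; // wildcard covers the rest of the id
            }
            if (i >= idParts.length || !patternParts[i].equals(idParts[i])) {
                return false;
            }
        }
        // No wildcard was seen: both sides must have the same number of segments.
        return patternParts.length == idParts.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("user-service-client:8080", "user-service-client:8080"));
        System.out.println(matches("user-service-client:**", "user-service-client:8081"));
        System.out.println(matches("user-service-client:8080", "user-service-client:8081"));
        System.out.println(matches("**", "other-service:9090"));
    }
}
```

Under this model, a fully qualified destination reaches one instance, while a trailing ** reaches every instance of the named service.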

Event tracing
Event tracing is disabled by default; enable it with the configuration property spring.cloud.bus.trace.enabled=true
Endpoint: /trace
Event trace details:

  {
    "timestamp": 1513517631139,
    "info": {
      "signal": "spring.cloud.bus.sent",
      "type": "RefreshRemoteApplicationEvent",
      "id": "938c1305-02b8-4697-9ac4-5996908eb58d",
      "origin": "user-service-client:8080",
      "destination": "user-service-client:**"
    }
  },
  {
    "timestamp": 1513517631138,
    "info": {
      "signal": "spring.cloud.bus.ack",
      "event": "RefreshRemoteApplicationEvent",
      "id": "938c1305-02b8-4697-9ac4-5996908eb58d",
      "origin": "user-service-client:8080",
      "destination": "user-service-client:**"
    }
  }
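
In the trace output above, the sent entry and the ack entry carry the same id, which makes it possible to pair them up. The following plain-Java sketch illustrates one way to do that; it is not part of Spring Cloud Bus, the class and method names are hypothetical, and the trace entries are reduced to the two fields it needs.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TraceCorrelationSketch {

    // Counts, per event id, how many "spring.cloud.bus.ack" entries were recorded.
    // Ids that appear only in "sent" entries keep a count of 0.
    static Map<String, Integer> ackCountsById(List<Map<String, String>> traceInfos) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (Map<String, String> info : traceInfos) {
            String id = info.get("id");
            counts.putIfAbsent(id, 0);
            if ("spring.cloud.bus.ack".equals(info.get("signal"))) {
                counts.merge(id, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // The "info" objects from the trace output, reduced to signal and id.
        List<Map<String, String>> traceInfos = List.of(
                Map.of("signal", "spring.cloud.bus.sent",
                        "id", "938c1305-02b8-4697-9ac4-5996908eb58d"),
                Map.of("signal", "spring.cloud.bus.ack",
                        "id", "938c1305-02b8-4697-9ac4-5996908eb58d"));
        System.out.println(ackCountsById(traceInfos));
    }
}
```

An id with a count of 0 would indicate an event that was sent but for which no ack was traced.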

Built-in event types

  • RefreshRemoteApplicationEvent
  • EnvironmentChangeRemoteApplicationEvent
  • AckRemoteApplicationEvent: the acknowledgement (ack) event

Custom EnvironmentChangeRemoteApplicationEvent listener

    @EventListener
    public void onEnvironmentChangeRemoteApplicationEvent(EnvironmentChangeRemoteApplicationEvent event) {

        System.out.printf("EnvironmentChangeRemoteApplicationEvent - " +
                        " Source : %s , originService : %s , destinationService : %s \n",
                event.getSource(),
                event.getOriginService(),
                event.getDestinationService());

    }

POST request to /bus/env

curl -X POST http://localhost:8080/bus/env

Console output:

EnvironmentChangeRemoteApplicationEvent -  Source : org.springframework.cloud.bus.endpoint.EnvironmentBusEndpoint@656c356c , originService : user-service-client:8080 , destinationService : ** 
2017-12-17 21:40:42.440  INFO 33364 --- [nio-8080-exec-3] o.s.c.b.event.EnvironmentChangeListener  : Received remote environment change request. Keys/values to update {}

EnvironmentChangeListener is the default listener implementation for EnvironmentChangeRemoteApplicationEvent
Changes in /trace:

{
    "timestamp": 1513518042463,
    "info": {
      "signal": "spring.cloud.bus.sent",
      "type": "EnvironmentChangeRemoteApplicationEvent",
      "id": "1af8f5a0-6d1f-440a-82cd-e09876977d33",
      "origin": "user-service-client:8080",
      "destination": "**:**"
    }
  },
  {
    "timestamp": 1513518042462,
    "info": {
      "signal": "spring.cloud.bus.ack",
      "event": "EnvironmentChangeRemoteApplicationEvent",
      "id": "1af8f5a0-6d1f-440a-82cd-e09876977d33",
      "origin": "user-service-client:8080",
      "destination": "**"
    }
  },