Spring Cloud Bus
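- Use cases
Broadcasts application state changes to every associated node in a distributed system. Application nodes do not communicate with each other directly; they are notified through the message bus.
- Default implementations
AMQP (RabbitMQ)
Kafka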
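As a rough illustration of the idea above, the following sketch models a bus in plain Java. InMemoryBus and its methods are hypothetical names invented for this example and are not part of Spring Cloud Bus; a real bus would deliver messages through a broker such as RabbitMQ or Kafka rather than an in-process list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical in-memory stand-in for a message bus; not the Spring Cloud Bus API. */
final class InMemoryBus {

    private final List<Consumer<String>> subscribers = new ArrayList<>();

    /** A node registers a callback with the bus instead of holding references to other nodes. */
    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    /** Delivers one state-change message to every subscriber and returns the delivery count. */
    int broadcast(String stateChange) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(stateChange);
        }
        return subscribers.size();
    }

    public static void main(String[] args) {
        InMemoryBus bus = new InMemoryBus();
        List<String> received = new ArrayList<>();
        bus.subscribe(msg -> received.add("node-1 got " + msg));
        bus.subscribe(msg -> received.add("node-2 got " + msg));
        bus.broadcast("refresh");
        received.forEach(System.out::println);
    }
}
```

The nodes only know the bus, which is the point of the design: adding or removing a node does not require changes in any other node.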
Review: Spring events and listeners
package com.segumentfault.spring.event;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
/**
 * Spring event demo
 */
public class SpringEventDemo {

    public static void main(String[] args) {
        // Create an annotation-driven Spring application context
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        // Register EventConfiguration with the Spring application context
        context.register(EventConfiguration.class);
        // Start the Spring application context
        context.refresh();
        // AnnotationConfigApplicationContext is also an ApplicationEventPublisher
        ApplicationEventPublisher publisher = context;
        // Publish a MyApplicationEvent
        publisher.publishEvent(new MyApplicationEvent("Hello,World"));
    }

    private static class MyApplicationEvent extends ApplicationEvent {
        public MyApplicationEvent(String message) {
            super(message);
        }
    }

    @Configuration
    public static class EventConfiguration {
        /**
         * Listens for {@link MyApplicationEvent}
         *
         * @param event {@link MyApplicationEvent}
         */
        @EventListener
        public void onEvent(MyApplicationEvent event) {
            System.out.println("Event received : " + event);
        }
    }
}
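The demo above relies on Spring choosing listeners by event type. A minimal plain-JDK sketch of that idea follows, assuming a much simpler dispatch than Spring's own; SimpleEventPublisher and MessageEvent are hypothetical names. It builds on java.util.EventObject, which Spring's ApplicationEvent extends.

```java
import java.util.ArrayList;
import java.util.EventObject;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical, simplified publisher; it only imitates type-based listener dispatch. */
final class SimpleEventPublisher {

    /** Pairs an event type with the callback interested in it. */
    private static final class Registration<E extends EventObject> {
        final Class<E> type;
        final Consumer<E> listener;

        Registration(Class<E> type, Consumer<E> listener) {
            this.type = type;
            this.listener = listener;
        }

        /** Invokes the listener only when the event is an instance of the registered type. */
        boolean deliverIfSupported(EventObject event) {
            if (type.isInstance(event)) {
                listener.accept(type.cast(event));
                return true;
            }
            return false;
        }
    }

    private final List<Registration<?>> registrations = new ArrayList<>();

    <E extends EventObject> void addListener(Class<E> type, Consumer<E> listener) {
        registrations.add(new Registration<>(type, listener));
    }

    /** Publishes the event and returns the number of listeners that were invoked. */
    int publishEvent(EventObject event) {
        int delivered = 0;
        for (Registration<?> registration : registrations) {
            if (registration.deliverIfSupported(event)) {
                delivered++;
            }
        }
        return delivered;
    }

    /** Hypothetical event mirroring MyApplicationEvent: the message becomes the event source. */
    static final class MessageEvent extends EventObject {
        MessageEvent(String message) {
            super(message);
        }
    }

    public static void main(String[] args) {
        SimpleEventPublisher publisher = new SimpleEventPublisher();
        publisher.addListener(MessageEvent.class,
                event -> System.out.println("Event received : " + event.getSource()));
        publisher.publishEvent(new MessageEvent("Hello,World"));
    }
}
```

Events of an unrelated type are simply skipped, which mirrors why onEvent in the demo is called only for MyApplicationEvent.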
Spring Cloud Bus
Adapting user-service-client: integrating Spring Cloud Bus over AMQP
1. Add the Maven dependency
<!-- Spring Cloud Bus integration: AMQP -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
2. Start the dependent services
user-service-client depends on:
- Eureka Server (1000)
- Config Server (7070)
- RabbitMQ (5672)
Event propagation
How to find the Application Context ID
Visit http://localhost:8080/beans to confirm the current Application Context ID:
{
"context": "user-service-client:8080",
"parent": "user-service-client",
"beans": []
}
Single-node propagation
POST http://localhost:8080/bus/refresh?destination=user-service-client:8080
Run curl:
curl -X POST "http://localhost:8080/bus/refresh?destination=user-service-client:8080"
Log output:
INFO 28041 --- [nio-8080-exec-3] o.s.cloud.bus.event.RefreshListener : Received remote refresh request. Keys refreshed []
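The same request can be built from Java. This is a sketch assuming Java 11 or later for java.net.http; BusRefreshRequests is a hypothetical helper name, and the destination is URL-encoded, so the colon appears as %3A in the final URI.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that builds, but does not send, a /bus/refresh request. */
final class BusRefreshRequests {

    /** Builds a body-less POST to {baseUrl}/bus/refresh with the given destination. */
    static HttpRequest refresh(String baseUrl, String destination) {
        String encoded = URLEncoder.encode(destination, StandardCharsets.UTF_8);
        URI uri = URI.create(baseUrl + "/bus/refresh?destination=" + encoded);
        return HttpRequest.newBuilder(uri)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = refresh("http://localhost:8080", "user-service-client:8080");
        System.out.println(request.method() + " " + request.uri());
        // To actually send it (the service must be running):
        // HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.discarding());
    }
}
```

Building the request separately from sending it keeps the example runnable even when no service is listening on port 8080.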
Cluster propagation
POST http://localhost:8080/bus/refresh?destination=user-service-client:**
Run curl:
curl -X POST "http://localhost:8080/bus/refresh?destination=user-service-client:**"
Log output:
INFO 28041 --- [nio-8080-exec-5] o.s.cloud.bus.event.RefreshListener : Received remote refresh request. Keys refreshed []
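The two requests differ only in the destination value: user-service-client:8080 names one instance, while user-service-client:** covers every instance of the service. The sketch below imitates that colon-separated wildcard matching under simplified assumptions (* matches exactly one segment, ** matches any number of segments). DestinationPatterns is a hypothetical class and is not the matcher Spring Cloud Bus uses internally.

```java
/** Hypothetical, simplified matcher for colon-separated destination patterns. */
final class DestinationPatterns {

    /** Returns true when the service id is selected by the destination pattern. */
    static boolean matches(String pattern, String serviceId) {
        return matches(pattern.split(":"), 0, serviceId.split(":"), 0);
    }

    private static boolean matches(String[] pattern, int pi, String[] id, int si) {
        if (pi == pattern.length) {
            // The pattern is used up: it matches only if the id is used up as well
            return si == id.length;
        }
        if (pattern[pi].equals("**")) {
            // "**" may absorb zero or more of the remaining id segments
            for (int next = si; next <= id.length; next++) {
                if (matches(pattern, pi + 1, id, next)) {
                    return true;
                }
            }
            return false;
        }
        if (si == id.length) {
            return false;
        }
        boolean segmentMatches = pattern[pi].equals("*") || pattern[pi].equals(id[si]);
        return segmentMatches && matches(pattern, pi + 1, id, si + 1);
    }

    public static void main(String[] args) {
        String[] patterns = {"user-service-client:8080", "user-service-client:**", "**"};
        for (String pattern : patterns) {
            System.out.println(pattern + " selects user-service-client:8081 ? "
                    + matches(pattern, "user-service-client:8081"));
        }
    }
}
```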
The logs from both requests show that the event listener is org.springframework.cloud.bus.event.RefreshListener:
public class RefreshListener
        implements ApplicationListener<RefreshRemoteApplicationEvent> {

    private static Log log = LogFactory.getLog(RefreshListener.class);

    private ContextRefresher contextRefresher;

    public RefreshListener(ContextRefresher contextRefresher) {
        this.contextRefresher = contextRefresher;
    }

    @Override
    public void onApplicationEvent(RefreshRemoteApplicationEvent event) {
        Set<String> keys = contextRefresher.refresh();
        log.info("Received remote refresh request. Keys refreshed " + keys);
    }
}
RefreshListener listens for RefreshRemoteApplicationEvent.
Custom RefreshRemoteApplicationEvent listener
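import org.springframework.cloud.bus.event.RefreshRemoteApplicationEvent;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;

@Configuration
public class BusConfiguration {

    // Prints the source, origin service and destination service of each refresh event
    @EventListener
    public void onRefreshRemoteApplicationEvent(RefreshRemoteApplicationEvent event) {
        System.out.printf(" Source : %s , originService : %s , destinationService : %s \n",
                event.getSource(),
                event.getOriginService(),
                event.getDestinationService());
    }
}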
Event tracing
Event tracing is disabled by default and must be enabled with the configuration property spring.cloud.bus.trace.enabled=true
Endpoint: /trace
Event trace details
{
"timestamp": 1513517631139,
"info": {
"signal": "spring.cloud.bus.sent",
"type": "RefreshRemoteApplicationEvent",
"id": "938c1305-02b8-4697-9ac4-5996908eb58d",
"origin": "user-service-client:8080",
"destination": "user-service-client:**"
}
},
{
"timestamp": 1513517631138,
"info": {
"signal": "spring.cloud.bus.ack",
"event": "RefreshRemoteApplicationEvent",
"id": "938c1305-02b8-4697-9ac4-5996908eb58d",
"origin": "user-service-client:8080",
"destination": "user-service-client:**"
}
}
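Both entries carry the same id, so a sent event can be paired with its ack. Below is a small sketch of that pairing, assuming the trace entries have already been parsed into objects; BusTraceCheck and Entry are hypothetical names, and only the signal and id fields are modeled.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper that pairs "sent" and "ack" trace entries by event id. */
final class BusTraceCheck {

    /** Minimal view of one /trace entry: only the signal name and the event id. */
    static final class Entry {
        final String signal;
        final String id;

        Entry(String signal, String id) {
            this.signal = signal;
            this.id = id;
        }
    }

    /** Returns the ids that have a "spring.cloud.bus.sent" entry but no "spring.cloud.bus.ack" entry. */
    static Set<String> unacknowledged(List<Entry> entries) {
        Set<String> sent = new LinkedHashSet<>();
        Set<String> acked = new LinkedHashSet<>();
        for (Entry entry : entries) {
            if ("spring.cloud.bus.sent".equals(entry.signal)) {
                sent.add(entry.id);
            } else if ("spring.cloud.bus.ack".equals(entry.signal)) {
                acked.add(entry.id);
            }
        }
        sent.removeAll(acked);
        return sent;
    }

    public static void main(String[] args) {
        String id = "938c1305-02b8-4697-9ac4-5996908eb58d";
        List<Entry> trace = List.of(
                new Entry("spring.cloud.bus.sent", id),
                new Entry("spring.cloud.bus.ack", id));
        System.out.println("Unacknowledged ids: " + unacknowledged(trace));
    }
}
```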
Built-in event types
- RefreshRemoteApplicationEvent
- EnvironmentChangeRemoteApplicationEvent
- AckRemoteApplicationEvent: active when ack is enabled
Custom EnvironmentChangeRemoteApplicationEvent listener
@EventListener
public void onEnvironmentChangeRemoteApplicationEvent(EnvironmentChangeRemoteApplicationEvent event) {
    System.out.printf("EnvironmentChangeRemoteApplicationEvent - " +
                    " Source : %s , originService : %s , destinationService : %s \n",
            event.getSource(),
            event.getOriginService(),
            event.getDestinationService());
}
POST request to /bus/env
curl -X POST http://localhost:8080/bus/env
Console output:
EnvironmentChangeRemoteApplicationEvent - Source : org.springframework.cloud.bus.endpoint.EnvironmentBusEndpoint@656c356c , originService : user-service-client:8080 , destinationService : **
2017-12-17 21:40:42.440 INFO 33364 --- [nio-8080-exec-3] o.s.c.b.event.EnvironmentChangeListener : Received remote environment change request. Keys/values to update {}
EnvironmentChangeListener is the default listener implementation for EnvironmentChangeRemoteApplicationEvent.
Changes in /trace:
{
"timestamp": 1513518042463,
"info": {
"signal": "spring.cloud.bus.sent",
"type": "EnvironmentChangeRemoteApplicationEvent",
"id": "1af8f5a0-6d1f-440a-82cd-e09876977d33",
"origin": "user-service-client:8080",
"destination": "**:**"
}
},
{
"timestamp": 1513518042462,
"info": {
"signal": "spring.cloud.bus.ack",
"event": "EnvironmentChangeRemoteApplicationEvent",
"id": "1af8f5a0-6d1f-440a-82cd-e09876977d33",
"origin": "user-service-client:8080",
"destination": "**"
}
},