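I have a simple Spring-driven service that publishes events over AMQP.
The configuration is based on bootiful-axon.
Now I want the service to keep some private state. It's a simple use case that can be implemented with three additional events. These events have no meaning outside the service, so I don't want them to "leave" it.
How do I specify which events should be published over AMQP and which should not?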
Solution:
I solved it with a custom SpringAMQPPublisher that overrides the send method and filters the events before they are published:
    public class SelectiveAmqpPublisher extends SpringAMQPPublisher {

        // Only payload types that implement PublicEvent may leave the service.
        static boolean shouldSend(Class<?> pt) {
            return PublicEvent.class.isAssignableFrom(pt);
        }

        public SelectiveAmqpPublisher(
                SubscribableMessageSource<EventMessage<?>> messageSource) {
            super(messageSource);
        }

        // Drop every event that is not a PublicEvent, then delegate to the
        // regular publisher for the remaining ones.
        @Override
        protected void send(List<? extends EventMessage<?>> events) {
            super.send(events.stream()
                    .filter(e -> shouldSend(e.getPayloadType()))
                    .collect(Collectors.toList()));
        }
    }
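The publisher above depends on PublicEvent, which is not shown. Presumably it is a marker interface implemented by the events that are allowed to leave the service. The following is a minimal, framework-free sketch of that idea under that assumption; the event classes OrderPlaced and InternalCounterUpdated and the publishable helper are hypothetical names used only for illustration, and plain payload objects stand in for Axon's EventMessage.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MarkerFilterSketch {

    /** Assumed marker interface: only events implementing it are published. */
    interface PublicEvent {}

    // Hypothetical event that other services may see.
    static final class OrderPlaced implements PublicEvent {}

    // Hypothetical event that only tracks private state inside the service.
    static final class InternalCounterUpdated {}

    // Same check as in SelectiveAmqpPublisher.shouldSend.
    static boolean shouldSend(Class<?> payloadType) {
        return PublicEvent.class.isAssignableFrom(payloadType);
    }

    // Keeps only the payloads whose type is marked as public.
    static List<Object> publishable(List<Object> payloads) {
        return payloads.stream()
                .filter(p -> shouldSend(p.getClass()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Object> all = Arrays.asList(new OrderPlaced(), new InternalCounterUpdated());
        for (Object payload : publishable(all)) {
            System.out.println(payload.getClass().getSimpleName());
        }
    }
}
```

Because the check is a whitelist, a new event type stays private until it explicitly implements the marker interface, so forgetting the marker cannot leak an internal event.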
Configuration:
    @Autowired
    private AMQPProperties amqpProperties;

    @Autowired
    private RoutingKeyResolver routingKeyResolver;

    @Autowired
    private AMQPMessageConverter aMQPMessageConverter;

    @Bean(initMethod = "start", destroyMethod = "shutDown")
    public SpringAMQPPublisher amqpBridge(
            EventBus eventBus,
            ConnectionFactory connectionFactory,
            AMQPMessageConverter amqpMessageConverter) {
        SpringAMQPPublisher publisher = new SelectiveAmqpPublisher(eventBus);
        // The rest is from axon-spring-autoconfigure...
        publisher.setExchangeName(amqpProperties.getExchange());
        publisher.setConnectionFactory(connectionFactory);
        publisher.setMessageConverter(amqpMessageConverter);
        switch (amqpProperties.getTransactionMode()) {
            case TRANSACTIONAL:
                publisher.setTransactional(true);
                break;
            case PUBLISHER_ACK:
                publisher.setWaitForPublisherAck(true);
                break;
            case NONE:
                break;
            default:
                throw new IllegalStateException("....");
        }
        return publisher;
    }