在项目中,我们经常会用到消息中间件来达到解耦或者削峰的目的。常用的消息中间件有RabbitMQ、Kafka等。虽然这些消息中间件之间的原理可能类似,但它们的使用方法却是大相径庭的,那如果我们要在项目中换一种消息中间件来实现原有的功能,那么我们需要花费大量的精力去修改原有的代码。
Spring Cloud Stream是一个构建消息驱动的微服务应用框架,它使用Binder和消息中间件建立联系,我们在使用的时候不需要关心我们到底是使用的是RabbitMQ还是Kafka,因此我们可以在消息中间件中随意切换。
1.实践环境
Spring Cloud Stream Rabbit 3.2.1
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
<version>3.2.1</version>
</dependency>
Spring Cloud Stream从 3.1 开始支持函数式编程模型,因此我们的配置也是使用函数式编程模型来实现的。
2.生产者
2.1 yml配置文件
spring:
rabbitmq:
host: 192.168.1.2
port: 5672
username: guest
password: guest
virtual-host: /test
cloud:
stream:
bindings:
sms-out-0: # 自定义生产者通道名称
destination: sms-exchange# 自定义Exchange交换机名称,生产者和消费者需要配置同一个Exchange
2.2 生产者实现
@Component
public class SmsProducer{
@Resource
private StreamBridge streamBridge;
public void send(String message) {
// 生产者的通道名称须与yml中配置的属性一致
String SMS_OUTPUT = "sms-out-0";
streamBridge.send(SMS_OUTPUT , message);
}
}
3 .消费者
消费者使用函数式编程模型实现
3.1 yml配置文件
spring:
rabbitmq:
host: 192.168.1.2
port: 5672
username: guest
password: guest
virtual-host: /test
cloud:
stream:
bindings:
sms-in-0: # 自定义消费者通道名称
destination: sms-exchange# 自定义Exchange交换机名称,生产者和消费者需要配置同一个Exchange
3.2 消费者实现
@Component
public class SmsConsumer {
/**
* 方法名为sms,须与通道名称一致,sms-in-0通道名为sms
*/
@Bean
public Consumer<String> sms() {
return System.out::println;
}
}
4.测试
现在我们开始启动项目,会在RabbitMQ中创建一个Exchange以及与其绑定的一个临时队列(该队列会在应用停止运行时消失)
生成的Exchange
生成的临时队列
4.1 单元测试
@SpringBootTest(classes = StreamApplication.class)
public class StreamApplicationTest {
@Resource
private SmsProducer smsProducer;
@Test
public void testFunctionSend() {
smsProducer.send("测试函数式编程发送消息");
}
}
最后,欢迎关注微信公众号一起交流