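RabbitMQ and Kafka are two of the most popular message brokers today. This post shows how to integrate RabbitMQ with Spring Boot.
1. Add the Spring Boot starter for RabbitMQ. Spring Boot integrates RabbitMQ well, so the starter dependencies below are all that is needed. I am using the latest Spring Boot release here: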
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
2. Configure application.yml. The settings to watch are the broker's IP address and port:
server:
  port: 8021
spring:
  application:
    name: rabbitmq-provider
  # RabbitMQ host: use 127.0.0.1 for a broker on the local (e.g. Windows) machine, otherwise the remote host's IP address; here I use my server's IP
  rabbitmq:
    host: 101.35.105.175
    # RabbitMQ port; the one normally exposed is 5672
    port: 5672
    username: admin
    password: 123
    # Virtual host; optional, the broker's default virtual host is used if omitted
    virtual-host: JCcccHost
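As a quick cross-check of the settings above, the same host, port, credentials, and virtual host can be written as a single AMQP URI, the form described by the RabbitMQ URI specification. The sketch below only assembles that string with `java.net.URI`. The class name `AmqpUriSketch` and its `build` method are made up for illustration, and it assumes a virtual host name that needs no percent-encoding.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper for illustration; not part of Spring Boot or the RabbitMQ client.
class AmqpUriSketch {

    // Assembles amqp://user:password@host:port/vhost from the individual settings.
    // Assumption: the virtual host contains no '/' (the default vhost "/" would need %2F).
    static String build(String host, int port, String user, String password, String vhost) {
        try {
            URI uri = new URI("amqp", user + ":" + password, host, port, "/" + vhost, null, null);
            return uri.toString();
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Invalid connection settings", e);
        }
    }

    public static void main(String[] args) {
        // Values taken from the application.yml above.
        System.out.println(build("101.35.105.175", 5672, "admin", "123", "JCcccHost"));
    }
}
```

Seeing all five values on one line makes it easier to spot a wrong port or a misspelled virtual host before starting the application.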
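3. Once the project is set up, create the producers and consumers. Each RabbitMQ working mode gets its own example below:
3.1 HelloWorld mode:
Producer: we use the RabbitTemplate object here. Once it is injected, it can be used to send messages.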
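import com.example.springbootrabbitmq.SpringbootRabbitmqApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest(classes = SpringbootRabbitmqApplication.class)
@RunWith(SpringRunner.class)
public class HelloWorldDemo {
    @Autowired
    RabbitTemplate rt;

    @Test
    public void send() {
        // No exchange is named, so the default exchange is used and "hello-world" is the routing key (the queue name)
        rt.convertAndSend("hello-world", "hello world!!!");
    }
}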
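Consumer:
@RabbitListener(queuesToDeclare = @Queue("hello-world")) declares the queue if it does not exist yet and attaches the listener to it.
The msg parameter receives the message sent by the producer.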
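import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queuesToDeclare = @Queue("hello-world"))
public class HelloWorld {
    // With a class-level @RabbitListener, @RabbitHandler marks the method that handles String payloads
    @RabbitHandler
    public void receive(String msg) {
        System.out.println("msg:" + msg);
    }
}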
After running the test, the consumer prints the received message to the console.
3.2 Work mode:
Producer:
package com.example.springbootrabbitmq.helloworld;

/*
 PACKAGE_NAME: com.example.springbootrabbitmq.helloworld
 USER: 18413
 DATE: 2021/10/6 18:29
 PROJECT_NAME: RabbitMq
 Face the code, face you; let down neither the code nor you -- 蒋明辉
*/

import com.example.springbootrabbitmq.SpringbootRabbitmqApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest(classes = SpringbootRabbitmqApplication.class)
@RunWith(SpringRunner.class)
public class HelloWorldDemo {
    @Autowired
    RabbitTemplate rt;

    @Test
    public void send() {
        // Publish 100 numbered messages to the "hello-world" queue
        for (int i = 0; i < 100; i++) {
            rt.convertAndSend("hello-world", "hello world!!!" + i);
        }
    }
}
Consumer: several consumers listen on the same queue, and the messages are distributed among them in round-robin fashion.
package com.example.springbootrabbitmq.helloworld;

/*
 PACKAGE_NAME: com.example.springbootrabbitmq.helloworld
 USER: 18413
 DATE: 2021/10/6 17:05
 PROJECT_NAME: RabbitMq
 Face the code, face you; let down neither the code nor you -- 蒋明辉
*/

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// Each method-level @RabbitListener is a separate consumer on the same queue.
// @RabbitHandler is only needed together with a class-level @RabbitListener, so it is omitted here.
@Component
public class HelloWorld {
    @RabbitListener(queuesToDeclare = @Queue("hello-world"))
    public void receive01(String msg) {
        System.out.println("receive01:" + msg);
    }

    @RabbitListener(queuesToDeclare = @Queue("hello-world"))
    public void receive02(String msg) {
        System.out.println("receive02:" + msg);
    }

    @RabbitListener(queuesToDeclare = @Queue("hello-world"))
    public void receive03(String msg) {
        System.out.println("receive03:" + msg);
    }
}
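To make the round-robin idea concrete, here is a small in-memory sketch of how 100 messages would be split across three consumers under strict round-robin. It does not connect to RabbitMQ. The class `RoundRobinSketch` and its `dispatch` method are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of round-robin dispatch; it does not talk to a broker.
class RoundRobinSketch {

    // Hands message i to consumer (i mod consumerCount), as an idealized broker would.
    static Map<String, List<String>> dispatch(List<String> messages, List<String> consumers) {
        Map<String, List<String>> delivered = new LinkedHashMap<>();
        for (String consumer : consumers) {
            delivered.put(consumer, new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            String target = consumers.get(i % consumers.size());
            delivered.get(target).add(messages.get(i));
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            messages.add("hello world!!!" + i);
        }
        Map<String, List<String>> result =
                dispatch(messages, Arrays.asList("receive01", "receive02", "receive03"));
        // 100 messages over 3 consumers: 34, 33 and 33 messages respectively.
        result.forEach((name, msgs) -> System.out.println(name + " got " + msgs.size()));
    }
}
```

With a real broker the split can deviate from this ideal, because delivery also depends on each consumer's prefetch limit and on how quickly it acknowledges messages.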
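3.3 Fanout mode (broadcast):
Producer:
First argument: the exchange name
Second argument: the routing key (a fanout exchange ignores it, so it is left empty)
Third argument: the message body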
@Test
public void fanOutSend() {
    rt.convertAndSend("logs", "", "i am fanout");
}
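Consumer: the annotations declare a temporary queue (@Queue with no name), bind it to the exchange, and set the exchange type.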
@RabbitListener(bindings = {
        @QueueBinding(value = @Queue,
                exchange = @Exchange(value = "logs", type = "fanout"))
})
public void receive02(String msg) {
    System.out.println("receive02:" + msg);
}

@RabbitListener(bindings = {
        @QueueBinding(value = @Queue,
                exchange = @Exchange(value = "logs", type = "fanout"))
})
public void receive03(String msg) {
    System.out.println("receive03:" + msg);
}
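The broadcast behaviour can be pictured with a tiny in-memory model: every queue bound to a fanout exchange receives its own copy of each message, whatever the routing key. This is only a sketch that involves no broker. `FanoutSketch` and its methods are invented names, and the queue names `q2` and `q3` stand in for the anonymous queues that the listeners would get.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory fanout exchange; hypothetical, for illustration only.
class FanoutSketch {
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    // The routing key is accepted but ignored, as with a real fanout exchange.
    void publish(String routingKey, String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    List<String> messagesIn(String queueName) {
        return queues.get(queueName);
    }

    public static void main(String[] args) {
        FanoutSketch logs = new FanoutSketch();
        logs.bind("q2"); // stands in for the queue behind receive02
        logs.bind("q3"); // stands in for the queue behind receive03
        logs.publish("", "i am fanout");
        System.out.println("q2: " + logs.messagesIn("q2"));
        System.out.println("q3: " + logs.messagesIn("q3"));
    }
}
```

Both queues end up holding the same single message, which is why each listener in the example above prints it once.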
3.4 Direct (routing) mode: queues are bound to the exchange with routing keys
Producer:
@Test
public void directSend() {
    rt.convertAndSend("route-directs", "info", "i am info");
    rt.convertAndSend("route-directs", "error", "i am error");
}
Consumer:
package com.example.springbootrabbitmq.direct;

/*
 PACKAGE_NAME: com.example.springbootrabbitmq.direct
 USER: 18413
 DATE: 2021/10/6 20:21
 PROJECT_NAME: RabbitMq
 Face the code, face you; let down neither the code nor you -- 蒋明辉
*/

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Direct {
    // Receives only messages published with the routing key "info"
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue,
                    key = {"info"},
                    exchange = @Exchange(value = "route-directs", type = "direct"))
    })
    public void receive01(String msg) {
        System.out.println("receive01:" + msg);
    }

    // Receives messages published with the routing key "info" or "error"
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue,
                    key = {"info", "error"},
                    exchange = @Exchange(value = "route-directs", type = "direct"))
    })
    public void receive02(String msg) {
        System.out.println("receive02:" + msg);
    }
}
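The routing decision of a direct exchange is an exact-match lookup from routing key to bound queues. The sketch below models just that lookup in memory, using the same keys as the listeners above. `DirectSketch`, `bind`, and `route` are hypothetical names, and no broker is involved.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Toy in-memory direct exchange; hypothetical, for illustration only.
class DirectSketch {
    // routing key -> names of the queues bound with that key
    private final Map<String, Set<String>> bindings = new HashMap<>();

    void bind(String queue, String... keys) {
        for (String key : keys) {
            bindings.computeIfAbsent(key, k -> new LinkedHashSet<>()).add(queue);
        }
    }

    // Returns the queues whose binding key equals the routing key exactly.
    Set<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptySet());
    }

    public static void main(String[] args) {
        DirectSketch exchange = new DirectSketch();
        exchange.bind("receive01", "info");
        exchange.bind("receive02", "info", "error");
        System.out.println("info  -> " + exchange.route("info"));  // [receive01, receive02]
        System.out.println("error -> " + exchange.route("error")); // [receive02]
        System.out.println("debug -> " + exchange.route("debug")); // []
    }
}
```

A routing key with no matching binding, such as `debug` here, reaches no queue at all.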
3.5 Topic mode (dynamic routing):
Producer:
@Test
public void topicSend() {
    // The exchange name is case-sensitive and must match the consumer's "route-topic"
    rt.convertAndSend("route-topic", "info", "i am info");
    rt.convertAndSend("route-topic", "info.save", "i am info");
    rt.convertAndSend("route-topic", "info.save.send", "i am info");
    rt.convertAndSend("route-topic", "error.save", "i am error");
    rt.convertAndSend("route-topic", "error.save.send", "i am error");
}
Consumer:
package com.example.springbootrabbitmq.topic;

/*
 PACKAGE_NAME: com.example.springbootrabbitmq.topic
 USER: 18413
 DATE: 2021/10/6 21:48
 PROJECT_NAME: RabbitMq
 Face the code, face you; let down neither the code nor you -- 蒋明辉
*/

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Topic {
    // "*" matches exactly one word, so "info.*" matches "info.save" but not "info" or "info.save.send"
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue,
                    key = {"info.*", "error.*"},
                    exchange = @Exchange(value = "route-topic", type = "topic"))
    })
    public void receive01(String msg) {
        System.out.println("receive01:" + msg);
    }
}
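To see which of the producer's routing keys the patterns `info.*` and `error.*` accept, here is a minimal sketch of topic matching written from the documented rules: words are separated by dots, `*` stands for exactly one word, and `#` stands for zero or more words. It is a simplified model, not RabbitMQ's own implementation, and `TopicMatchSketch` is a made-up name.

```java
// Simplified model of topic-exchange pattern matching; hypothetical, for illustration only.
class TopicMatchSketch {

    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    // Compares pattern words from index pi against routing-key words from index ki.
    private static boolean match(String[] p, int pi, String[] k, int ki) {
        if (pi == p.length) {
            return ki == k.length; // pattern used up: the key must be used up too
        }
        if (p[pi].equals("#")) {
            // '#' absorbs zero or more words: try every possible number of words.
            for (int next = ki; next <= k.length; next++) {
                if (match(p, pi + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (ki == k.length) {
            return false; // pattern still expects a word but the key has none left
        }
        if (p[pi].equals("*") || p[pi].equals(k[ki])) {
            return match(p, pi + 1, k, ki + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String[] patterns = {"info.*", "error.*"};
        String[] keys = {"info", "info.save", "info.save.send", "error.save", "error.save.send"};
        for (String key : keys) {
            boolean delivered = false;
            for (String pattern : patterns) {
                delivered |= matches(pattern, key);
            }
            System.out.println(key + " -> " + (delivered ? "delivered" : "not delivered"));
        }
    }
}
```

Under these rules only `info.save` and `error.save` reach the listener. Binding with `info.#` instead would also accept `info` and `info.save.send`.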