1、界面管理
2、代码实现点对点的消息发送与消费(也就是一个生产者对应一个消费者。)
/**
* @author Suny
* @data 2021/11/1 20:36
* @description 生产者
*/
public class ProducerDemo01 {
public static void main(String[] args) throws Exception {
//创建mq的链接工厂
ConnectionFactory factory = new ConnectionFactory();
//配置rabbitmq的主机地址
factory.setHost("127.0.0.1");
//配置端口号
factory.setPort(5672);
//配置虚拟主机
factory.setVirtualHost("/ems");
//配置账户和密码
factory.setUsername("ems");
factory.setPassword("ems");
//获取链接
Connection connection = factory.newConnection();
//通过链接获取链接中的通道
Channel channel = connection.createChannel();
//参数 队列名、队列是否要持久化、队列是否只允许当前通道使用、队列在消费完之后是否自动删除、额外附加参数
channel.queueDeclare("Hello",false,false,false,null);
//发布消息
//参数 交换机名称 、队列名称、传递消息的额外设置、消息具体内容
channel.basicPublish("","Hello",null,"Hello,World".getBytes());
channel.close();
connection.close();
}
}
/**
* @author Suny
* @data 2021/11/1 20:50
* @description 消费者
*/
public class ConsumerDemo01 {
public static void main(String[] args) throws Exception {
//创建mq的链接工厂
ConnectionFactory factory = new ConnectionFactory();
//配置rabbitmq的主机地址
factory.setHost("127.0.0.1");
//配置端口号
factory.setPort(5672);
//配置虚拟主机
factory.setVirtualHost("/ems");
//配置账户和密码
factory.setUsername("ems");
factory.setPassword("ems");
//获取链接
Connection connection = factory.newConnection();
//通过链接获取链接中的通道
Channel channel = connection.createChannel();
//通道绑定队列
//参数 队列名、队列是否要持久化、队列是否只允许当前通道使用、队列在消费完之后是否自动删除、额外附加参数
channel.queueDeclare("Hello",false,false,false,null);
//消费消息
//参数 队列名称、是否开启消息自动确认机制
channel.basicConsume("Hello",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String str = new String(body);
System.out.println("str = " + str);
}
});
/*channel.close();
connection.close();*/
}
}
3、工作模式一个生产多个消费(一个生产者对应多个消费者,但是只能有一个消费者获得消息,也称之为竞争消费者模式。
)
/**
* @author Suny
* @data 2021/11/2 20:29
* @description 工作模式
*/
public class WorkProducer {
public static void main(String[] args) throws Exception{
Connection connection = RabbitMqConfig.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("workQueue",true,false,false,null);
for (int i = 0; i < 10; i++) {
channel.basicPublish("","workQueue",MessageProperties.PERSISTENT_TEXT_PLAIN,(i +"Hello Work Queue").getBytes());
}
RabbitMqConfig.closeConnection(channel,connection);
}
}
// 消费者
/**
* @author Suny
* @data 2021/11/2 20:34
* @description 工作模式的消费者
*/
public class WorkConsumer {
public static void main(String[] args) throws Exception {
Connection connection = RabbitMqConfig.getConnection();
final Channel channel = connection.createChannel();
//每次只消费、确认一条消息
channel.basicQos(1);
//参数 队列名、队列是否要持久化、队列是否只允许当前通道使用、队列在消费完之后是否自动删除、额外附加参数
channel.queueDeclare("workQueue",true,false,false,null);
//参数 队列名称、是否开启消息自动确认机制
channel.basicConsume("workQueue",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1 - :" + new String(body));
//当消息消费完之后开启手动确认机制
//参数: 确认队列中那个具体的消息,是否开启多个消息同事确认
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
4、发布订阅(一个消费者将消息首先发送到交换器,交换器绑定多个队列,然后被监听该队列的消费者所接收并消费。)
/**
* @author Suny
* @data 2021/11/8 21:04
* @description 广播模式
*/
public class FanoutProducer {
public static void main(String[] args) throws Exception{
Connection connection = RabbitMqConfig.getConnection();
Channel channel = connection.createChannel();
//将通道声明指定的交换机
//参数 交换机名称、交换机的模式fanout为广播
channel.exchangeDeclare("logs","fanout");
channel.basicPublish("logs","",null,"fanout".getBytes());
channel.close();
connection.close();
}
}
/**
* @author Suny
* @data 2021/11/8 21:04
* @description 广播消费者,每一个消费者都会消费消息
*/
public class FanoutConsumer {
public static void main(String[] args) throws Exception{
Connection connection = RabbitMqConfig.getConnection();
Channel channel = connection.createChannel();
//绑定交换机
channel.exchangeDeclare("logs","fanout");
//获取临时队列
String queueName = channel.queueDeclare().getQueue();
//队列绑定交换机
//参数 队列名 交换机名 路由key
channel.queueBind(queueName,"logs","");
//消费消息
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer1-fanout消费了");
}
});
}
}
/**
* @author Suny
* @data 2021/11/8 21:04
* @description 广播消费者
*/
public class FanoutConsumer2 {
public static void main(String[] args) throws Exception{
Connection connection = RabbitMqConfig.getConnection();
Channel channel = connection.createChannel();
//绑定交换机
channel.exchangeDeclare("logs","fanout");
//获取临时队列
String queueName = channel.queueDeclare().getQueue();
//队列绑定交换机
//参数 队列名 交换机名 路由key
channel.queueBind(queueName,"logs","");
//消费消息
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer2-fanout消费了");
}
});
}
}
5、路由模式(生产者将消息发送到direct交换器,在绑定队列和交换器的时候有一个路由key,生产者发送的消息会指定一个路由key,那么消息只会发送到相应key相同的对列,接着监听该队列的消费者消费消息。)
/**
* @author Suny
* @data 2021/11/8 21:28
* @description 路由模式
*/
public class DirectProducer {
public static void main(String[] args) throws Exception{
Connection connection = RabbitMqConfig.getConnection();
Channel channel = connection.createChannel();
//将通道声明指定的交换机
//参数 交换机名称、交换机的模式DIRECT为广播
channel.exchangeDeclare("logs_Direct", "direct");
for (int i = 0; i < 10; i++) {
if (i % 2 == 0){
channel.basicPublish("logs_Direct","aaa",null,"DIRECT".getBytes());
}else {
channel.basicPublish("logs_Direct","bbb",null,"DIRECT".getBytes());
}
}
channel.close();
connection.close();
}
}
/**
* @author Suny
* @data 2021/11/8 21:28
* @description 路由模式
*/
public class DirectConsumer {
public static void main(String[] args) throws Exception{
Connection connection = RabbitMqConfig.getConnection();
Channel channel = connection.createChannel();
//绑定交换机 交换机名称、交换机模式DIRECT(路由)
channel.exchangeDeclare("logs_Direct",BuiltinExchangeType.DIRECT);
//获取临时队列
String queueName = channel.queueDeclare().getQueue();
//队列绑定交换机
//参数 队列名 交换机名 路由key
channel.queueBind(queueName,"logs_Direct","aaa");
//消费消息
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String string = new String(body);
System.out.println(string);
}
});
}
}
/**
* @author Suny
* @data 2021/11/8 21:28
* @description 路由模式
*/
public class DirectConsumer2 {
public static void main(String[] args) throws Exception{
Connection connection = RabbitMqConfig.getConnection();
Channel channel = connection.createChannel();
//绑定交换机 交换机名称、交换机模式DIRECT(路由)
channel.exchangeDeclare("logs_Direct","direct");
//获取临时队列
String queueName = channel.queueDeclare().getQueue();
//队列绑定交换机
//参数 队列名 交换机名 路由key
channel.queueBind(queueName,"logs_Direct","bbb");
//消费消息
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer1-fanout消费了");
}
});
}
}
6、动态路由
/**
* @author Suny
* @data 2021/11/8 21:28
* @description 动态路由模式、主题模式支持统配符号比如*
*/
public class TopicProducer {
public static void main(String[] args) throws Exception{
Connection connection = RabbitMqConfig.getConnection();
Channel channel = connection.createChannel();
//交换机名、交换机类型
channel.exchangeDeclare("Topic", BuiltinExchangeType.TOPIC);
String RoutingKey = "user.info";
channel.basicPublish("Topic",RoutingKey,null,"Topic交换机".getBytes());
RabbitMqConfig.closeConnection(channel,connection);
}
}
/**
* @author Suny
* @data 2021/11/8 21:28
* @description 动态路由
*/
public class TopicConsumer {
public static void main(String[] args) throws Exception{
Connection connection = RabbitMqConfig.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("Topic", BuiltinExchangeType.TOPIC);
//获取临时队列
String queueName = channel.queueDeclare().getQueue();
//参数 队列名 交换机名 路由key
//通配符*表示只匹配一个、#表示匹配多个
channel.queueBind(queueName,"Topic","*.info");
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String str = new String(body);
System.out.println(str);
}
});
}
}
spring boot集成Rabbitmq
// 相关jar包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
//相关配置文件
# rabbitmq
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=ems
spring.rabbitmq.password=ems
spring.rabbitmq.virtual-host=/ems
# 手动ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 一次消费一条消息
spring.rabbitmq.listener.simple.prefetch= 1
spring boot生产者
package com.itheima.demo;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
* @author Suny
* @data 2021/11/9 20:31
* @description Springboot集成rabbitmq
*/
@SpringBootTest
public class RabbitmqTest {
//注入rabbitmq的使用模板
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testHello(){
//点对点模式
//队列名和消息
rabbitTemplate.convertAndSend("Boot","HelloRabbitmq");
}
@Test
public void testWork(){
//工作模式
//队列名和消息
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("work2","HelloWork");
}
}
@Test
public void testFanout(){
//广播模式
//交换机名、路由、消息
rabbitTemplate.convertAndSend("FanoutExchange","","FanoutExchange");
}
@Test
public void testRoutingKey(){
//路由模式
//交换机名、路由、消息
rabbitTemplate.convertAndSend("RoutingKey2021","userInfo","RoutingKey2021");
}
@Test
public void testTopic(){
//动态路由模式
//交换机名、路由、消息
rabbitTemplate.convertAndSend("testTopic","user.Info","testTopic");
}
}
spring boot消费者
/**
* @author Suny
* @data 2021/11/9 20:37
* @description rabbitmq消费者
* 点对点消费
*/
@Component
//当不配置持久化和自动删除时默认的是 不排他和非持久化的队列
@RabbitListener(queuesToDeclare = @Queue(value = "Boot",durable = "false",autoDelete = "false"))
public class Consumer {
@RabbitHandler
//@RabbitHandler使用此注解接收@RabbitListener指定的队列
public void receive1(String message){
System.out.println("message = " + message);
}
}
/**
* @author Suny
* @data 2021/11/9 20:37
* @description Springboot中使用work模式、并使用手动ACK进行消息确认
*/
@Component
public class WorkConsumer {
@RabbitListener(queuesToDeclare = @Queue("work2"))
public void receive1(String msg, Message message, Channel channel){
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println("message1 = " + msg);
channel.basicAck(deliveryTag,false);
}catch (Exception e){
e.printStackTrace();
}
}
@RabbitListener(queuesToDeclare = @Queue("work2"))
public void receive2(String msg, Message message, Channel channel){
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println("message2 = " + msg);
channel.basicAck(deliveryTag,false);
}catch (Exception e){
e.printStackTrace();
}
}
}
/**
* @author Suny
* @data 2021/11/9 20:56
* @description 广播模式的消费
*/
@Component
public class FanoutConsumer {
@RabbitListener(bindings = {
@QueueBinding(
//如果不指定队列会创建临时队列
value = @Queue("fanout2021"),
exchange = @Exchange(value = "FanoutExchange",type = "fanout")
)
})
public void receive1(String message){
System.out.println("message1 = " + message);
}
@RabbitListener(bindings = {
@QueueBinding(
//如果不指定队列会创建临时队列
value = @Queue("fanout2022"),
exchange = @Exchange(value = "FanoutExchange",type = "fanout")
)
})
public void receive2(String message){
System.out.println("message2 = " + message);
}
}
/**
* @author Suny
* @data 2021/11/9 20:56
* @description 路由模式
*/
@Component
public class RoutingConsumer {
@RabbitListener(bindings = {
@QueueBinding(
//如果不指定队列会创建临时队列
value = @Queue,
exchange = @Exchange(value = "RoutingKey2021",type = "direct"),
key = {"userInfo"}
)
})
public void receive1(String message){
System.out.println("message1 = " + message);
}
@RabbitListener(bindings = {
@QueueBinding(
//如果不指定队列会创建临时队列
value = @Queue,
exchange = @Exchange(value = "RoutingKey2021",type = "direct"),
key = {"userInfo"}
)
})
public void receive2(String message){
System.out.println("message2 = " + message);
}
}
/**
* @author Suny
* @data 2021/11/9 20:56
* @description动态路由模式
*/
@Component
public class TopicConsumer {
@RabbitListener(bindings = {
@QueueBinding(
//如果不指定队列会创建临时队列
value = @Queue,
exchange = @Exchange(value = "testTopic",type = "topic"),
key = {"user.*"}
)
})
public void receive1(String message){
System.out.println("message1 = " + message);
}
@RabbitListener(bindings = {
@QueueBinding(
//如果不指定队列会创建临时队列
value = @Queue,
exchange = @Exchange(value = "testTopic",type = "topic"),
key = {"user.*"}
)
})
public void receive2(String message){
System.out.println("message2 = " + message);
}
}