代码跟上一篇博客差不多:
生产者:
com.itheima.producer.Producer_WordQueue:
package com.itheima.producer;
/**
* @author QLBF
* @version 1.0
* @date 2021/2/26 10:55
*/
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 发送消息
*/
public class Producer_WordQueue {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory=new ConnectionFactory();
//2. 设置参数
factory.setHost("127.0.0.1"); //ip,写RabbitMQ启动的ip
factory.setPort(5672);//端口 默认值 5672
factory.setVirtualHost("/itcast"); //虚拟机 默认值/
factory.setUsername("heima"); //用户名 默认 guest
factory.setPassword("heima"); //密码 默认值 guest
//3. 创建连接 Connection
Connection connection=factory.newConnection();
//4. 创建Channel
Channel channel=connection.createChannel();
//5. 创建队列Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数:
1. queue:队列名称
2. durable:是否持久化,当mq重启之后,还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
*
4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:参数。
*/
//如果没有一个名字叫work_queues的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare("work_queues",true,false,false,null);
/*
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
参数:
1. exchange:交换机名称。简单模式下交换机会使用默认的 ""(简单模式相当于没交换机)
2. routingKey:路由名称,与上面的队列对应
3. props:配置信息
4. body:发送消息数据
*/
//演示一次发10条消息,看Consumer怎么消费
for (int i = 0; i < 10; i++) {
String boby=i+"、hello work_queues...";
//6. 发送消息
channel.basicPublish("","work_queues",null,boby.getBytes());
}
//7.释放资源
channel.close();
connection.close();
}
}
消费者:
com.itheima.consumer.Consumer_WorkQueues1:
package com.itheima.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author QLBF
* @version 1.0
* @date 2021/2/26 11:39
*/
public class Consumer_WorkQueues1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory=new ConnectionFactory();
//2. 设置参数
factory.setHost("127.0.0.1"); //ip,写RabbitMQ启动的ip
factory.setPort(5672);//端口 默认值 5672
factory.setVirtualHost("/itcast"); //虚拟机 默认值/,这个是我在启动Rabbit网站设置的
factory.setUsername("heima"); //用户名 默认 guest
factory.setPassword("heima"); //密码 默认值 guest
//3. 创建连接 Connection
Connection connection=factory.newConnection();
//4. 创建Channel
Channel channel=connection.createChannel();
//5. 创建队列Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数:
1. queue:队列名称
2. durable:是否持久化,当mq重启之后,还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
*
4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:参数。
*/
//如果没有一个名字叫work_queues的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare("work_queues",true,false,false,null);
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称,从生产者的work_queues拿出来
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 6.接收消息
Consumer consumer=new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/* System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);*/
System.out.println("body:"+new String(body)); //获取生产者发给MQ的消息,转为字符串
}
};
channel.basicConsume("work_queues",true,consumer);
//7关闭资源?不要
}
}
com.itheima.consumer.Consumer_WorkQueues2跟Consumer_WorkQueues1一模一样,名字改下就好。
然后直接run这些类:
先消费者后生产者:
消费者输出:
两者竞争消费,演示成功!!