文章目录
Java操作RabbitMQ
- 依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.4.3</version>
</dependency>
简单模式
- 消息提供者
// 1.连接RamabbitMQ
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.189.137"); // 连接地址
factory.setPort(5672); // 设置端口号
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/"); // 配置虚拟主机(每个团队用自己的消息队列)
Connection connection = factory.newConnection();// 创建连接
// 2.通过连接对象获取channel对象(队列,绑定,消息都是通过这个对象操作)
Channel channel = connection.createChannel();
// 3.通过channel创建一个队列(1:队列名称[如果队列不存在就创建])
channel.queueDeclare("myqueue",false,false,false,null);
// 4.发送消息到指定的队列(1:交换机,2:队列的名称,3:其他属性,4:消息体)
String msg = "hello 中文 2";
channel.basicPublish("","myqueue",null,msg.getBytes("utf-8"));
// 5.断开连接
connection.close();
- 消息消费者
工具类提取
public class ConnectionUtils {
private static ConnectionFactory factory =null;
static {
// 1.连接RamabbitMQ
factory = new ConnectionFactory();
factory.setHost("192.168.189.137"); // 连接地址
factory.setPort(5672); // 设置端口号
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/"); // 配置虚拟主机(每个团队用自己的消息队列)
}
public static Connection getConnection(){
Connection connection = null;// 创建连接
try {
connection = factory.newConnection();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return connection;
}
}
消费者
// 1.获取连接对象
Connection connection = ConnectionUtils.getConnection();
// 2.获取Channel
Channel channel = connection.createChannel();
// 3.通过channel创建一个队列(1:队列名称)
channel.queueDeclare("myqueue",false,false,false,null);
// 3.设置监听的队列(1:设置监听的队列,2:,3:消费者的回调方法
channel.basicConsume("myqueue",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者:"+new String(body,"utf-8"));
}
});
// 消费者不能关闭连接
- 注意:
1.如果在队列没有的创建的情况下,消费者先启动,提供者后启动这时消费者会报错,并且线程也会被阻塞主,而且还获取不到提供者发送的消息。所以在开发中遇到这样的启动顺序的问题,在消息提供者和消息消费者都进行队列的创建。
2.消费者消费消息是一个同步的过程。开发的过程中一般都会使用一个线程池并发的消费消息。 - 线程池并发消费消息
// 创建一个线程池
private static ExecutorService executorService = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws IOException {
// 1.获取连接对象
Connection connection = ConnectionUtils.getConnection();
// 2.获取Channel
Channel channel = connection.createChannel();
// 3.通过channel创建一个队列(1:队列名称)
channel.queueDeclare("myqueue",false,false,false,null);
// 3.设置监听的队列(1:设置监听的队列,2:true为自动回复模式,3:消费者的回调方法)
channel.basicConsume("myqueue",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
executorService.submit(new Runnable() {
@Override
public void run() {
try {
System.out.println("消费者:"+new String(body,"utf-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
});
}
Work模式
work模式和简单模式类似,区别在于它有多个消费者交替获取队列中的内容。实现起来也比简单,以上面的代码为例,重新拷贝一个消费者变成两个消费者即可。
- Fonout模式(广播类型)
- 消息提供者
// 1.得到连接对象
Connection connection = ConnectionUtils.getConnection();
// 2.获取channel对象
Channel channel = connection.createChannel();
// 3.创建交换机(fanout:广播类型)
channel.exchangeDeclare("myexchange1","fanout");
// 4.发送消息给交换机(1:交换机名称,2:队列名称,3:其他属性,4:消息内容)
String msg ="hello RebbitMQ";
channel.basicPublish("myexchange","",null,msg.getBytes("utf-8"));
connection.close();
- 消息消费者1
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// 创建一个队列
channel.queueDeclare("queue1",false,false,false,null);
// 把队列和路交换机绑定(1:队列名称,2:交换机名称,3:路由键)
channel.queueBind("queue1","myexchange1","");
channel.basicConsume("queue1",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumer1:"+new String(body,"utf-8"));
}
});
- 消息消费者2
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// 创建一个队列
channel.queueDeclare("queue2",false,false,false,null);
// 把队列和路交换机绑定(1:队列名称,2:交换机名称,3:路由键)
channel.queueBind("queue2","myexchange1","");
// 设置监听的队列
channel.basicConsume("queue2",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumer2:"+new String(body,"utf-8"));
}
});
SpringBoot操作RabbitMQ
- 之前在做商品添加的时候除了做把商品添加到数据库之外,还要添加商品信息添加到索引库,还要生成静态页面,这些都是通过同步来完成的效率比较低。现在我们学习完消息队列后可以进行改造了,利用消息队列异步来完成上面的操作从而来提高效率。
商品添加完之后可以通过异步的方式给MQ中添加一条消息,然后索引和静态页面消费MQ中的消息即可。 - 添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 配置属性文件
spring.rabbitmq.host=192.168.189.137
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
- 提供者
- 创建队列和交换机
在创建商品的时候要通知索引和item两个工程,所以我们要用广播模式。创建完商品给交换机发送一条消息,然后交换机把消息广播给两个工程。所以我们需要创建两个队列和一个广播。
import org.springframework.amqp.core.*;
@Configuration
public class RabbitMQConfig {
// 创建一个搜索的队列
@Bean
public Queue getSeachQueue() {
return new Queue("search_queue");
}
// 创建一个静态页面的广播
@Bean
public Queue getItemQueue(){
return new Queue("item_queue");
}
// 创建一个交换机
@Bean
public FanoutExchange getFanoutExchange(){
return new FanoutExchange("goods_exchange");
}
// 把搜索队列绑定到交换机
@Bean
public Binding getBinDing1(){
return BindingBuilder.bind(getSeachQueue()).to(getFanoutExchange());
}
// 把详情页面绑定到交换机
@Bean
public Binding getBinDing2(){
return BindingBuilder.bind(getItemQueue()).to(getFanoutExchange());
}
}
- 发送消息给交换机
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public int add(Goods goods) {
int i = goodsMapper.insertSelective(goods);
System.out.println("GoodsServiceImpl.add:"+goods.getId());
// 2.同步到索引库
// searchService.addGoods(goods);
// 3.创建静态模板
// HttpUtils.sendRequset("http://localhost:8083/createHtml?gID="+goods.getId());
// 发送消息给交换机(1:交换机的名称,2:路由键,3:发送的对象[实现序列化接口])
rabbitTemplate.convertAndSend("goods_exchange","",goods);
return i;
}
- 消费者
添加依赖和配置属性文件和之前都一样 - 同步到索引库
@Component
public class MyRabbitMQListener {
@Autowired
private ISearchService searchService;
@RabbitListener(queues = "search_queue")
public void goodsSysncToSolr(Goods goods){
searchService.addGoods(goods);
}
}
- 生成静态页面
@Component
public class MyRabbitListener {
@Reference
private IGoodsService goodsService;
@Autowired
private Configuration configuration;
@RabbitListener(queues = "item_queue")
public void createItemPage(Goods goodsParam) throws Exception {
//生成静态页面的代码
} catch (Exception e) {
e.printStackTrace();
}
}
}