RabbitMQ
这是一份RabbitMQ复习资料,长时间没有用消息队列会忘记其机制,用于简单回顾其基本用法。了解之后,其具体用法可以查看源码,也可以查看中文文档:http://rabbitmq.mr-ping.com/ClientDocumentation/java-api-guide.html
控制台地址 http://localhost:15672/
〇、初始连接
ConnectionFactory factory = new ConnectionFactory();
# 更改参数
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
# connection,channel一般用try with包裹
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
#具体代码
...
channel.close()
connection.close()
一、介绍一下RabbitMQ中的成员
- Producer(生产者):将消息发送到Exchange
- Exchange(交换器):将从Producer接收到的消息路由到Queue或Exchange
- Queue(队列):存储共Consumer消费的消息
- Bindingkey(绑定键):建立Exchange与Queue之间的关系,作用于’direct’和’topic’模式下的路由选择
- RoutingKey(路由键):Producer发送消息与路由键到Exchange,Exchange将判断RoutingKey是否符合BingdingKey,进行路由选择
- Consumer(消费者):从队列中获取消息
下面是各个成员的作用图解
二、介绍Exchange
Exchange的属性
- exchange:名称
- type:类型(direct,topic,fanout)
- durable:是否持久化,RabbitMQ关闭后,没有持久化的Exchange将被清除
- autoDelete:是否自动删除,如果没有与之绑定的Queue,直接删除该Exchange
- internal:是否内置的,如果为true,只能通过Exchange到Exchange(不跟生产者直接相连的交换器)
- arguments:结构化参数
# 声明一个交换机
Exchange.DeclareOk exchangeDeclare(String exchange,
String type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments)
# 交换机与交换机绑定
Exchange.BindOk exchangeBind(String destination, String source, String routingKey)
# 交换机与队列绑定
Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;
三、介绍Queue
Queue的属性,相比与Exchange,Queue少一个internal属性,但是多一个exclusive属性
- exclusive:是否排他,如果未true,则只在第一次创建它的Connection中有效,当Connection关闭,该Queue也会被删除
# 声明一个队列
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)
四、消费者Consumer
消费者的主要属性autoAck
注意:如果设置autoAck为true,则一次消费掉所有消息。 所以一般我们都设置成false, 默认就是false
# 重写handleDelivery分发方法,实现消费
final DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消息内容:" + new String(body));
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
//消息确认
// channel.basicAck(envelope.getDeliveryTag(), false);
}
};
# 进行一次消费
String basicConsume(String queue, boolean autoAck, Consumer callback)
String basicConsume(String queue, Consumer callback) throws IOException;
五、maven依赖
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.11.0</version>
</dependency>
六、参考
https://blog.csdn.net/yaomingyang/article/details/102636657
https://blog.csdn.net/dh554112075/article/details/90182453
https://blog.csdn.net/weixin_40805079/article/details/85631693