Maven
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
</dependencies>
RabbitMQ五种队列形式
1.点对点(简单)的队列
封装Connection
public class MQConnectionUtils {
public static Connection newConnection() throws IOException, TimeoutException {
// 1.定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2.设置服务器地址
factory.setHost("127.0.0.1");
// 3.设置协议端口号
factory.setPort(5672);
// 4.设置vhost
factory.setVirtualHost("/test001_host");
// 5.设置用户名称
factory.setUsername("test001");
// 6.设置用户密码
factory.setPassword("123456");
// 7.创建新的连接
Connection newConnection = factory.newConnection();
return newConnection;
}
}
生产者
public class Producer {
private static final String QUEUE_NAME = "test_queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 1.获取连接
Connection newConnection = MQConnectionUtils.newConnection();
// 2.创建通道
Channel channel = newConnection.createChannel();
// 3.创建队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msg = "test_110";
System.out.println("生产者发送消息:" + msg);
// 4.发送消息
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
channel.close();
newConnection.close();
}
}
消费者
public class Customer {
private static final String QUEUE_NAME = "test_queue";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("002");
// 1.获取连接
Connection newConnection = MQConnectionUtils.newConnection();
// 2.获取通道
Channel channel = newConnection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msgString = new String(body, "UTF-8");
System.out.println("消费者获取消息:" + msgString);
}
};
// 3.监听队列
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
}
}
2.工作(公平性)队列模式
生产者
public class Producer {
private static final String QUEUE_NAME = "test_queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 1.获取连接
Connection newConnection = MQConnectionUtils.newConnection();
// 2.创建通道
Channel channel = newConnection.createChannel();
// 3.创建队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息
for (int i = 1; i <= 50; i++) {
String msg = "test_" + i;
System.out.println("生产者发送消息:" + msg);
// 4.发送消息
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
}
channel.close();
newConnection.close();
}
}
消费者
public class Customer1 {
private static final String QUEUE_NAME = "test_queue";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("001");
// 1.获取连接
Connection newConnection = MQConnectionUtils.newConnection();
// 2.获取通道
final Channel channel = newConnection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msgString = new String(body, "UTF-8");
System.out.println("消费者获取消息:" + msgString);
try {
Thread.sleep(1000);
} catch (Exception e) {
} finally {
// 手动回执消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 3.监听队列
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
}
}
3.发布订阅模式
该模式就是Fanout Exchange(扇型交换机)将消息路由给绑定到它身上的所有队列
交换机的四种类型:
Direct exchange(直连交换机)是根据消息携带的路由键(routing key)将消息投递给对应队列Fanout exchange(扇型交换机)将消息路由给绑定到它身上的所有队列
Topic exchange(主题交换机)队列通过路由键绑定到交换机上,然后,交换机根据消息里的路由值,将消息路由给一个或多个绑定队列
Headers exchange(头交换机)类似主题交换机,但是头交换机使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。
生产者
public class ProducerFanout {
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建新的连接
Connection connection = MQConnectionUtils.newConnection();
// 2.创建通道
Channel channel = connection.createChannel();
// 3.绑定的交换机 参数1交互机名称 参数2 exchange类型
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String msg = "fanout_exchange_msg";
// 4.发送消息
channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
// System.out.println("生产者发送msg:" + msg);
// // 5.关闭通道、连接
// channel.close();
// connection.close();
// 注意:如果消费没有绑定交换机和队列,则消息会丢失
}
}
邮件消费者
public class ConsumerEmailFanout {
private static final String QUEUE_NAME = "consumerFanout_email";
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("邮件消费者启动");
// 1.创建新的连接
Connection connection = MQConnectionUtils.newConnection();
// 2.创建通道
Channel channel = connection.createChannel();
// 3.消费者关联队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("消费者获取生产者消息:" + msg);
}
};
// 5.消费者监听队列消息
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
短信消费者
public class ConsumerSMSFanout {
private static final String QUEUE_NAME = "ConsumerFanout_sms";
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("短信消费者启动");
// 1.创建新的连接
Connection connection = MQConnectionUtils.newConnection();
// 2.创建通道
Channel channel = connection.createChannel();
// 3.消费者关联队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("消费者获取生产者消息:" + msg);
}
};
// 5.消费者监听队列消息
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
4.路由模式Routing
生产者发送消息到交换机并指定一个路由key,消费者队列绑定到交换机时要指定路由key(key匹配就能接受消息,key不匹配就不能接受消息)
生产者
public class ProducerDirect {
private static final String EXCHANGE_NAME = "direct_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建新的连接
Connection connection = MQConnectionUtils.newConnection();
// 2.创建通道
Channel channel = connection.createChannel();
// 3.绑定的交换机 参数1交互机名称 参数2 exchange类型
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String routingKey = "info";
String msg = "direct_exchange_msg" + routingKey;
// 4.发送消息
channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
System.out.println("生产者发送msg:" + msg);
// // 5.关闭通道、连接
// channel.close();
// connection.close();
// 注意:如果消费没有绑定交换机和队列,则消息会丢失
}
}
邮件消费者
public class ConsumerEmailDirect {
private static final String QUEUE_NAME = "consumer_direct_email";
private static final String EXCHANGE_NAME = "direct_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("邮件消费者启动");
// 1.创建新的连接
Connection connection = MQConnectionUtils.newConnection();
// 2.创建通道
Channel channel = connection.createChannel();
// 3.消费者关联队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("消费者获取生产者消息:" + msg);
}
};
// 5.消费者监听队列消息
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
短信消费者
public class ConsumerSMSDirect {
private static final String QUEUE_NAME = "consumer_direct_sms";
private static final String EXCHANGE_NAME = "direct_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("短信消费者启动");
// 1.创建新的连接
Connection connection = MQConnectionUtils.newConnection();
// 2.创建通道
Channel channel = connection.createChannel();
// 3.消费者关联队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("消费者获取生产者消息:" + msg);
}
};
// 5.消费者监听队列消息
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
5.通配符模式Topics
此模式是在路由key模式的基础上,使用了通配符来管理消费者接收消息。生产者P发送消息到交换机X,type=topic,交换机根据绑定队列的routing key的值进行通配符匹配;
符号#:匹配一个或者多个词lazy.# 可以匹配lazy.irs或者lazy.irs.cor
符号*:只能匹配一个词lazy.* 可以匹配lazy.irs或者lazy.cor
生产者
public class ProducerDirect {
private static final String EXCHANGE_NAME = "topic_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建新的连接
Connection connection = MQConnectionUtils.newConnection();
// 2.创建通道
Channel channel = connection.createChannel();
// 3.绑定的交换机 参数1交互机名称 参数2 exchange类型
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String routingKey = "log.info.error";
String msg = "topic_exchange_msg" + routingKey;
// 4.发送消息
channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
System.out.println("生产者发送msg:" + msg);
// // 5.关闭通道、连接
channel.close();
connection.close();
// 注意:如果消费没有绑定交换机和队列,则消息会丢失
}
}
邮件消费者
public class ConsumerEmailDirect {
private static final String QUEUE_NAME = "consumer_topic_email";
private static final String EXCHANGE_NAME = "topic_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("邮件消费者启动");
// 1.创建新的连接
Connection connection = MQConnectionUtils.newConnection();
// 2.创建通道
Channel channel = connection.createChannel();
// 3.消费者关联队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "log.#");
// 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("消费者获取生产者消息:" + msg);
}
};
// 5.消费者监听队列消息
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
短信服务器
public class ConsumerSMSDirect {
private static final String QUEUE_NAME = "consumer_topic_sms";
private static final String EXCHANGE_NAME = "topic_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("短信消费者启动");
// 1.创建新的连接
Connection connection = MQConnectionUtils.newConnection();
// 2.创建通道
Channel channel = connection.createChannel();
// 3.消费者关联队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "log.*");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("消费者获取生产者消息:" + msg);
}
};
// 5.消费者监听队列消息
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
RabbitMQ消息确认机制
生产者发送消息出去之后,不知道到底有没有发送到RabbitMQ服务器, 默认是不知道的。而且有的时候我们在发送消息之后,后面的逻辑出问题了,我们不想要发送之前的消息了,需要撤回该怎么做。
1.AMQP 事务机制
txSelect 将当前channel设置为transaction模式
txCommit 提交当前事务
txRollback 事务回滚
2.Confirm 模式
就是生产者发送消息后,RabbitMQ服务器会发送一个确认给生产者