1 rabbitMQ入门教程-哔站-百知教育
在此感谢哔哩哔哩的up主:编程不良人,视频地址:https://www.bilibili.com/video/BV1dE411K7MG ,于2019-12-30学习后用Typora0.9.98整理的观后感。
1.1 MQ引言
1.1.1 MQ介绍
-
MQ(message quene):消息队列,通过典型的生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断向队列中获取消息。因为消息的生产者和消费者都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入轻松实现系统间解耦。
-
MQ种类:
- ActiveMQ
- Kafka
- RocketMQ
- RabbitMQ
-
RabbitMQ:基于AMQP协议,erlang语言开发的,是部署最广泛的开源消息中间件。
1.2 rabbitmq安装
1.3 管理命令行及管理界面初识
1.4 消息发布模式
1.4.1 第一种模型:直连
-
引入依赖
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.8.0</version> </dependency>
-
开发生产者
//创建连接mq的连接工厂对象 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置连接rabbitmq的主机 connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); //设置连接哪个虚拟主机 connectionFactory.setVirtualHost("/virHost1"); //设置访问虚拟主机的用户名及密码 connectionFactory.setUsername("root"); connectionFactory.setPassword("root"); //获取连接对象 Connection connection = connectionFactory.newConnection(); //获取连接中的通道 Channel channel = connection.createChannel(); //通道绑定对应的消息队列 消费者与生产者的特性要一致 // argument1:队列名称(不存在会自动创建) // argument2:定义队列是否要持久化,仅仅是队列持久化,若想消息持久化还得额外设置(发布消息时) true-持久化 // argument3:druable 是否独占队列,即通道独占 true-独占 // argument4:autoDelete 是否在消费完成后自动删除队列 true-自动删除 // argument5:额外附加参数 channel.queueDeclare("hello", false, false, false, null); //发布消息 // argument1:交换机名称 // argument2:队列名称 // argument3:传递消息的额外设置 MessageProperties.PERSISTENT_TEXT_PLAIN:消息持久化 // argument4:消息的具体内容) channel.basicPublish("", "hello", MessageProperties.PERSISTENT_TEXT_PLAIN, "hello message".getBytes()); channel.close(); connection.close();
-
开发消费者
ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/virHost1"); connectionFactory.setUsername("root"); connectionFactory.setPassword("root"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("hello", false, false, false, null); //消费消息 (消费的队列名称, 开始消息的自动确认机制, 消费时的回调接口) channel.basicConsume("hello", true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){ System.out.println( new String(body) ); } }); /* 不关闭则可以一直监听队列 channel.close(); connection.close(); */
-
总结:消费者不能用@Test注解和关闭通道连接,否则看不到执行结果,因为多线程问题未等到处理便被关闭了
1.4.2 第二种模型:work queue 任务模型
WorkQueue任务模型,当消息比较耗时,生产消息的速度大于消费者速度,就会导致堆积越来越多无法处理。此时就可以使用该模型:让多个消费者绑定到一个队列,共同消费队列中的消息。
1.4.2.1 work模型之平均消费消息-轮询
- rabbitmq中连接工具类封装
package messagequeue.rabbitmq.baizhiedu;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitmqUtils {
private static ConnectionFactory connectionFactory;
static {
connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/virHost1");
connectionFactory.setUsername("root");
connectionFactory.setPassword("root");
}
//定义提供连接对象的方法
public static Connection getConnection() {
try{
return connectionFactory.newConnection();
}catch (Exception e) {
e.printStackTrace();
}
return null;
}
//关闭通道和关闭连接
public static void closeConnectionCannel(Channel channel, Connection connection) {
try {
if (channel!=null) channel.close();
if (connection!=null) connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
- 开发生产者
Connection connection = RabbitmqUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work", true, false, false, null);
for(int i=0; i<100; i++) {
channel.basicPublish("", "work", null, ("workMessage"+i).getBytes());
}
RabbitmqUtils.closeConnectionCannel(channel, connection);
- 开发两个消费者
Connection connection = RabbitmqUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work", true, false, false, null);
channel.basicConsume("work", true, new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:"+ new String(body));
}
});
- 执行结果:如图示消费者之间采用轮询方式消费,
1.4.2.2 消息确定机制和能者多劳实现
- 修改上述中消费者1将其降速
Connection connection = RabbitmqUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work", true, false, false, null);
channel.basicConsume("work", true, new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:"+ new String(body));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
- 执行结果如图所示:默认情况下RabbitMQ每个消费者会收到相同数量的消息,消费慢的会慢慢消费
- 消息自动确认机制:
channel.basicConsume方法中自动确认autoAck=true时,不关心业务到底是否处理完(宕机即丢失),拿到即告诉队列消费完,队列即删出这些队列,
Connection connection = RabbitmqUtils.getConnection();
final Channel channel = connection.createChannel();
channel.basicQos(1); //通道每次只能消费一个消息
channel.queueDeclare("work", true, false, false, null);
channel.basicConsume("work", false, new DefaultConsumer(channel){ //消费消息时关闭自动确认
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2:"+ new String(body));
channel.basicAck(envelope.getDeliveryTag(), false); //argument1:手动确认消息标识,argument2:false为每次确认一个
}
});