RabbitMQ(二):Java 操作队列

1. 简单模式

模型:

RabbitMQ(二):Java 操作队列

  • P:消息的生产者
  • 队列:rabbitmq
  • C:消息的消费者

获取 MQ 连接

public static Connection getConnection() throws IOException, TimeoutException {
// 定义一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置服务地址
factory.setHost("127.0.0.1");
// AMQP 5672
factory.setPort(5672);
// vhost
factory.setVirtualHost("/vhost_ljf");
// 用户名
factory.setUsername("ljf");
// 密码
factory.setPassword("123456");
return factory.newConnection();
}

生产者生产消息

public class Send {
private static final String QUEUE_NAME = "test_simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取一个连接
Connection connection = ConnectionUtils.getConnection();
// 从连接中获取一个通道
Channel channel = connection.createChannel();
// 创建队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null); String msg = "hello simple!";
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
System.out.println("--send msg: " + msg);
channel.close();
connection.close();
}
}

消费者接收消息

public class Recv {
private static final String QUEUE_NAME = "test_simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 创建通道
Channel channel = connection.createChannel();
// 队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取到达的消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("recv: " + msg);
}
};
// 监听队列
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}

简单队列的不足

耦合性高,生产者一一对应消费者(如果我想有多个消费者消费队列中的消息,这时候就不行了);

队列名变更,这时候得同时变更。

2. 工作队列模式(Work Queue)

模型

RabbitMQ(二):Java 操作队列

为什么会出现工作队列?

simple 队列是一一对应的,而且我们实际开发,生产者发送消息是毫不费力的,而消费者一般是要跟业务相结合的,消费者接收到消息之后就需要处理,可能需要花费时间,这时候队列就会积压了很多消息。

生产者

/**
* |----C1
* P----Queue----|
* |----C2
*/
public class Send {
private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException ,InterruptedException{
// 获取连接
Connection connection = ConnectionUtils.getConnection(); // 获取 channel
Channel channel = connection.createChannel(); // 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0; i < 50; i++) {
String msg = "hello" + i;
System.out.println("[WQ] send: " + msg);
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
Thread.sleep(i*20);
}
channel.close();
connection.close();
}
}

消费者

  • 消费者1
public class Recv1 {
private static final String QUEUE_NAME = "test_work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 获取 channel
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定义一个消费者
Consumer consumer = new DefaultConsumer(channel) {
// 消息到达 触发方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[1] Recv msg: " + msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[1] done.");
}
}
};
boolean autoAck = true;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
  • 消费者2
public class Recv2 {
private static final String QUEUE_NAME = "test_work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 获取 channel
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定义一个消费者
Consumer consumer = new DefaultConsumer(channel) {
// 消息到达 触发方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[2] Recv msg: " + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[2] done.");
}
}
};
boolean autoAck = true;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}

现象

先运行消费者1和消费者2,再运行生产者

消费者1 和 消费者2 处理的消息数量是一样多的。

消费者1:偶数

消费者2:奇数

这种方式叫做轮询分发(round-robin),结果就是不管谁忙谁清闲,都不会多给一个消息。

3. 公平分发(fair dipatch)

生产者

public class Send {
private static final String QUEUE_NAME = "test_work_queue";
public static void main(String[] args) throws IOException, TimeoutException ,InterruptedException{
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 获取 channel
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/**
* 每个消费者:发送确认消息之前,消息队列不发送下一个消息到消费者,一次只处理一个消息
*/
int prefetchCount = 1;
channel.basicQos(prefetchCount);
for (int i = 0; i < 50; i++) {
String msg = "hello" + i;
System.out.println("[WQ] send: " + msg);
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
Thread.sleep(i*5);
}
channel.close();
connection.close();
}
}

消费者

  • 消费者1
public class Recv1 {
private static final String QUEUE_NAME = "test_work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 获取 channel
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1); // 保证一次只发送一个
// 定义一个消费者
Consumer consumer = new DefaultConsumer(channel) {
// 消息到达 触发方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[1] Recv msg: " + msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[1] done.");
// 手动回执
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false; // 自动应答 false
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
  • 消费者2
public class Recv2 {
private static final String QUEUE_NAME = "test_work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 获取 channel
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1); // 保证一次只发送一个
// 定义一个消费者
Consumer consumer = new DefaultConsumer(channel) {
// 消息到达 触发方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[2] Recv msg: " + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[2] done.");
// 手动回执
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false; // 自动应答 false
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}

现象

消费者2 处理的消息比 消费者1 多,能者多劳。

4. 消息应答与消息持久化

消息应答

boolean autoAck = false; // 自动应答 false
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
  • boolean autoAck = true;(自动确认模式)

一旦 rabbitmq 将消息分发给消费者,就会从内存中删除;

这种情况下,如果杀死正在执行的消费者,就会丢失正在处理的消息。

  • boolean autoAck = false;(手动模式)

如果一个消费者挂掉,就会交付给其他消费者;

rabbitmq 支持消息应答,消费者发送一个消息应答,告诉 rabbitmq 这个消息我已经处理完成,可以删掉,然后 rabbitmq 就删除内存中的消息。

消息应答默认是打开的,即为 false;

如果 rabbitmq 挂了,消息任然会丢失。

消息持久化

// 声明队列
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

注意:rabbitmq 不允许重新定义(不同参数)一个已存在的队列

上一篇:【HDU1102】Constructing Roads(MST基础题)


下一篇:Raising Modulo Numbers_快速幂取模算法