一、什么是优先级队列
在服务级级别的测试中需要考虑被执行任务的优先级机制,也就是通过线程优先级来进行,设置优先级的目的
是在资源非常紧张的情况下,让优先级高的任务优先执行,而优先级低的任务排后执行,当然这样的一种设置机制
只能是异步的模式下执行,如果是设计在同步的模式下执行,那这个设计从系统上来说就缺少宏观维度的思考。在
RabbitMQ的机制中也是提供了队列的优先级机制,这样设计的目的也是在在生产者生产过快,而消费者消费不过
来的情况下,也就是资源在紧张或者说是在有限的情况下,设置的队列优先级高的任务它的消息优先进行消费,而
优先级低的消息排后消费。当然,如果是在资源不紧张的情况下,设置优先级其实没多大的意义,因为这个时候优
先过来的消息先进行消费,也谈不上排队的机制和优先级的机制。
二、优先级的实现机制
针对优先级的设置,在消费者端进行设置,参数具体是x-max-priority,涉及的代码具体如下:
//设置优先级
Map<String,Object> arguments =new HashMap<String,Object>();
arguments.put("x-max-prioroty",10);
channel.queueDeclare(queueName,true,false,false,arguments);
这样消费者的代码执行后,在RabbitMQ的WEB控制台,就可以看到该消息队列显示设置的优先级,具体如下所
示:
如上,我们演示了配置一个队列的最大优先级,其实核心的是需要在生产者发送消息的时候设置当前发送任务的优先级
涉及代码如下:
AMQP.BasicProperties properties=new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF_8")
.headers(headers)
.priority(8)
.build();
String msg = "Hello RabbitMQ priority Message"+i;
channel.basicPublish(exchangeName,routyKey,true,properties,msg.getBytes());
在如上中,我们设置的发送任务的优先级是8。
三、优先级队列实战代码
3.1、生产者代码
package com.example.rabbitmq.priority;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.HashMap;
import java.util.Map;
public class ProducerPriority
{
private static final String exchangeName="test_priority_exchange";
private static final String routyKey="priority.test";
public static void main(String[] args) throws Exception
{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("101.**.***.84");
connectionFactory.setPort(5672);
connectionFactory.setUsername("wuya");
connectionFactory.setPassword("java");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
for(int i=0;i<3;i++)
{
Map<String,Object> headers=new HashMap<String,Object>();
headers.put("num",i);
if (i==0)
{
AMQP.BasicProperties properties=new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF_8")
.headers(headers)
.priority(8)
.build();
String msg = "Hello RabbitMQ priority Message"+i;
channel.basicPublish(exchangeName,routyKey,true,properties,msg.getBytes());
}
else if (i==1)
{
AMQP.BasicProperties properties=new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF_8")
.headers(headers)
.priority(3)
.build();
String msg = "Hello RabbitMQ priority Message"+i;
channel.basicPublish(exchangeName,routyKey,true,properties,msg.getBytes());
}
else
{
AMQP.BasicProperties properties=new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF_8")
.headers(headers)
.priority(9)
.build();
String msg = "Hello RabbitMQ priority Message"+i;
channel.basicPublish(exchangeName,routyKey,true,properties,msg.getBytes());
}
}
}
}
在如上中,我们针对发送的任务依据编号进行了优先级的设置。
3.2、消费者代码
package com.example.rabbitmq.priority;
import com.example.rabbitmq.MyConsumer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.HashMap;
import java.util.Map;
public class ConsumerPriority
{
private static final String exchangeName = "test_priority_exchange";
private static final String queueName="test_priority_queue";
private static final String routingKey="priority.#";
public static void main(String[] args) throws Exception
{
try{
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost("101.**.***.84");
connectionFactory.setPort(5672);
connectionFactory.setUsername("wuya");
connectionFactory.setPassword("java");
connectionFactory.setVirtualHost("/");
Connection connection=connectionFactory.newConnection();
Channel channel=connection.createChannel();
//设置优先级
Map<String,Object> arguments =new HashMap<String,Object>();
arguments.put("x-max-prioroty",10);
channel.exchangeDeclare(exchangeName,"topic",true,false,null);
channel.queueDeclare(queueName,true,false,false,arguments);
channel.queueBind(queueName,exchangeName,routingKey);
channel.basicConsume(queueName,true,new MyConsumer(channel=channel));
}catch (Exception e){
e.printStackTrace();
}
}
}
3.3、自定义消费者代码
package com.example.rabbitmq;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class MyConsumer extends DefaultConsumer
{
//开启限流,所以需要设置channel
private Channel channel;
/**
* Constructs a new instance and records its association to the passed-in channel.
*
* @param channel the channel to which this consumer is attached
*/
public MyConsumer(Channel channel)
{
super(channel);
this.channel=channel;
}
@Override
public void handleDelivery(
String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException
{
System.err.println("---------------consumer---------------\n");
System.err.println("the message received:"+new String(body));
System.err.println("message priority:"+properties.getPriority());
}
}
3.4、执行结果信息
如上的代码执行后,当然当前的资源不存在紧张的情况,那么就会按正常的顺序消费,具体输出结果如下:
如上,主要总结了消息队列优先级这部分的总结和它的案例应用。感谢您的阅读!