RabbitMQ02_简单模式、Publish/Subscribe发布与订阅模式、Routing路由模式、Topics通配符模式、Work模式-轮询、公平(十)

⑥. Work模式 - 公平分发(Fair-Dispatch)


  • ①. 公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配


  • ②. 生产者代码展示


/**
 * 轮询模式:公平的方式
 */
public class Producer {
    static final String QUEUE_NAME = "work_queue_fair";
    public static void main(String[] args) throws Exception {
        //2. 创建连接;
        Connection connection = ConnectionUtil.getConnection();
        //3. 创建频道;
        Channel channel = connection.createChannel();
        //4. 声明队列;
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列(消息会持久化保存在服务器上)
         * 参数3:是否独占本连接
         * 参数4:是否在不使用的时候队列自动删除
         * 参数5:其它参数
         */
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        for(int  i=0;i<10;i++) {
            //5. 发送消息;
            String message = "你好!小兔纸work-----" + i;
            /**
             * 参数1:交换机名称;如果没有则指定空字符串(表示使用默认的交换机)
             * 参数2:路由key,简单模式中可以使用队列名称
             * 参数3:消息其它属性
             * 参数4:消息内容
             */
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("已发送消息:" + message);
        }
        //6. 关闭资源
        channel.close();
        connection.close();
    }
}


RabbitMQ02_简单模式、Publish/Subscribe发布与订阅模式、Routing路由模式、Topics通配符模式、Work模式-轮询、公平(十)


③. 消费者代码


public class Consumer1 {
    public static void main(String[] args) throws Exception {
        //1. 创建连接工厂;
        //2. 创建连接;(抽取一个获取连接的工具类)
        Connection connection = ConnectionUtil.getConnection();
        //3. 创建频道;
        final Channel channel = connection.createChannel();
        channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
        System.out.println("fair的方式consumer1开始消费");
        //每次可以预期多少个消息
        channel.basicQos(1);
        //4. 创建消费者(接收消息并处理消息);
        //fair的方式一定要将应答方式改成手动应答
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //接收到的消息
                System.out.println("消费者1----接收到的消息为:" + new String(body, "utf-8"));
                try {
                    Thread.sleep(200);
                    //确认消失
                    /*
                     * 参数1:消息id
                     * 参数2:false表示只有当前这条被处理
                     * */
                    channel.basicAck(envelope.getDeliveryTag(),false);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        //6. 监听队列
        /**
         * 参数1:队列名
         * 参数2:是否要自动确认;设置为true表示消息接收到自动向MQ回复接收到了,MQ则会将消息从队列中删除;
         * 如果设置为false则需要手动确认
         * 参数3:消费者
         */
        channel.basicConsume(Producer.QUEUE_NAME, false, defaultConsumer);
    }
}


RabbitMQ02_简单模式、Publish/Subscribe发布与订阅模式、Routing路由模式、Topics通配符模式、Work模式-轮询、公平(十)



上一篇:quartz调度框架在web中应用实例


下一篇:阿里巴巴小程序繁星计划专题上线,汇集最优扶持资源与最新资讯!