rabbitmq-workqueue模型(1对多模型)

helloworld项目是直连模型,(1对1,1个生产者和1个消费者)

durable参数说明:

是否持久化队列:durable=false时,重启rabbitmq后队列会丢失;durable=true时,重启rabbitmq后队列仍然保留,但队列中的消息仍会丢失,除非消息本身也以持久化方式发送(例如使用MessageProperties.PERSISTENT_BASIC)。
消费者声明队列时的参数必须与生产者一致:队列名(queue)相同,且都为durable=true,exclusive=false,autoDelete=false,arguments=null。

 

workqueue模型,(1对多):

RabbitMQUtils.java

package com.gavin.utils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Connection helper shared by the RabbitMQ examples.
 *
 * <p>Holds a single {@link ConnectionFactory} configured for a broker at
 * 127.0.0.1:5672, virtual host "/", with the guest/guest credentials.
 */
public class RabbitMQUtils {
    private static final ConnectionFactory connectionFactory;
    static {
        connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
    }

    /**
     * Opens a new connection using the shared factory.
     *
     * @return a newly created connection
     * @throws IOException if the factory fails to create the connection
     * @throws TimeoutException if creating the connection times out
     */
    public static Connection getConnection() throws IOException, TimeoutException {
        return connectionFactory.newConnection();
    }

    /**
     * Closes the channel and then the connection; either argument may be {@code null}.
     *
     * <p>The connection is closed even when closing the channel throws, so a
     * failing channel cannot leak the connection. Resources that are already
     * closed are skipped.
     *
     * @param channel the channel to close, or {@code null}
     * @param connection the connection to close, or {@code null}
     * @throws IOException if closing either resource fails
     * @throws TimeoutException if closing the channel times out
     */
    public static void close(Channel channel, Connection connection) throws IOException, TimeoutException {
        try {
            if (channel != null && channel.isOpen()) {
                channel.close();
            }
        } finally {
            // Runs even if channel.close() threw, so the connection is always released.
            if (connection != null && connection.isOpen()) {
                connection.close();
            }
        }
    }
}

 

生产者:

package com.gavin.mq.workqueue;

import com.gavin.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Work-queue model: several consumers bind to one queue and share the
 * messages in it, which are distributed evenly among the consumers.
 */
public class WorkProvider {
    /**
     * Declares the durable "work" queue and publishes ten persistent messages
     * to it through the default ("") exchange.
     *
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if opening or closing the connection times out
     */
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = null;
        try {
            channel = connection.createChannel();
            channel.queueDeclare("work", true, false, false, null);
            for (int i = 0; i < 10; i++) {
                // Encode explicitly as UTF-8 instead of relying on the platform default charset.
                byte[] payload = (i + "hello work queue").getBytes(java.nio.charset.StandardCharsets.UTF_8);
                channel.basicPublish("", "work", MessageProperties.PERSISTENT_BASIC, payload);
            }
        } finally {
            // Release the channel and connection even when declaring or publishing fails.
            RabbitMQUtils.close(channel, connection);
        }
    }
}

  消费者:

package com.gavin.mq.workqueue;

import com.gavin.utils.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * First consumer of the "work" queue: prints the body of every message it receives.
 */
public class WorkConsumer {
    /**
     * Declares the "work" queue with the same parameters as the producer and
     * registers a consumer on it with automatic acknowledgement enabled.
     * The connection is intentionally left open so the consumer keeps receiving.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work", true, false, false, null);
        // Second argument (autoAck) is true: messages are acknowledged on delivery.
        channel.basicConsume("work", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                System.out.println(new String(body, java.nio.charset.StandardCharsets.UTF_8));
            }
        });
    }
}

  消费者2:

package com.gavin.mq.workqueue;

import com.gavin.utils.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Second consumer of the "work" queue: prints every message body it receives,
 * prefixed with "consumer2:".
 */
public class WorkConsumer2 {
    /**
     * Declares the "work" queue with the same parameters as the producer and
     * registers a consumer on it with automatic acknowledgement enabled.
     * The connection is intentionally left open so the consumer keeps receiving.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work", true, false, false, null);
        // Second argument (autoAck) is true: messages are acknowledged on delivery.
        channel.basicConsume("work", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                System.out.println("consumer2:" + new String(body, java.nio.charset.StandardCharsets.UTF_8));
            }
        });
    }
}

  

 

上一篇:rabbitmq-direct路由订阅模型


下一篇:Netty 框架学习 —— 传输