rabbitmq消息模式

1.简单模式:

P : 生产者

Queue(hello) : 队列

C : 消费者

rabbitmq消息模式

 

步骤:

1.创建工程

2.分别添加RabbitMQ依赖

3.编写生产者发送消息

4.编写消费者获取消息

pom.xml

<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.10.0</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>

 producer(生产者):

public class Producer_helloworld {
    private static final String QUEUE_NAME="queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        //1获取连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2设置参数
        factory.setHost("127.0.0.1");//主机号
        factory.setPort(5672);//端口号
        factory.setVirtualHost("/test");//队列虚拟机
        factory.setUsername("test");//用户名
        factory.setPassword("test");//密码
        //3创建连接Connection
        Connection connection = factory.newConnection();
        //4创建Channel
        Channel channel = connection.createChannel();
        //5创建队列
        /*
            queueDeclare(
                String queue,       队列名称
                boolean durable,    是否持久化到本地
                boolean exclusive,  (1)是否独占连接,只能有一个消费者监听这个队列(2)当connection关闭时,是否删除队列
                boolean autoDelete, 是否自动删除,没有消费之自动删除
                Map<String, Object> arguments) 配置参数
        * */
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);

        /*
        basicPublish(
            String exchange,        交换机名称,简单模式使用默认交换机,空字符串即为默认交换机
            String routingKey,      路由名称
            BasicProperties props,  配置信息
            byte[] body             发送的消息数据
        )
        */
        String body="傻逼java面试官";
        //6发送消息
        channel.basicPublish("",QUEUE_NAME,null,body.getBytes());
        //7释放资源
       /* if(channel != null){
            channel.close();
        }
        if(connection != null){
            connection.close();
        }*/
    }

}

consumer(消费者):

public class Hello01 {
    private static final String QUEUE_NAME="queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        //1获取连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2设置参数
        factory.setHost("127.0.0.1");//主机号
        factory.setPort(5672);//端口号
        factory.setVirtualHost("/test");//队列虚拟机
        factory.setUsername("test");//用户名
        factory.setPassword("test");//密码
        //3创建连接Connection
        Connection connection = factory.newConnection();
        //4创建Channel
        Channel channel = connection.createChannel();
        //5创建队列
        /*
            queueDeclare(
                String queue,       队列名称
                boolean durable,    是否持久化到本地
                boolean exclusive,  (1)是否独占连接,只能有一个消费者监听这个队列(2)当connection关闭时,是否删除队列
                boolean autoDelete, 是否自动删除,没有消费之自动删除
                Map<String, Object> arguments) 配置参数
        * */
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        /*
        basicConsume(
            String queue,       队列名
            boolean autoAck,    是否自动确认
            Consumer callback   回调函数
        )
        */
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                System.out.println(new String(body));
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

2.work queues工作队列模式:

p: 生产者

queue': 队列

C1: 消费者1

C2: 消费者2

rabbitmq消息模式

 

work queues:与入门的简单模式相比,多了一个消费者,多个消费者共同消费同一个队列,消费者之间为竞争关系

应用场景:对于任务过重或任务较多的工作队列可以提高任务处理的速度.

producer:

public class Producer_helloworld {
    private static final String QUEUE_NAME="queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        //1获取连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2设置参数
        factory.setHost("127.0.0.1");//主机号
        factory.setPort(5672);//端口号
        factory.setVirtualHost("/test");//队列虚拟机
        factory.setUsername("test");//用户名
        factory.setPassword("test");//密码
        //3创建连接Connection
        Connection connection = factory.newConnection();
        //4创建Channel
        Channel channel = connection.createChannel();
        //5创建队列
        /*
            queueDeclare(
                String queue,       队列名称
                boolean durable,    是否持久化到本地
                boolean exclusive,  (1)是否独占连接,只能有一个消费者监听这个队列(2)当connection关闭时,是否删除队列
                boolean autoDelete, 是否自动删除,没有消费之自动删除
                Map<String, Object> arguments) 配置参数
        * */
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);

        /*
        basicPublish(
            String exchange,        交换机名称,简单模式使用默认交换机,空字符串即为默认交换机
            String routingKey,      路由名称
            BasicProperties props,  配置信息
            byte[] body             发送的消息数据
        )
        */
        String body="傻逼java面试官";
        //6发送消息
        for (int i = 0; i < 100; i++) {
            channel.basicPublish("",QUEUE_NAME,null,body.getBytes());
        }
        //7释放资源
        channel.close();
        connection.close();
    }
}

consumer1/consumer2:

public class Hello01  {/// Hello02

    private static final String QUEUE_NAME="queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        //1获取连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2设置参数
        factory.setHost("127.0.0.1");//主机号
        factory.setPort(5672);//端口号
        factory.setVirtualHost("/test");//队列虚拟机
        factory.setUsername("test");//用户名
        factory.setPassword("test");//密码
        //3创建连接Connection
        Connection connection = factory.newConnection();
        //4创建Channel
        Channel channel = connection.createChannel();
        //5创建队列
        /*
            queueDeclare(
                String queue,       队列名称
                boolean durable,    是否持久化到本地
                boolean exclusive,  (1)是否独占连接,只能有一个消费者监听这个队列(2)当connection关闭时,是否删除队列
                boolean autoDelete, 是否自动删除,没有消费之自动删除
                Map<String, Object> arguments) 配置参数
        * */
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        /*
        basicConsume(
            String queue,       队列名
            boolean autoAck,    是否自动确认
            Consumer callback   回调函数
        )
        */
        Consumer consumer = new DefaultConsumer(channel){
            int number=0;
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,                         AMQP.BasicProperties properties, byte[] body) throws IOException {
                number++;
                System.out.println(number+(new String(body)));
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

 

运行过程及结果:

首先开启两个consumer消费者,此时该队列无消息,启动完成后再启动producer生产者发送消息,

此时两个消费者竞争获取消息

3.Pub/Sub订阅模式

p(producer): 生产者

x(exchange): 交换机,接受生产者发送的消息,知道如何处理消息,例如交给某个特定的消息队列,递交给所有队列......

exchange三种模式:

        Fanout: 广播,将消息发送到所有绑定交换机的队列

        Direct: 定向,把消息发哦是那个到指定的routing key的队列

        Topic: 通配符,把消息交给符合routing pattern(路由模式)的队列

queue: 消息对列

c1/c2(consumer): 消费者

rabbitmq消息模式

 

producer(生产者):

public class Producer_PubSub {
    private static final String QUEUE_NAME1="pubsub_queue1";
    private static final String QUEUE_NAME2="pubsub_queue2";
    private static final String EXCHANGE_NAME="fanout_exchange";
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("test");
        factory.setPassword("test");
        //获取新连接
        Connection connection = factory.newConnection();
        //创建channel通道
        Channel channel = connection.createChannel();
        //创建Exchange交换机
        /*
            exchangeDeclare(
                String exchange,            交换机名称
                BuiltinExchangeType type,   交换机类型
                    type:
                        DIRECT("direct"),   定向
                        FANOUT("fanout"),   广播,发送到每一与之绑定的队列
                        TOPIC("topic"),     通配符模式
                        HEADERS("headers"); 参数匹配
                boolean durable,            是否持久化
                boolean autoDelete,         是否自动删除
                boolean internal,           内部(erlang语言内部开发),一般置为false
                Map<String, Object> arguments)参数.置为null
        * */
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true, false, false, null);
        //创建队列
        channel.queueDeclare(QUEUE_NAME1, true, false, false, null);
        channel.queueDeclare(QUEUE_NAME2, true, false, false, null);
        /*
            queueBind(
                String queue,       队列名称
                String exchange,    交换机名称
                String routingKey   路由键,绑定队则
                    如果交换机类型是fanout,那么routingKey设置为"",发送到所有绑定的队列
            )
        * */
        //队列绑定交换机
        channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"");
        String body="日志:张三调用了findAll方法,日志级别:info....";
        //发送消息
        channel.basicPublish(EXCHANGE_NAME,"",null,body.getBytes());
        //释放连接
        channel.close();
        connection.close();
    }
}

consumer01/consumer02:

public class consumer01 {
    private static final String QUEUE_NAME1="pubsub_queue1";//private static final String QUEUE_NAME2="pubsub_queue2";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("test");
        factory.setPassword("test");
        Connection connection = factory.newConnection();

        //获取channel
        Channel channel = connection.createChannel();
        //获取消息
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.
                           BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        };
        channel.basicConsume(QUEUE_NAME1,true,consumer);//channel.basicConsume(QUEUE_NAME2,true,consumer);

    }
}

4.Routing模式(路由模式)

p(produmer): 生产者

x(exchange): 交换机

        type=direct 交换机类型

Q1/Q2(queue): 队列

C1/C2(consumer): 消费者

rabbitmq消息模式

 

producer(生产者):

public class RoutingProducer {
    private static final String QUEUE_ROUTING1 = "queue_routing1";
    private static final String QUEUE_ROUTING2 = "queue_routing2";
    private static final String EXCHANGE_NAME = "exchange_Routing";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("test");
        factory.setVirtualHost("/test");
        factory.setPassword("test");
        Connection connection = factory.newConnection();
        //获取连接connection
        Channel channel = connection.createChannel();
        //创建队列queue
        channel.queueDeclare(QUEUE_ROUTING1,true,false,false,null);
        channel.queueDeclare(QUEUE_ROUTING2,true,false,false,null);
        //创建交换机exchange
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,true,false,false,null);
        //消息主体
        String err_body="error:  数据插入错误error...";
        String info_body="info: 初始化.....";
        String warning_body="warning: 加载中....";
        //队列与交换机绑定,queueBind方法的第三个参数为routing key,指定路由模式
        channel.queueBind(QUEUE_ROUTING1,EXCHANGE_NAME,"error",null);
        channel.queueBind(QUEUE_ROUTING2,EXCHANGE_NAME,"info",null);
        channel.queueBind(QUEUE_ROUTING2,EXCHANGE_NAME,"warning",null);
        channel.queueBind(QUEUE_ROUTING2,EXCHANGE_NAME,"error",null);
        //发送消息
        channel.basicPublish(EXCHANGE_NAME,"info",null,info_body.getBytes());
        channel.basicPublish(EXCHANGE_NAME,"error",null,err_body.getBytes());
        channel.basicPublish(EXCHANGE_NAME,"warning",null,warning_body.getBytes());
        //关闭资源
        channel.close();
        connection.close();
    }
}

consumer01(消费者):

public class Consumer_Routing_Err {
    private static final String QUEUE_ROUTING1 = "queue_routing1";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("test");
        factory.setPassword("test");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        };
        channel.basicConsume(QUEUE_ROUTING1,true,consumer);
    }
}

consumer02(消费者):

public class Consumer_Routing_Err {
    private static final String QUEUE_ROUTING1 = "queue_routing1";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("test");
        factory.setPassword("test");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        };
        channel.basicConsume(QUEUE_ROUTING1,true,consumer);
    }
}

5.Topics通配符模式

p(producer): 生产者

X(exchange): 交换机

        type: topics

Q1/Q2(queue): 队列

C1/C2(consumer): 消费者

        topics中:

                *通配符: 一个单词

                #通配符: 一个或多个单词

rabbitmq消息模式

 

producer:

public class Producer_Topics {
    private static final String QUEUE1="Queue_Topics1";
    private static final String QUEUE2="Queue_Topics2";
    private static final String EXCHANGE="exchange_topics";
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("test");
        factory.setPassword("test");
        //获取连接
        Connection connection = factory.newConnection();
        //创建channel
        Channel channel = connection.createChannel();
        //创建队列
        channel.queueDeclare(QUEUE1,true,false,false,null);
        channel.queueDeclare(QUEUE2,true,false,false,null);
        //创建交换机
        channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);
        //交换机绑定队列
        channel.queueBind(QUEUE1,EXCHANGE,"*.orange.*");
        channel.queueBind(QUEUE2,EXCHANGE,"*.*.rabbite");
        channel.queueBind(QUEUE2,EXCHANGE,"lazy.#");
        //发送消息
        String orange_body="orange_body: orange_body.....";
        String rabbite_body="rabbite:   rabbite.....";
        String lazy_body="lazy:     lazy......";
        channel.basicPublish(EXCHANGE,"xxx.orange.xxx",null,orange_body.getBytes());
        channel.basicPublish(EXCHANGE,"xxx.xxx.rabbite",null,rabbite_body.getBytes());
        channel.basicPublish(EXCHANGE,"lazy.xxx",null,lazy_body.getBytes());
        //关闭资源
        channel.close();
        connection.close();
    }
}

consumer01:

public class Consumer01 {
    private static final String QUEUE1="Queue_Topics1";
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("test");
        factory.setPassword("test");
        //获取连接
        Connection connection = factory.newConnection();
        //创建channel
        Channel channel = connection.createChannel();
        //创建consumer
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        };
        //获取消息
        channel.basicConsume(QUEUE1,true,consumer);
    }
}

consumer02:

 public class Consumer02 {
    private static final String QUEUE2="Queue_Topics2";
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("test");
        factory.setPassword("test");
        //获取连接
        Connection connection = factory.newConnection();
        //创建channel
        Channel channel = connection.createChannel();
        //创建consumer
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        };
        //获取消息
        channel.basicConsume(QUEUE2,true,consumer);
    }
}

上一篇:springBoot定制内嵌的Tomcat


下一篇:23种设计模式(1)---简单工厂模式