RabbitMq

1、界面管理

2、代码实现点对点的消息发送与消费(也就是一个生产者对应一个消费者。)

/**
 * @author Suny
 * @data 2021/11/1 20:36
 * @description 生产者
 */
public class ProducerDemo01 {
    public static void main(String[] args) throws Exception {
        //创建mq的链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //配置rabbitmq的主机地址
        factory.setHost("127.0.0.1");
        //配置端口号
        factory.setPort(5672);
        //配置虚拟主机
        factory.setVirtualHost("/ems");
        //配置账户和密码
        factory.setUsername("ems");
        factory.setPassword("ems");
        //获取链接
        Connection connection = factory.newConnection();

        //通过链接获取链接中的通道
        Channel channel = connection.createChannel();

        //参数 队列名、队列是否要持久化、队列是否只允许当前通道使用、队列在消费完之后是否自动删除、额外附加参数
        channel.queueDeclare("Hello",false,false,false,null);

        //发布消息
        //参数 交换机名称 、队列名称、传递消息的额外设置、消息具体内容
        channel.basicPublish("","Hello",null,"Hello,World".getBytes());

        channel.close();
        connection.close();
    }
}



/**
 * @author Suny
 * @data 2021/11/1 20:50
 * @description 消费者
 */
public class ConsumerDemo01 {
    public static void main(String[] args) throws Exception {
        //创建mq的链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //配置rabbitmq的主机地址
        factory.setHost("127.0.0.1");
        //配置端口号
        factory.setPort(5672);
        //配置虚拟主机
        factory.setVirtualHost("/ems");
        //配置账户和密码
        factory.setUsername("ems");
        factory.setPassword("ems");
        //获取链接
        Connection connection = factory.newConnection();

        //通过链接获取链接中的通道
        Channel channel = connection.createChannel();

        //通道绑定队列
        //参数 队列名、队列是否要持久化、队列是否只允许当前通道使用、队列在消费完之后是否自动删除、额外附加参数
        channel.queueDeclare("Hello",false,false,false,null);

        //消费消息
        //参数 队列名称、是否开启消息自动确认机制
        channel.basicConsume("Hello",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String str = new String(body);
                System.out.println("str = " + str);
            }
        });

        /*channel.close();
        connection.close();*/
    }
}

3、工作模式一个生产多个消费(一个生产者对应多个消费者,但是只能有一个消费者获得消息,也称之为竞争消费者模式。

/**
 * @author Suny
 * @data 2021/11/2 20:29
 * @description 工作模式
 */
public class WorkProducer {
    public static void main(String[] args) throws Exception{
        Connection connection = RabbitMqConfig.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("workQueue",true,false,false,null);
        for (int i = 0; i < 10; i++) {
            channel.basicPublish("","workQueue",MessageProperties.PERSISTENT_TEXT_PLAIN,(i +"Hello Work Queue").getBytes());
        }
        RabbitMqConfig.closeConnection(channel,connection);
    }
}

// 消费者
/**
 * @author Suny
 * @data 2021/11/2 20:34
 * @description 工作模式的消费者
 */
public class WorkConsumer {
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMqConfig.getConnection();
        final Channel channel = connection.createChannel();
        //每次只消费、确认一条消息
        channel.basicQos(1);
        //参数 队列名、队列是否要持久化、队列是否只允许当前通道使用、队列在消费完之后是否自动删除、额外附加参数
        channel.queueDeclare("workQueue",true,false,false,null);
        //参数 队列名称、是否开启消息自动确认机制
        channel.basicConsume("workQueue",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1 - :" + new String(body));
                //当消息消费完之后开启手动确认机制
                //参数: 确认队列中那个具体的消息,是否开启多个消息同事确认
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}

4、发布订阅(一个消费者将消息首先发送到交换器,交换器绑定多个队列,然后被监听该队列的消费者所接收并消费。)

/**
 * @author Suny
 * @data 2021/11/8 21:04
 * @description 广播模式
 */
public class FanoutProducer {
    public static void main(String[] args) throws Exception{
        Connection connection = RabbitMqConfig.getConnection();
        Channel channel = connection.createChannel();
        //将通道声明指定的交换机
        //参数 交换机名称、交换机的模式fanout为广播
        channel.exchangeDeclare("logs","fanout");
        channel.basicPublish("logs","",null,"fanout".getBytes());
        channel.close();
        connection.close();
    }
}


/**
 * @author Suny
 * @data 2021/11/8 21:04
 * @description 广播消费者,每一个消费者都会消费消息
 */
public class FanoutConsumer {
    public static void main(String[] args) throws Exception{
        Connection connection = RabbitMqConfig.getConnection();
        Channel channel = connection.createChannel();
        //绑定交换机
        channel.exchangeDeclare("logs","fanout");
        //获取临时队列
        String queueName = channel.queueDeclare().getQueue();
        //队列绑定交换机
        //参数 队列名 交换机名 路由key
        channel.queueBind(queueName,"logs","");
        //消费消息
        channel.basicConsume(queueName,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer1-fanout消费了");
            }
        });
    }
}

/**
 * @author Suny
 * @data 2021/11/8 21:04
 * @description 广播消费者
 */
public class FanoutConsumer2 {
    public static void main(String[] args) throws Exception{
        Connection connection = RabbitMqConfig.getConnection();
        Channel channel = connection.createChannel();
        //绑定交换机
        channel.exchangeDeclare("logs","fanout");
        //获取临时队列
        String queueName = channel.queueDeclare().getQueue();
        //队列绑定交换机
        //参数 队列名 交换机名 路由key
        channel.queueBind(queueName,"logs","");
        //消费消息
        channel.basicConsume(queueName,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer2-fanout消费了");
            }
        });
    }
}

5、路由模式(生产者将消息发送到direct交换器,在绑定队列和交换器的时候有一个路由key,生产者发送的消息会指定一个路由key,那么消息只会发送到相应key相同的对列,接着监听该队列的消费者消费消息。)

/**
 * @author Suny
 * @data 2021/11/8 21:28
 * @description 路由模式
 */
public class DirectProducer {
    public static void main(String[] args) throws Exception{
        Connection connection = RabbitMqConfig.getConnection();
        Channel channel = connection.createChannel();
        //将通道声明指定的交换机
        //参数 交换机名称、交换机的模式DIRECT为广播
        channel.exchangeDeclare("logs_Direct", "direct");
        for (int i = 0; i < 10; i++) {
            if (i % 2 == 0){
                channel.basicPublish("logs_Direct","aaa",null,"DIRECT".getBytes());
            }else {
                channel.basicPublish("logs_Direct","bbb",null,"DIRECT".getBytes());
            }
        }
        channel.close();
        connection.close();
    }
}


/**
 * @author Suny
 * @data 2021/11/8 21:28
 * @description 路由模式
 */
public class DirectConsumer {
    public static void main(String[] args) throws Exception{
        Connection connection = RabbitMqConfig.getConnection();
        Channel channel = connection.createChannel();
        //绑定交换机 交换机名称、交换机模式DIRECT(路由)
        channel.exchangeDeclare("logs_Direct",BuiltinExchangeType.DIRECT);
        //获取临时队列
        String queueName = channel.queueDeclare().getQueue();
        //队列绑定交换机
        //参数 队列名 交换机名 路由key
        channel.queueBind(queueName,"logs_Direct","aaa");
        //消费消息
        channel.basicConsume(queueName,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String string = new String(body);
                System.out.println(string);
            }
        });
    }
}
/**
 * @author Suny
 * @data 2021/11/8 21:28
 * @description 路由模式
 */
public class DirectConsumer2 {
    public static void main(String[] args) throws Exception{
        Connection connection = RabbitMqConfig.getConnection();
        Channel channel = connection.createChannel();
        //绑定交换机 交换机名称、交换机模式DIRECT(路由)
        channel.exchangeDeclare("logs_Direct","direct");
        //获取临时队列
        String queueName = channel.queueDeclare().getQueue();
        //队列绑定交换机
        //参数 队列名 交换机名 路由key
        channel.queueBind(queueName,"logs_Direct","bbb");
        //消费消息
        channel.basicConsume(queueName,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer1-fanout消费了");
            }
        });
    }
}

6、动态路由

/**
 * @author Suny
 * @data 2021/11/8 21:28
 * @description 动态路由模式、主题模式支持统配符号比如*
 */
public class TopicProducer {
    public static void main(String[] args) throws Exception{
        Connection connection = RabbitMqConfig.getConnection();
        Channel channel = connection.createChannel();
        //交换机名、交换机类型
        channel.exchangeDeclare("Topic", BuiltinExchangeType.TOPIC);
        String RoutingKey = "user.info";
        channel.basicPublish("Topic",RoutingKey,null,"Topic交换机".getBytes());
        RabbitMqConfig.closeConnection(channel,connection);
    }
}

/**
 * @author Suny
 * @data 2021/11/8 21:28
 * @description 动态路由
 */
public class TopicConsumer {
    public static void main(String[] args) throws Exception{
        Connection connection = RabbitMqConfig.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("Topic", BuiltinExchangeType.TOPIC);
        //获取临时队列
        String queueName = channel.queueDeclare().getQueue();
        //参数 队列名 交换机名 路由key
        //通配符*表示只匹配一个、#表示匹配多个
        channel.queueBind(queueName,"Topic","*.info");
        channel.basicConsume(queueName,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String str = new String(body);
                System.out.println(str);
            }
        });
    }
}

spring boot集成Rabbitmq

// 相关jar包
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        
        //相关配置文件
        # rabbitmq
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=ems
spring.rabbitmq.password=ems
spring.rabbitmq.virtual-host=/ems
# 手动ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 一次消费一条消息
spring.rabbitmq.listener.simple.prefetch= 1

spring boot生产者

package com.itheima.demo;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

/**
 * @author Suny
 * @data 2021/11/9 20:31
 * @description Springboot集成rabbitmq
 */
@SpringBootTest
public class RabbitmqTest {

    //注入rabbitmq的使用模板
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void  testHello(){
        //点对点模式
        //队列名和消息
        rabbitTemplate.convertAndSend("Boot","HelloRabbitmq");
    }

    @Test
    public void testWork(){
        //工作模式
        //队列名和消息
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("work2","HelloWork");
        }
    }

    @Test
    public void testFanout(){
        //广播模式
        //交换机名、路由、消息
        rabbitTemplate.convertAndSend("FanoutExchange","","FanoutExchange");
    }

    @Test
    public void testRoutingKey(){
        //路由模式
        //交换机名、路由、消息
        rabbitTemplate.convertAndSend("RoutingKey2021","userInfo","RoutingKey2021");
    }

    @Test
    public void testTopic(){
        //动态路由模式
        //交换机名、路由、消息
        rabbitTemplate.convertAndSend("testTopic","user.Info","testTopic");
    }
}

spring boot消费者

/**
 * @author Suny
 * @data 2021/11/9 20:37
 * @description rabbitmq消费者
 * 点对点消费
 */
@Component
//当不配置持久化和自动删除时默认的是 不排他和非持久化的队列
@RabbitListener(queuesToDeclare = @Queue(value = "Boot",durable = "false",autoDelete = "false"))
public class Consumer {
    @RabbitHandler
    //@RabbitHandler使用此注解接收@RabbitListener指定的队列
    public void receive1(String message){
        System.out.println("message = " + message);
    }
}

/**
 * @author Suny
 * @data 2021/11/9 20:37
 * @description Springboot中使用work模式、并使用手动ACK进行消息确认
 */
@Component
public class WorkConsumer {

    @RabbitListener(queuesToDeclare = @Queue("work2"))
    public void receive1(String msg, Message message, Channel channel){
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("message1 = " + msg);
            channel.basicAck(deliveryTag,false);
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    @RabbitListener(queuesToDeclare = @Queue("work2"))
    public void receive2(String msg, Message message, Channel channel){
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("message2 = " + msg);
            channel.basicAck(deliveryTag,false);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}


/**
 * @author Suny
 * @data 2021/11/9 20:56
 * @description 广播模式的消费
 */
@Component
public class FanoutConsumer {
    @RabbitListener(bindings = {
            @QueueBinding(
                    //如果不指定队列会创建临时队列
                    value = @Queue("fanout2021"),
                    exchange = @Exchange(value = "FanoutExchange",type = "fanout")
            )
    })
    public void receive1(String message){
        System.out.println("message1 = " + message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    //如果不指定队列会创建临时队列
                    value = @Queue("fanout2022"),
                    exchange = @Exchange(value = "FanoutExchange",type = "fanout")
            )
    })
    public void receive2(String message){
        System.out.println("message2 = " + message);
    }
}


/**
 * @author Suny
 * @data 2021/11/9 20:56
 * @description 路由模式
 */
@Component
public class RoutingConsumer {
    @RabbitListener(bindings = {
            @QueueBinding(
                    //如果不指定队列会创建临时队列
                    value = @Queue,
                    exchange = @Exchange(value = "RoutingKey2021",type = "direct"),
                    key = {"userInfo"}
            )
    })
    public void receive1(String message){
        System.out.println("message1 = " + message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    //如果不指定队列会创建临时队列
                    value = @Queue,
                    exchange = @Exchange(value = "RoutingKey2021",type = "direct"),
                    key = {"userInfo"}
            )
    })
    public void receive2(String message){
        System.out.println("message2 = " + message);
    }
}


/**
 * @author Suny
 * @data 2021/11/9 20:56
 * @description动态路由模式
 */
@Component
public class TopicConsumer {
    @RabbitListener(bindings = {
            @QueueBinding(
                    //如果不指定队列会创建临时队列
                    value = @Queue,
                    exchange = @Exchange(value = "testTopic",type = "topic"),
                    key = {"user.*"}
            )
    })
    public void receive1(String message){
        System.out.println("message1 = " + message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    //如果不指定队列会创建临时队列
                    value = @Queue,
                    exchange = @Exchange(value = "testTopic",type = "topic"),
                    key = {"user.*"}
            )
    })
    public void receive2(String message){
        System.out.println("message2 = " + message);
    }
}
上一篇:gitlab的CI/CD实现


下一篇:第十八届“华为杯”数模LaTeX模板送你!附官方优秀论文下载