springboot-rabbitmq

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

springboot-rabbitmq


使用步骤

1.引入依赖

核心依赖(示例):

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2.配置yml

代码如下(示例):

server:
  port: 9001
spring:
  rabbitmq:
    host: 192.168.249.130
    port: 5672
    username: admin
    password: admin
    virtual-host: /

3.声明交换机,队列,并绑定关系

第一种:通过config(示例):

@Configuration
public class RabbitmqConfig {
    //1.声明注册fanout模式的交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanout-order-exchange",true,false);
    }
    //2.声明队列 sms.fanout.queue email.fanout.queue duanxin.fanout.queue
    @Bean
    public Queue smsQueue(){
        return new Queue("sms.fanout.queue", true);
    }
    @Bean
    public Queue emailQueue(){
        return new Queue("email.fanout.queue", true);
    }
    @Bean
    public Queue duanxinQueue(){
        return new Queue("duanxin.fanout.queue", true);
    }
    //3.完成绑定关系
    @Bean
    public Binding  smsBinding(){
        return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
    }
    @Bean
    public Binding  emailBinding(){
        return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
    }
    @Bean
    public Binding  duanxinBinding(){
        return BindingBuilder.bind(duanxinQueue()).to(fanoutExchange());
    }
}

第二种:通过注解绑定关系
注意:这里只是绑定关系,并没有声明交换机,队列(推荐使用config)

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "email.fanout.queue",durable = "true",autoDelete = "false"),
        exchange = @Exchange(value = "fanout-order-exchange",type = ExchangeTypes.FANOUT),
        key = ""
))
@Service
public class FanoutEmailConsumer {
    @RabbitHandler
    public void getMessage(String message){
        System.out.println("email fanout----接收到的订单信息是:"+message);
    }
}

4.业务类(provider=“发送消息”)

代码如下(示例):

@Service
public class OrderService {
    @Autowired
    RabbitTemplate rabbitTemplate;

    public void makeOrder(){
        //1.根据商品id查询库存是否充足
        //2.保存订单
        String orderId=UUID.randomUUID().toString();
        System.out.println("订单生成成功:"+orderId);
        //3.通过mq来完成消息的分发
        //参数1.交换机 参数2.路由key/队列名 参数3.消息内容
        String exchangeName = "fanout-order-exchange";
        String routingKey = "";
        rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId);
    }
}

5.业务类(consumer=“消费消息”)

代码如下(示例):

@RabbitListener(queues = {"email.fanout.queue"})
@Service
public class FanoutEmailConsumer {
    @RabbitHandler
    public void getMessage(String message){
        System.out.println("email fanout----接收到的订单信息是:"+message);
    }
}

高级特性

1.TTL(设置过期时间)

x-message-ttl(示例):

	@Bean
    public Queue emailQueue(){
        Map<String,Object> args=new HashMap<>();
        args.put("x-message-ttl", 5000);//这里一定是一个int类型
        return new Queue("email.fanout.queue", true,false,false,args);
    }

设置单条消息过期时间(示例):

@Service
public class OrderService {
    @Autowired
    RabbitTemplate rabbitTemplate;

    public void makeOrder(){
        //1.根据商品id查询库存是否充足
        //2.保存订单
        String orderId=UUID.randomUUID().toString();
        System.out.println("订单生成成功:"+orderId);
        //3.通过mq来完成消息的分发
        //参数1.交换机 参数2.路由key/队列名 参数3.消息内容
        String exchangeName = "fanout-order-exchange";
        String routingKey = "";
        MessagePostProcessor messagePostProcessor=new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("5000");
                message.getMessageProperties().setContentEncoding("UTF-8");
                return message;
            }
        };
        rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId,messagePostProcessor);
    }
}

TTL队列消息过期会被转移到死信队列中
而单独设置的过期消息则会被直接移除

2.DLX(死信队列)

x-dead-letter-exchange(死信交换机)
x-dead-letter-routing-key(死信routingkey)(示例):

    @Bean
    public Queue emailQueue(){
        Map<String,Object> args=new HashMap<>();
        args.put("x-message-ttl", 5000);//这里一定是一个int类型
        args.put("x-dead-letter-exchange", "dead-direct-exchange");//绑定死信交换机
        args.put("x-dead-letter-routing-key", "dead");//绑定死信交换机路由key(也就是DLK)
        return new Queue("email.fanout.queue", true,false,false,args);
    }

3.LIM(最大队列长度)

x-max-length(示例):

    @Bean
    public Queue emailQueue(){
        Map<String,Object> args=new HashMap<>();
        args.put("x-message-ttl", 5000);//这里一定是一个int类型
        args.put("x-max-length", 5);
        args.put("x-dead-letter-exchange", "dead-direct-exchange");//绑定死信交换机
        args.put("x-dead-letter-routing-key", "dead");//绑定死信交换机路由key(也就是DLK)
        return new Queue("email.fanout.queue", true,false,false,args);
    }

死信交换机其实也是一个普通的交换机,类型可以按需设置
当当前交换机消息过期时,会被转入死信交换机,并发送给死信队列

单机集群搭建

搭建集群(示例):

#启动第一个节点rabbit-1
sudo RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit-1 rabbitmq-server start &
#启动第一个节点rabbit-2
sudo RABBITMQ_NODE_PORT=5673 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15673}]" RABBITMQ_NODENAME=rabbit-2 rabbitmq-server start &
#停止应用
sudo rabbitmqctl -n rabbit-1 stop_app
#目的是清楚节点上的历史数据(如果不清除,无法将节点加入到集群)
sudo rabbitmqctl -n rabbit-1 reset
#启动应用
sudo rabbitmqctl -n rabbit-1 start_app
#重复步骤
sudo rabbitmqctl -n rabbit-2 stop_app
sudo rabbitmqctl -n rabbit-2 reset
#将rabbit-2作为从节点连接rabbit-1	@后面为服务器的主机名[root@localhost ~]
sudo rabbitmqctl -n rabbit-2 join_cluster rabbit-1@localhost
#启动rabbit-2
sudo rabbitmqctl -n rabbit-2 start_app
#查看集群信息
sudo rabbitmqctl cluster_status -n rabbit-1

开启web监控(示例):

#开启web查件
rabbitmq-plugins enable rabbitmq_management
#创建用户
rabbitmqctl -n rabbit-1 add_user admin admin
#赋予身份
rabbitmqctl -n rabbit-1 set_user_tags admin administrator
#赋予权限
rabbitmqctl -n rabbit-1 set_permissions -p / admin ".*" ".*" ".*"
#重复rabbit-2(这里显示已经存在,可能集群user共通的吧)
rabbitmqctl -n rabbit-2 add_user admin admin
rabbitmqctl -n rabbit-2 set_user_tags admin administrator
rabbitmqctl -n rabbit-2 set_permissions -p / admin ".*" ".*" ".*"

总结

无论发送消息还收消费消息,都需要连接rabbitmq
声明交换机,队列,绑定关系,在provider或者consumer都是可以的
这里的示例是fanout模式 FanoutExchange
direct模式则是 DirectExchange(绑定队列得加上路由key)
示例: BindingBuilder.bind(duanxinQueue()).to(directExchange()).with(“duanxin”);

上一篇:springboot读取静态文件路径顺序


下一篇:2021-06-08 SpringBoot2核心技术 web场景(一)