SpringBoot集成RabbitMQ

如今较为流行的消息中间件,例如RabbitMQ和Kafka,在此介绍使用SpringBoot集成RabbitMQ。

1.引入SpringBoot集成的RabbitMQ的启动包依赖,因为SpringBoot对RabbitMQ有着很好的集成,所以在导入依赖时,这里我的SpringBoot使用的是最新版的:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

2.配置配置文件application.yml,在此注意我们的ip地址和端口号就行:

server:
  port: 8021
spring:
  application:
    name: rabbitmq-provider

  //配置我们的rabbitmq的主机地址,本机Windows的话就用127.0.0.1,其他主机的话就用ip地址,这里我使用的是我的服务器ip地址
  rabbitmq:
    host: 101.35.105.175
    //配置我们的rabbitmq的端口号,开放的一般是5672
    port: 5672
    username: admin
    password: 123
    #虚拟host 可以不设置,使用server默认host
    virtual-host: JCcccHost

3.创建好我们的工程之后,创建生产者:这里我们对各个不同的rabbitmq的工作模式分别创建不同:

3.1、HelloWorld模式:
生产者:这里我们使用RabbitTemplate 的模板对象,我们只需对其进行注入就能使用RabbitTemplate 对象对消息进行操作。

@SpringBootTest(classes = SpringbootRabbitmqApplication.class)
@RunWith(SpringRunner.class)
public class HelloWorldDemo {
    @Autowired
    RabbitTemplate rt;

    @Test
    public void send() {
        rt.convertAndSend("hello-world","hello world!!!");
    }
}

消费者:
使用@RabbitListener(queuesToDeclare = @Queue(“hello-world”))去对我们的队列进行绑定。
这里的msg参数,是接收生产者产生的消息。

@Component
@RabbitListener(queuesToDeclare = @Queue("hello-world"))
public class HelloWorld {

    @RabbitHandler
    public void receive(String msg){
        System.out.println("msg:"+msg);
    }
}

执行后:
SpringBoot集成RabbitMQ
SpringBoot集成RabbitMQ
3.2、work模式:
生产者:

package com.example.springbootrabbitmq.helloworld;
/*
    PACKAGE_NAME:com.example.springbootrabbitmq.helloworld
    USER:18413
    DATE:2021/10/6 18:29
    PROJECT_NAME:RabbitMq
    面向代码面向君,不负代码不负卿————蒋明辉 */


import com.example.springbootrabbitmq.SpringbootRabbitmqApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest(classes = SpringbootRabbitmqApplication.class)
@RunWith(SpringRunner.class)
public class HelloWorldDemo {
    @Autowired
    RabbitTemplate rt;

    @Test
    public void send() {
        for (int i = 0; i < 100; i++) {
            rt.convertAndSend("hello-world","hello world!!!"+i);
        }
    }
}

消费者:多个消费者,绑定一个消息队列,轮询方式消费信息

package com.example.springbootrabbitmq.helloworld;
/*
    PACKAGE_NAME:com.example.springbootrabbitmq.helloworld
    USER:18413
    DATE:2021/10/6 17:05
    PROJECT_NAME:RabbitMq
    面向代码面向君,不负代码不负卿————蒋明辉 */

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class HelloWorld {

    @RabbitHandler
    @RabbitListener(queuesToDeclare = @Queue("hello-world"))
    public void receive01(String msg){
        System.out.println("receive01:"+msg);
    }
    @RabbitHandler
    @RabbitListener(queuesToDeclare = @Queue("hello-world"))
    public void receive02(String msg){
        System.out.println("receive02:"+msg);
    }
    @RabbitHandler
    @RabbitListener(queuesToDeclare = @Queue("hello-world"))
    public void receive03(String msg){
        System.out.println("receive03:"+msg);
    }
}

3.3、fanout模式——广播模式
生产者:
第一个参数:设置我们的交换机
第二个参数:设置routekey
第三个参数:消息体

@Test
    public void fanOutSend() {
        rt.convertAndSend("logs","","i am fanout");
    }

消费者:注解去绑定临时队列,绑定交换机,绑定消息类型

 @RabbitListener(bindings = {
            @QueueBinding(value = @Queue,
                    exchange = @Exchange(value = "logs",type = "fanout"))
    })
    public void receive02(String msg){
        System.out.println("receive02:"+msg);
    }
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue,
                    exchange = @Exchange(value = "logs",type = "fanout"))
    })
    public void receive03(String msg){
        System.out.println("receive03:"+msg);
    }

3.4、direct路由模式:绑定我们的路由key
生产者:

 @Test
    public void DirectSend() {
        rt.convertAndSend("route-directs","info","i am info");
        rt.convertAndSend("route-directs","error","i am error");
    }

消费者:

package com.example.springbootrabbitmq.direct;
/*
    PACKAGE_NAME:com.example.springbootrabbitmq.direct
    USER:18413
    DATE:2021/10/6 20:21
    PROJECT_NAME:RabbitMq
    面向代码面向君,不负代码不负卿————蒋明辉 */

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Direct {
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue,
                    key = {"info"},
                    exchange = @Exchange(value = "route-directs",type = "direct"))
    })
    public void receive01(String msg){
        System.out.println("receive01:"+msg);
    }
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue,
                    key = {"info","error"},
                    exchange = @Exchange(value = "route-directs",type = "direct"))
    })
    public void receive02(String msg){
        System.out.println("receive02:"+msg);
    }

}

3.5、Topic(动态路由):
生产者:

@Test
    public void TopicSend() {
        rt.convertAndSend("route-Topic","info","i am info");
        rt.convertAndSend("route-Topic","info.save","i am info");
        rt.convertAndSend("route-Topic","info.save.send","i am info");
        rt.convertAndSend("route-Topic","error.save","i am error");
        rt.convertAndSend("route-Topic","error.save.send","i am error");
    }

消费者:

package com.example.springbootrabbitmq.topic;
/*
    PACKAGE_NAME:com.example.springbootrabbitmq.topic
    USER:18413
    DATE:2021/10/6 21:48
    PROJECT_NAME:RabbitMq
    面向代码面向君,不负代码不负卿————蒋明辉 */

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Topic {
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue,
                    key = {"info.*","error.*"},
                    exchange = @Exchange(value = "route-topic",type = "topic"))
    })
    public void receive01(String msg){
        System.out.println("receive01:"+msg);
    }
}

4、各位老板可以点个免费的关注吗 谢谢啦!!!

上一篇:第二章 大话MQ消息中间件+JMS+AMQP核心知识


下一篇:RabbitMQ 3.9.7 镜像模式集群与Springboot 2.5.5 整合(springboot整合rabbitmq实战)