rabbitMq完整通信(一)---producer

 

application.properties:

server.port=8080
spring.application.name=producer
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

最后是pom

 

先创建两个队列:

package com..direct;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//配置类,随系统启动时,创建两个队列, 用来接收发送过来的数据
@Configuration
public class DirectConf {
     @Bean
     public Queue queue() {
//          System.out.println("系统启动时:创建一个queue的队列到rabbitMQ");
          return new Queue("queue");
     }
     @Bean
     public Queue queueObject() {
//          System.out.println("系统启动时:创建一个queueObject的队列到rabbitMQ");
          return new Queue("queueObject");
     }
}

 

创建队列和交换器,并进行绑定:

package com..topic;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//配置类,随系统启动时,根据需求创建交换器和队列, 用来接收服务端发送过来的数据
@Configuration
public class TopicConf {

        //系统启动时:创建一个message的队列到rabbitMQ
        @Bean(name="message")
        public Queue queueMessage() {
            System.out.println("系统启动时:创建一个topic.order的队列到rabbitMQ");
            return new Queue("topic.order");
        }

        //系统启动时:创建一个exchange的交换器到rabbitMQ
        @Bean
        public TopicExchange exchange() {
            return new TopicExchange("exchange");
        }
        //系统启动时:将exchange的交换器与队列绑定
        @Bean
        Binding bindingExchangeMessage(@Qualifier("message") Queue queueMessage, TopicExchange exchange) {
            System.out.println("系统启动时:将exchange的交换器与topic.order队列绑定");
            return BindingBuilder.bind(queueMessage).to(exchange).with("topic.order");
        }

}

 

定义队列发送的方法:

package com..sender;

import java.util.Map;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class RabbitSender {
    //注入AmqpTemplate
    @Autowired
    private AmqpTemplate template;
    //由AmqpTemplate将数据发送到指定的队列
    public void send(String queueName,String orderId) {
        System.out.println("由AmqpTemplate将数据发送到指定的队列");
        template.convertAndSend(queueName, orderId);
    }
    //由AmqpTemplate将数据发送到指定的队列,主要用于发送对象
    public void sendObject(String queueName,Map user) {
        System.out.println("由AmqpTemplate将数据发送到指定的队列,主要用于发送对象");
        template.convertAndSend(queueName,user);
    }
    //由AmqpTemplate将数据发送到交换机和队列
    public void sendTopic(String exchange, String queueName, String orderId) {
        System.out.println(Thread.currentThread().getName()+":  进入sendTopic方法");
        System.out.println("%%%由AmqpTemplate将数据发送到交换机"+exchange+" 和队列 "+queueName);
        template.convertAndSend(exchange,queueName,orderId);
    }
}

 

RabbitListener监听服务端发送到队列的数据:

package com.wondersgroup.receive;

import java.util.Map;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class OrderInfoReceive {
    //接收从topic.orderReceive队列的数据(主要存放了服务端订单查询的结果)
    @RabbitListener(queues="topic.orderReceive")    
    public void process1(String orderInfo) {    //用User作为参数
        System.out.println("监听%%%====topic.orderReceive  队列取到的  orderInfo :========:"+orderInfo);
    }    
}

 

POM:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <artifactId>product</artifactId>
    <packaging>jar</packaging>

   <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
       <version>1.5.21.RELEASE</version>
    </parent>
    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <optional>true</optional>
            <scope>true</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <!-- 添加springboot对amqp的支持 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.tomcat.embed</groupId>
            <artifactId>tomcat-embed-jasper</artifactId>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>
上一篇:RabbitMQ工作模式和代码实现备忘录


下一篇:从架构的角度看Kafka(四)