docker快速安装rabbitmq

阅读目录

 


回到顶部

一、获取镜像

#指定版本,该版本包含了web控制页面
docker pull rabbitmq:management

回到顶部

二、运行镜像

#方式一:默认guest 用户,密码也是 guest
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management

#方式二:设置用户名和密码
docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management

回到顶部

三、访问ui页面

http://localhost:15672/

docker快速安装rabbitmq

回到顶部

四、Spring boot连接rabbitmq案例

  1. 创建完后的pom.xml文件为
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xuan</groupId>
    <artifactId>springrabbitmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>springrabbitmq</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-freemarker</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>


</project>

2.配置运行环境

# Tomcat
server:
    tomcat:
        uri-encoding: UTF-8
        max-threads: 1000
        min-spare-threads: 30
    port: 8070
    servlet:
        context-path: /rabbitmq

spring:
    servlet:
        multipart:
            max-file-size: 100MB
            max-request-size: 100MB
            enabled: true
    freemarker:
      suffix: .html
    rabbitmq:
      host: localhost
      port: 5672
      username: admin
      password: 123456
      virtual-host: /testmq
      listener:
        simple:
          #acknowledge-mode: manual #设置确认模式手工确认
          concurrency: 3 #消费者最小数量
          max-concurrency: 10 # 消费者最大数量

3.这点的rabbitmq配置也可以通过RabbitConfig.java类来配置:

package com.xuan.springrabbitmq.config;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

//连接rabbitMQ的基本配置
@Configuration
@EnableRabbit
public class RabbitConfig {
         @Bean
        public ConnectionFactory connectionFactory() {
             CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
             connectionFactory.setUsername("admin");
             connectionFactory.setPassword("123456");
             connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/testmq");
             return connectionFactory;
        }

        @Bean
        public AmqpAdmin amqpAdmin() {
            return new RabbitAdmin(connectionFactory());
        }

        @Bean
        public RabbitTemplate rabbitTemplate() {
            return new RabbitTemplate(connectionFactory());
        }

        //配置消费者监听的容器
        @Bean
        public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory());
            factory.setConcurrentConsumers(3);
            factory.setMaxConcurrentConsumers(10);
            //factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置确认模式手工确认
            return factory;
        }
}

4.配置路由和通道

package com.xuan.springrabbitmq.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

//生产者消费者模式的配置,包括一个队列和两个对应的消费者
@Configuration
public class ProducerConsumerConfig {

    @Bean
    public Queue myQueue() {
        Queue queue = new Queue("myqueue");
        return queue;
    }

}

5.配置订阅发布模式PublishSubscribeConfig.java

package com.xuan.springrabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

//发布订阅模式的配置,包括两个队列和对应的订阅者,发布者的交换机类型使用fanout(子网广播),两根网线binding用来绑定队列到交换机
@Configuration
public class PublishSubscribeConfig {

    @Bean
    public Queue myQueue1() {
        Queue queue = new Queue("queue1");
        return queue;
    }

    @Bean
    public Queue myQueue2() {
        Queue queue = new Queue("queue2");
        return queue;
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        FanoutExchange fanoutExchange = new FanoutExchange("fanout");
        return fanoutExchange;
    }

    @Bean
    public Binding binding1() {
        Binding binding = BindingBuilder.bind(myQueue1()).to(fanoutExchange());
        return binding;
    }

    @Bean
    public Binding binding2() {
        Binding binding = BindingBuilder.bind(myQueue2()).to(fanoutExchange());
        return binding;
    }

}

5.配置direct直连模式DirectExchangeConfig.java

package com.xuan.springrabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

//direct直连模式的交换机配置,包括一个direct交换机,两个队列,三根网线binding
@Configuration
public class DirectExchangeConfig {

    @Bean
    public DirectExchange directExchange() {
        DirectExchange directExchange = new DirectExchange("direct");
        return directExchange;
    }

    @Bean
    public Queue directQueue1() {
        Queue queue = new Queue("directqueue1");
        return queue;
    }

    @Bean
    public Queue directQueue2() {
        Queue queue = new Queue("directqueue2");
        return queue;
    }

    //3个binding将交换机和相应队列连起来
    @Bean
    public Binding bindingorange() {
        Binding binding = BindingBuilder.bind(directQueue1()).to(directExchange()).with("orange");
        return binding;
    }

    @Bean
    public Binding bindingblack() {
        Binding binding = BindingBuilder.bind(directQueue2()).to(directExchange()).with("black");
        return binding;
    }

    @Bean
    public Binding bindinggreen() {
        Binding binding = BindingBuilder.bind(directQueue2()).to(directExchange()).with("green");
        return binding;
    }


}

6.配置topic交换机模型TopicExchangeConfig.java

package com.xuan.springrabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

//topic交换机模型,需要一个topic交换机,两个队列和三个binding
@Configuration
public class TopicExchangeConfig {
    @Bean
     public TopicExchange topicExchange(){
        TopicExchange topicExchange=new TopicExchange("mytopic");
         return topicExchange;
     }
    
    @Bean
    public Queue topicQueue1() {
       Queue queue=new Queue("topicqueue1");
       return queue;
    }
     
     @Bean
    public Queue topicQueue2() {
       Queue queue=new Queue("topicqueue2");
       return queue;
    }
    
     //3个binding将交换机和相应队列连起来
     @Bean
     public Binding bindingtopic1(){
         Binding binding= BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("*.orange.*");//binding key
         return binding;
     }
     
     @Bean
     public Binding bindingtopic2(){
         Binding binding= BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("*.*.rabbit");
         return binding;
     }
     
     @Bean
     public Binding bindingtopic3(){
         Binding binding= BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("lazy.#");//#表示0个或若干个关键字,*表示一个关键字
         return binding;
     }
}

7.定义发送的消息Mail.java

package po;

import java.io.Serializable;

public class Mail implements Serializable {

    private static final long serialVersionUID = -8140693840257585779L;
    private String mailId;
    private String country;
    private Double weight;


    public Mail() {
    }

    public Mail(String mailId, String country, double weight) {
        this.mailId = mailId;
        this.country = country;
        this.weight = weight;
    }

    public String getMailId() {
        return mailId;
    }

    public void setMailId(String mailId) {
        this.mailId = mailId;
    }

    public String getCountry() {
        return country;
    }

    public void setCountry(String country) {
        this.country = country;
    }

    public double getWeight() {
        return weight;
    }

    public void setWeight(double weight) {
        this.weight = weight;
    }

    @Override
    public String toString() {
        return "Mail [mailId=" + mailId + ", country=" + country + ", weight="
                + weight + "]";
    }

}

8.继承的消息TopicMail.java

package po;

public class TopicMail extends Mail {
    String routingkey;

    public String getRoutingkey() {
        return routingkey;
    }

    public void setRoutingkey(String routingkey) {
        this.routingkey = routingkey;
    }

    @Override
    public String toString() {
        return "TopicMail [routingkey=" + routingkey + "]";
    }

}

9.定义发送接口的实现ProducerImpl.java

package com.xuan.springrabbitmq.service.impl;

import com.xuan.springrabbitmq.service.Producer;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import po.Mail;

@Transactional
@Service("producer")
public class ProducerImpl implements Producer {
    @Autowired
    RabbitTemplate rabbitTemplate;
    public void sendMail(String queue, Mail mail) {
        rabbitTemplate.setQueue(queue);
        rabbitTemplate.convertAndSend(queue,mail);
    }

}

10.订阅发布时的发送消息实现PublisherImpl.java

package com.xuan.springrabbitmq.service.impl;

import po.Mail;
import com.xuan.springrabbitmq.service.Publisher;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service("publisher")
public class PublisherImpl implements Publisher {
    @Autowired
    RabbitTemplate rabbitTemplate;

    public void publishMail(Mail mail) {
        rabbitTemplate.convertAndSend("fanout", "", mail);
    }

    public void senddirectMail(Mail mail, String routingkey) {
        rabbitTemplate.convertAndSend("direct", routingkey, mail);
    }

    public void sendtopicMail(Mail mail, String routingkey) {
        rabbitTemplate.convertAndSend("mytopic", routingkey, mail);
    }
    
    
}

11.消费者的实现代码QueueListener1.java

package com.xuan.springrabbitmq.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import po.Mail;

@Component
@RabbitListener(queues = "myqueue")
public class QueueListener1 {

    @RabbitHandler
    public void displayMail(Mail mail, Channel channel, Message message) throws Exception {
        System.out.println("队列监听器1号收到消息" + mail.toString());
        //channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//如果需要确认的要调用
    }
}

12.或者QueueListener2.java

package com.xuan.springrabbitmq.listener;


import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import po.Mail;

@Component
public class QueueListener2 {
    
    @RabbitListener(queues = "myqueue")
    public void displayMail(Mail mail) throws Exception {
        System.out.println("队列监听器2号收到消息"+mail.toString());
    }
}

 

上一篇:php的rabbitmq扩展(未测试)


下一篇:spring-web 集成 rabbitmq