RabbitMQ基础使用

1.MQ基础介绍

同步调用

OpenFeign的调用。这种调用中,调用者发起请求后需要等待服务提供者执行业务返回结果后,才

能继续执行后面的业务。也就是说调用者在调用过程中处于阻塞状态,因此我们称这种调用方式为同步调用

异步调用

异步调用通常是基于消息通知的方式,包含三个角色:

        消息发送者:投递消息的人,就是原来的调用者

        消息接收者:接收和处理消息的人,就是原来的服务提供者

        消息代理:管理、暂存、转发消息,就是原来的服务提供方

不同的MQ介绍

RabbitMQ ActiveMQ RocketMQ Kafka
公司/社区 Rabbit Apache 阿里 Apache
开发语言 Erlang Java Java Scala&Java
协议支持 AMQP,XMPP,SMTP,STOMP OpenWire,STOMP,REST,XMPP,AMQP 自定义协议 自定义协议
可用性 一般
单机吞吐量 一般 非常高
消息延迟 微秒级 毫秒级 毫秒级 毫秒以内
消息可靠性 一般 一般

本文讲解RabbitMQ

2.RabbitMQ

1.安装部署

在docker安装rabbit,代码如下:

docker run \
 -e RABBITMQ_DEFAULT_USER=itheima \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 -v mq-plugins:/plugins \
 --name mq \
 --hostname mq \
 -p 15672:15672 \
 -p 5672:5672 \
 --network hm-net\
 -d \
 rabbitmq:3.8-management

安装完成后,我们访问 http://192.168.150.101:15672即可看到管理控制台。

2.收发消息

交换机

点开amq.fanout交换机,将交换机和队列做绑定

队列

指定队列的名字添加队列

3.数据隔离

问题:要实现不同虚拟主机之间不同的交换机之间的隔离,需要用到数据隔离的技术。

添加用户

添加自己的虚拟主机

3.SpringAMQP

RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。

而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且

还基于SpringBoot对其实现了自动装配,使用起来非常方便。

1.快速入门

导入依赖:

        <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

配置信息:

spring:
  rabbitmq:
    host: 192.168.145.129 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: /hmall # 虚拟主机
    username: hmall # 用户名
    password: 123 # 密码

编写消息发送者和接收者的代码

发送者:写了一个测试类,注入了RabbitTemplate对象调用convertAndSend方法把队列名和消息发送出去。

@SpringBootTest
public class amqp {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testsimplequeue(){
        //队列名
        String queue="simple.queue";
        //消息
        String message="hello";
        //发送消息
        rabbitTemplate.convertAndSend(queue,message);
    }
}

接收者:在方法上加上 @RabbitListener(queues = "simple.queue")即可实现对消息的接收。

@Component
public class listen {
    //队列的名称
    @RabbitListener(queues = "simple.queue")
    //发送者发送的什么类型,接收者用什么类型接收
    public void listen1(String message){
        System.out.println("接收到消息"+message);
    }
}

2.WorkQueues模型

Work queues,任务模型。简单来说就是多个消费者绑定到一个队列,共同消费队列中的消息

模拟此环境再多加一个消费者即可。

AMQP默认是采用类似轮询的机制,性能慢的机器会拖慢速度,因此需做如下配置实现能者多劳:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

3.交换机

1.Fanout交换机

fanout交换机是将消息全部发送到绑定的队列中。

使用fanout交换机发送消息要使用如下函数:

    @Test
    public void testfanout(){
        //交换机名称
        String exchange="hmall.fanout";
        //消息
        String message="hello,everyone";
        //fanout交换机发送消息         交换机    队列   消息
        rabbitTemplate.convertAndSend(exchange,null,message);
    }

2.Direct交换机

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

        队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)

        消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey

        Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有            队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

绑定路由key实例:

代码示例:

rabbitTemplate.convertAndSend(exchange,"white",message);指定路由key,发送到指定的队列。

    @Test
    public void testdirect(){
        //交换机名称
        String exchange="hmall.direct";
        //消息
        String message="hello,everyone,direct";
        //发送消息                             指定路由key
        rabbitTemplate.convertAndSend(exchange,"white",message);
    }

3.Topic交换机

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。

只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!

BindingKey 一般都是有一个或多个单词组成,多个单词之间以.分割,例如: item.insert

通配符规则:

  • #:匹配一个或多个词

  • *:匹配不多不少恰好1个词

举例:

  • item.#:能够匹配item.spu.insert 或者 item.spu

  • item.*:只能匹配item.spu

绑定示例:

代码示例:

    @Test
    public void testdirect(){
        //交换机名称
        String exchange="hmall.topic";
        //消息
        String message="hello,everyone,topic";
        //发送消息 topic
        rabbitTemplate.convertAndSend(exchange,"#.news",message);
    }

4.基于bean声明队列交换机

在java代码中注册交换机和队列

fanout交换机代码如下:

@Configuration
public class fanout {
    //注册交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        //return ExchangeBuilder.fanoutExchange("hmall.fanout").build();
        return new FanoutExchange("hmall.fanout");
    }
    //注册队列
    @Bean
    public Queue queue1(){
        //return QueueBuilder.durable("fanout.queue1").build();
        return new Queue("fanout.queue1");
    }
    //绑定
    @Bean
    public Binding fanoutbindingqueue1(Queue queue1,FanoutExchange fanout){
        return BindingBuilder.bind(queue1).to(fanout);
    }
    @Bean
    public Queue queue2(){
        return new Queue("fanout.queue2");
    }
    @Bean
    public Binding fanoutbindingqueue2(Queue queue2,FanoutExchange fanout){
        return BindingBuilder.bind(queue2).to(fanout);
    }
}

direct交换机代码如下:

@Configuration
public class DirectConfig {

    /**
     * 声明交换机
     * @return Direct类型交换机
     */
    @Bean
    public DirectExchange directExchange(){
        return ExchangeBuilder.directExchange("hmall.direct").build();
    }

    /**
     * 第1个队列
     */
    @Bean
    public Queue directQueue1(){
        return new Queue("direct.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
    }
    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
    }

    /**
     * 第2个队列
     */
    @Bean
    public Queue directQueue2(){
        return new Queue("direct.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
    }
    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
    }
}

5.基于注解声明队列交换机

基于bean声明太过复杂,尤其是direct交换机绑定key时,因此要用到注解声明。

在注解下同时创建队列交换机并完成绑定,代码如下:

    @RabbitListener(bindings = @QueueBinding(
            //注册队列                  名称              持久化
            value = @Queue(name = "direct.queue",durable = "ture"),
            //注册交换机                 名称              类型为direct
            exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),
            //交换机绑定队列属性有red和white
            key = {"red","white"}
    ))
    public void listen(String message){
        System.out.println("接收到消息"+message);
    }

4.消息转化器

spring在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。

只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:(比如用rabbitmq发送map集合,数据被序列化后字节太多而且可读性差)

  • 数据体积过大

  • 有安全漏洞

  • 可读性差

因此我们使用JSON方式来做序列化和反序列化。

导入依赖:

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

配置消息转换器,在publisherconsumer两个服务的启动类中添加一个Bean即可:

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jackson2JsonMessageConverter.setCreateMessageIds(true);
    return jackson2JsonMessageConverter;
}

RabbitMQ高级

未完待续

上一篇:ARM/Linux嵌入式面经(三九):中科驭数


下一篇:Web3Auth 如何工作?