RabbitMQ的模式 整合SpringBoot

RabbitMQ

RabbitMQ的模式

简单模式

RabbitMQ的模式 整合SpringBoot

代码实现

Product

package com.rabbit.test;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


/**
 * Simple-mode producer: publishes one message to the "qy129" queue through the default exchange.
 */
public class Product {
    /**
     * Connects to the broker, declares the queue, publishes a single message and then
     * closes the channel and the connection.
     *
     * @param args unused
     * @throws Exception if connecting, declaring the queue, publishing or closing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        System.out.println(factory.getPort());
        factory.setHost("192.168.31.124");
        // try-with-resources closes the channel and then the connection, even when
        // declaring or publishing throws; without it both were left open.
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            /*
             * queue      - queue name
             * durable    - whether the queue survives a broker restart
             * exclusive  - whether the queue is restricted to this connection
             * autoDelete - whether the broker deletes the queue when it is no longer in use
             * arguments  - extra queue arguments; none are needed here
             */
            channel.queueDeclare("qy129", true, false, false, null);

            String msg = "狗伪凯";
            /*
             * exchange   - exchange name; "" selects the default exchange
             * routingKey - with the default exchange this is the target queue name
             * props      - additional message properties; none here
             * body       - message payload
             */
            // Encode explicitly as UTF-8 so the payload does not depend on the platform charset.
            channel.basicPublish("", "qy129", null, msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
    }
}

Consumers

package com.rabbit;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Simple-mode consumer: prints every message delivered from the "qy129" queue.
 */
public class Consumers {

    /**
     * Opens a connection and a channel and registers a consumer on "qy129".
     * Neither the channel nor the connection is closed here.
     *
     * @param args unused
     * @throws IOException      if connecting, opening the channel or subscribing fails
     * @throws TimeoutException if connecting to the broker times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.31.124");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        DefaultConsumer callback = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Decode explicitly as UTF-8 instead of the platform default charset, which
                // garbles the Chinese payload on JVMs whose default is not UTF-8.
                // NOTE(review): assumes publishers encode the body as UTF-8 - confirm.
                System.out.println("内容:" + new String(body, java.nio.charset.StandardCharsets.UTF_8));
            }
        };

        /*
         * queue    - name of the queue to consume from
         * autoAck  - true: deliveries are acknowledged automatically
         * callback - consumer whose handleDelivery is invoked for each delivered message
         */
        channel.basicConsume("qy129", true, callback);
    }

}

工作者模式

RabbitMQ的模式 整合SpringBoot

用处

比如批量处理的场景:RabbitMQ 里面积压了大量的消息时,可以让多个消费者同时消费同一个队列,加快处理速度。

代码实现

Product

package com.rabbit.work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Work-queue producer: publishes ten messages to the "work" queue through the default exchange.
 */
public class Product {
    /**
     * Connects to the broker, declares the durable "work" queue, publishes ten numbered
     * messages and then closes the channel and the connection.
     *
     * @param args unused
     * @throws Exception if connecting, declaring the queue, publishing or closing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        System.out.println(factory.getPort());
        factory.setHost("192.168.31.124");
        // try-with-resources closes the channel and then the connection, even when
        // declaring or publishing throws; without it both were left open.
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            channel.queueDeclare("work", true, false, false, null);

            for (int i = 0; i < 10; i++) {
                String msg = "狗伪凯" + i;
                // Encode explicitly as UTF-8 so the payload does not depend on the platform charset.
                channel.basicPublish("", "work", null, msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            }
        }
    }
}

Consumer01

package com.rabbit.work;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Work-queue consumer #1: prints every message it receives from the "work" queue.
 */
public class Consumer01 {
    /**
     * Opens a connection and a channel and registers an auto-acknowledging consumer on "work".
     * Neither the channel nor the connection is closed here.
     *
     * @param args unused
     * @throws IOException      if connecting, opening the channel or subscribing fails
     * @throws TimeoutException if connecting to the broker times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.31.124");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        DefaultConsumer callback = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Decode explicitly as UTF-8 rather than with the platform default charset.
                // NOTE(review): assumes publishers encode the body as UTF-8 - confirm.
                System.out.println("内容1:" + new String(body, java.nio.charset.StandardCharsets.UTF_8));
            }
        };

        // Second argument is autoAck: deliveries are acknowledged automatically.
        channel.basicConsume("work", true, callback);
    }
}


Consumer02

package com.rabbit.work;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Work-queue consumer #2: prints every message it receives from the "work" queue.
 */
public class Consumer02 {
    /**
     * Opens a connection and a channel and registers an auto-acknowledging consumer on "work".
     * Neither the channel nor the connection is closed here.
     *
     * @param args unused
     * @throws IOException      if connecting, opening the channel or subscribing fails
     * @throws TimeoutException if connecting to the broker times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.31.124");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        DefaultConsumer callback = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Decode explicitly as UTF-8 rather than with the platform default charset.
                // NOTE(review): assumes publishers encode the body as UTF-8 - confirm.
                System.out.println("内容2:" + new String(body, java.nio.charset.StandardCharsets.UTF_8));
            }
        };

        // Second argument is autoAck: deliveries are acknowledged automatically.
        channel.basicConsume("work", true, callback);
    }
}


发布订阅模式

RabbitMQ的模式 整合SpringBoot

代码实现

Product

package com.rabbit.fanout;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Publish/subscribe producer: declares the fanout exchange "exc", binds the queues
 * "fanout01" and "fanout02" to it and publishes twenty messages to the exchange.
 */
public class Product {
    /**
     * Sets up the queues, exchange and bindings, publishes the messages and closes the
     * channel and the connection. Failures are printed to standard error.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.31.124");
        // try-with-resources only closes what was actually opened. The previous finally
        // block dereferenced channel/connection unconditionally and threw a
        // NullPointerException whenever the connection could not be established.
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare("fanout01", true, false, false, null);
            channel.queueDeclare("fanout02", true, false, false, null);

            // Third argument marks the exchange as durable.
            channel.exchangeDeclare("exc", BuiltinExchangeType.FANOUT, true);

            channel.queueBind("fanout01", "exc", "");
            channel.queueBind("fanout02", "exc", "");

            for (int i = 0; i < 20; i++) {
                String s = "狗王伪凯" + i;
                // Encode explicitly as UTF-8 so the payload does not depend on the platform charset.
                channel.basicPublish("exc", "", null, s.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            }
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}


Consumer01

/**
 * Publish/subscribe consumer #1: prints every message delivered from the "fanout01" queue.
 */
public class Consumer01 {
    /**
     * Opens a connection and a channel and registers an auto-acknowledging consumer on
     * "fanout01". Failures while connecting or subscribing are printed to standard error.
     * Neither the channel nor the connection is closed here.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.31.124");

        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            DefaultConsumer callback = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // Decode explicitly as UTF-8 rather than with the platform default charset.
                    // NOTE(review): assumes publishers encode the body as UTF-8 - confirm.
                    System.out.println("内容1:" + new String(body, java.nio.charset.StandardCharsets.UTF_8));
                }
            };
            // Second argument is autoAck: deliveries are acknowledged automatically.
            channel.basicConsume("fanout01", true, callback);
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

Consumer02

/**
 * Publish/subscribe consumer #2: prints every message delivered from the "fanout02" queue.
 */
public class Consumer02 {
    /**
     * Opens a connection and a channel and registers an auto-acknowledging consumer on
     * "fanout02". Failures while connecting or subscribing are printed to standard error.
     * Neither the channel nor the connection is closed here.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.31.124");

        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            DefaultConsumer callback = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // Decode explicitly as UTF-8 rather than with the platform default charset.
                    // NOTE(review): assumes publishers encode the body as UTF-8 - confirm.
                    System.out.println("内容2:" + new String(body, java.nio.charset.StandardCharsets.UTF_8));
                }
            };
            // Second argument is autoAck: deliveries are acknowledged automatically.
            channel.basicConsume("fanout02", true, callback);
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

路由模式

RabbitMQ的模式 整合SpringBoot

代码实现

Product

package com.rabbit.direct;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Routing-mode producer: declares the direct exchange "direct", binds "direct01" and
 * "direct02" to it and publishes five messages with the routing key "error".
 */
public class Product {
    /**
     * Sets up the queues, exchange and bindings, publishes the messages and closes the
     * channel and the connection. Failures are printed to standard error.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.31.124");
        // try-with-resources only closes what was actually opened. The previous finally
        // block dereferenced channel/connection unconditionally and threw a
        // NullPointerException whenever the connection could not be established.
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare("direct01", true, false, false, null);
            channel.queueDeclare("direct02", true, false, false, null);

            // Third argument marks the exchange as durable.
            channel.exchangeDeclare("direct", BuiltinExchangeType.DIRECT, true);

            // direct01 is bound with "error" only.
            channel.queueBind("direct01", "direct", "error");

            // direct02 is bound with "error", "info" and "debug".
            channel.queueBind("direct02", "direct", "error");
            channel.queueBind("direct02", "direct", "info");
            channel.queueBind("direct02", "direct", "debug");

            for (int i = 0; i < 5; i++) {
                String s = "狗王伪凯" + i;
                // Encode explicitly as UTF-8 so the payload does not depend on the platform charset.
                channel.basicPublish("direct", "error", null, s.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            }
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

Consumer01

package com.rabbit.direct;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Routing-mode consumer #1: prints every message delivered from the "direct01" queue.
 */
public class Consumer01 {

    /**
     * Opens a connection and a channel and registers an auto-acknowledging consumer on
     * "direct01". Failures while connecting or subscribing are printed to standard error.
     * Neither the channel nor the connection is closed here.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.31.124");
        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // Decode explicitly as UTF-8 rather than with the platform default charset.
                    // NOTE(review): assumes publishers encode the body as UTF-8 - confirm.
                    System.out.println("direct01:" + new String(body, java.nio.charset.StandardCharsets.UTF_8));
                }
            };
            // Second argument is autoAck: deliveries are acknowledged automatically.
            channel.basicConsume("direct01", true, defaultConsumer);
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

Consumer02

package com.rabbit.direct;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Routing-mode consumer #2: prints every message delivered from the "direct02" queue.
 */
public class Consumer02 {

    /**
     * Opens a connection and a channel and registers an auto-acknowledging consumer on
     * "direct02". Failures while connecting or subscribing are printed to standard error.
     * Neither the channel nor the connection is closed here.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.31.124");
        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // Decode explicitly as UTF-8 rather than with the platform default charset.
                    // NOTE(review): assumes publishers encode the body as UTF-8 - confirm.
                    System.out.println("direct02:" + new String(body, java.nio.charset.StandardCharsets.UTF_8));
                }
            };
            // Second argument is autoAck: deliveries are acknowledged automatically.
            channel.basicConsume("direct02", true, defaultConsumer);
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

topic主题模式

RabbitMQ的模式 整合SpringBoot

*: 通配一个单词(单词之间以 . 分隔)。
#: 通配零个或多个单词。

代码实现

Product

package com.rabbit.topics;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Topic-mode producer: declares the topic exchange "topic", binds "topic01" and "topic02"
 * to it with wildcard binding keys and publishes five messages with a routing key that
 * starts with "error.".
 */
public class Product {
    /**
     * Sets up the queues, exchange and bindings, publishes the messages and closes the
     * channel and the connection. Failures are printed to standard error.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.31.124");
        // try-with-resources only closes what was actually opened. The previous finally
        // block dereferenced channel/connection unconditionally and threw a
        // NullPointerException whenever the connection could not be established.
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare("topic01", true, false, false, null);
            channel.queueDeclare("topic02", true, false, false, null);

            // Third argument marks the exchange as durable.
            channel.exchangeDeclare("topic", BuiltinExchangeType.TOPIC, true);

            // '*' and '#' are wildcards only when they stand alone as a dot-separated word.
            // The former keys "error*" and "#.error*" were therefore literal words that
            // never matched the "error.<word>" routing key used below.
            channel.queueBind("topic01", "topic", "error.*");

            channel.queueBind("topic02", "topic", "#.error.*");
            channel.queueBind("topic02", "topic", "#.info.#");
            channel.queueBind("topic02", "topic", "debug.#");

            for (int i = 0; i < 5; i++) {
                String s = "狗王伪凯" + i;
                // Encode explicitly as UTF-8 so the payload does not depend on the platform charset.
                channel.basicPublish("topic", "error.ssssssssssssssssssss", null, s.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            }
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

Consumer01

package com.rabbit.topic;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Topic-mode consumer #1: prints every message delivered from the "topic01" queue.
 */
public class Consumer01 {

    /**
     * Opens a connection and a channel and registers an auto-acknowledging consumer on
     * "topic01". Failures while connecting or subscribing are printed to standard error.
     * Neither the channel nor the connection is closed here.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.31.124");
        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // Decode explicitly as UTF-8 rather than with the platform default charset.
                    // NOTE(review): assumes publishers encode the body as UTF-8 - confirm.
                    System.out.println("topic01:" + new String(body, java.nio.charset.StandardCharsets.UTF_8));
                }
            };
            // Second argument is autoAck: deliveries are acknowledged automatically.
            channel.basicConsume("topic01", true, defaultConsumer);
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

Consumer02

package com.rabbit.topic;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Topic-mode consumer #2: prints every message delivered from the "topic02" queue.
 */
public class Consumer02 {

    /**
     * Opens a connection and a channel and registers an auto-acknowledging consumer on
     * "topic02". Failures while connecting or subscribing are printed to standard error.
     * Neither the channel nor the connection is closed here.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.31.124");
        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // Decode explicitly as UTF-8 rather than with the platform default charset.
                    // NOTE(review): assumes publishers encode the body as UTF-8 - confirm.
                    System.out.println("topic02:" + new String(body, java.nio.charset.StandardCharsets.UTF_8));
                }
            };
            // Second argument is autoAck: deliveries are acknowledged automatically.
            channel.basicConsume("topic02", true, defaultConsumer);
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

rabbitMQ整合springboot

Spring Boot 引入相关依赖后,会提供一个工具类 RabbitTemplate,使用这个工具类可以发送消息。
项目结构
RabbitMQ的模式 整合SpringBoot

建立父工程引入相关的依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <packaging>pom</packaging>
    <modules>
        <module>product</module>
        <module>consumer</module>
    </modules>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.ykq</groupId>
    <artifactId>springboot-rabbit-parent</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-rabbit-parent</name>
    <description>springboot整合rabbitMQ</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <!--rabbitMQ的依赖: 应用启动时加载,并读取配置文件。
           springboot自动装配原理: 引用starter启动依赖时,会把对应的自动装配类加载进来,该自动装配类可以读取application配置文件中的
           内容。例如 web starter 会自动装配 DispatcherServlet。
        -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.72</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Producer配置文件

server:
  port: 8080

spring:
  rabbitmq:
    host: 192.168.31.124
  datasource:
    druid:
      driver-class-name: com.mysql.jdbc.Driver
      url: jdbc:mysql:///test
      username: root
      password: root

Consumer配置文件

server:
  port: 8081

spring:
  rabbitmq:
    host: 192.168.31.124
  datasource:
    druid:
      driver-class-name: com.mysql.jdbc.Driver
      url: jdbc:mysql:///test
      username: root
      password: root

Product

package com.springboot.rabbit.controller;

import com.alibaba.fastjson.JSON;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;

/**
 * REST endpoint that turns a purchase request into a JSON message and sends it to the
 * "exc" exchange with an empty routing key.
 */
@RestController
public class HelloController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Builds a JSON object with the keys "id" and "number" from the path variables and
     * sends it through {@link RabbitTemplate}.
     *
     * @param id     value taken from the {id} path segment
     * @param number value taken from the {number} path segment
     * @return a fixed confirmation text
     */
    @RequestMapping("/hello/{id}/{number}")
    public String hello(@PathVariable("id") Integer id,@PathVariable("number") Integer number) {
        HashMap<String, Integer> payload = new HashMap<>();
        payload.put("id", id);
        payload.put("number", number);
        // Serialize and send in a single step; exchange "exc", empty routing key.
        rabbitTemplate.convertAndSend("exc", "", JSON.toJSONString(payload));
        return "购买成功";
    }
}

Consumer01

package com.springboot.rabbit.consumer;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.springboot.rabbit.dao.GoodsMapper;
import com.springboot.rabbit.entity.Goods;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * Listener on the "fanout01" queue that deducts the ordered quantity from the stock of
 * the referenced goods.
 */
@Component
public class Consumer01 {

    @Autowired
    private GoodsMapper goodsMapper;

    /**
     * Parses the JSON message (keys "id" and "number"), loads the goods by id and, when
     * enough stock is available, reduces the stock by the ordered quantity.
     * Malformed messages and unknown goods are reported and skipped instead of throwing
     * from the listener.
     *
     * @param msg JSON text received from the queue
     */
    @RabbitListener(queues = {"fanout01"})
    public void consumer(String msg) {
        @SuppressWarnings("unchecked")
        Map<String,Integer> map = JSON.parseObject(msg, Map.class);
        Integer id = map == null ? null : map.get("id");
        Integer number = map == null ? null : map.get("number");
        // Reject missing fields and non-positive quantities; a negative quantity would
        // otherwise increase the stock.
        if (id == null || number == null || number <= 0) {
            System.out.println(this.getClass().getName()+"消息格式错误:"+msg);
            return;
        }

        Goods goods = goodsMapper.selectById(id);
        if (goods == null) {
            System.out.println(this.getClass().getName()+"商品不存在:"+id);
            return;
        }

        // ">=" so that an order for exactly the remaining stock is still fulfilled.
        // NOTE(review): this read-then-update is not atomic; concurrent deliveries could
        // oversell - confirm whether that matters here.
        if (goods.getCount() >= number) {
            goods.setCount(goods.getCount() - number);
            goodsMapper.updateById(goods);
            System.out.println(this.getClass().getName()+"出库成功");
        }else{
            System.out.println(this.getClass().getName()+"库存不足");
        }
    }
}

Consumer02

package com.springboot.rabbit.consumer;

import com.alibaba.fastjson.JSON;
import com.springboot.rabbit.dao.GoodsMapper;
import com.springboot.rabbit.entity.Goods;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * Listener on the "fanout02" queue that prints the amount to pay for an order.
 */
@Component
public class Consumer02 {

    @Autowired
    private GoodsMapper goodsMapper;

    /**
     * Parses the JSON message (keys "id" and "number"), loads the goods by id and, when
     * enough stock is available, prints price multiplied by quantity.
     * Malformed messages and unknown goods are reported and skipped instead of throwing
     * from the listener.
     *
     * @param msg JSON text received from the queue
     */
    @RabbitListener(queues = {"fanout02"})
    public void consumer(String msg) {
        @SuppressWarnings("unchecked")
        Map<String,Integer> map = JSON.parseObject(msg, Map.class);
        Integer id = map == null ? null : map.get("id");
        Integer number = map == null ? null : map.get("number");
        // Reject missing fields and non-positive quantities before touching the database.
        if (id == null || number == null || number <= 0) {
            System.out.println(this.getClass().getName()+"消息格式错误:"+msg);
            return;
        }

        Goods goods = goodsMapper.selectById(id);
        if (goods == null) {
            System.out.println(this.getClass().getName()+"商品不存在:"+id);
            return;
        }

        // ">=" so that an order for exactly the remaining stock is still accepted.
        if (goods.getCount() >= number) {
            System.out.println("需要支付:"+goods.getPrice() * number);
        }else{
            System.out.println(this.getClass().getName()+"库存不足");
        }
    }
}

上一篇:解决org.springframework.beans.factory.BeanCreationException Error creating bean with name‘xxx’


下一篇:.Net 知识点