RabbitMQ
RabbitMQ的模式
简单模式
代码实现
Product
package com.rabbit.test;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Product {
public static void main(String[] args) throws Exception{
ConnectionFactory factory=new ConnectionFactory();
System.out.println(factory.getPort());
factory.setHost("192.168.31.124");
//创建连接对象Connection
Connection connection=factory.newConnection();
//创建信道
Channel channel = connection.createChannel();
/**
* queue 队列名
* durable 是否持久化队列, rabbitmq服务重启后该队列是否存在
* exclusive 信道是否独占该队列
* autoDelete 是否自动删除 如果长时间没有发送消息 则自动删除
* arg 额外参数 先给null
*/
channel.queueDeclare("qy129", true, false, false, null);
String msg = "狗伪凯";
/**
* exchange 是交换机名称,没有则为""
* routingKey 路由key 如果没有交换机绑定就填队列名
* props 消息的一些额外配置
* body 消息的内容
*/
channel.basicPublish("", "qy129", null, msg.getBytes());
}
}
Comsumers
package com.rabbit;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumers {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.31.124");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
DefaultConsumer callback = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("内容:"+new String(body));
}
};
/**
* (String queue, 队列的名称
* boolean autoAck, 是否自动确认
* Consumer callback: 回调方法 当队列中存在信息后 会自动触发回调函数。
*/
channel.basicConsume("qy129", true, callback);
}
}
工作者模式
用处
比如批量处理上. rabbitMQ里面积压了大量的消息。
代码实现
Product
package com.rabbit.work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Product {
public static void main(String[] args) throws Exception{
ConnectionFactory factory=new ConnectionFactory();
System.out.println(factory.getPort());
factory.setHost("192.168.31.124");
//创建连接对象Connection
Connection connection=factory.newConnection();
//创建信道
Channel channel = connection.createChannel();
channel.queueDeclare("work", true, false, false, null);
for (int i = 0; i < 10; i++) {
String msg = "狗伪凯"+i;
channel.basicPublish("", "work", null, msg.getBytes());
}
}
}
Consumer01
package com.rabbit.work;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer01 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.31.124");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
DefaultConsumer callback = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("内容1:"+new String(body));
}
};
channel.basicConsume("work", true, callback);
}
}
Consumer02
package com.rabbit.work;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer02 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.31.124");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
DefaultConsumer callback = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("内容2:"+new String(body));
}
};
channel.basicConsume("work", true, callback);
}
}
发布订阅模式
代码实现
Product
package com.rabbit.fanout;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Product {
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.31.124");
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare("fanout01", true, false, false, null);
channel.queueDeclare("fanout02", true, false, false, null);
channel.exchangeDeclare("exc", BuiltinExchangeType.FANOUT, true);
channel.queueBind("fanout01", "exc", "");
channel.queueBind("fanout02", "exc", "");
for (int i = 0; i < 20; i++) {
String s = "狗王伪凯"+i;
channel.basicPublish("exc","",null,s.getBytes());
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
Consumer01
public class Consumer01 {
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.31.124");
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
DefaultConsumer callback = new DefaultConsumer(channel){
@Override
public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("内容1:"+new String(body));
}
};
channel.basicConsume("fanout01", true, callback);
} catch (IOException e) {
e.printStackTrace();
}catch (TimeoutException e) {
e.printStackTrace();
}
}
}
Consumer02
public class Consumer02 {
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.31.124");
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
DefaultConsumer callback = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("内容2:"+new String(body));
}
};
channel.basicConsume("fanout02", true, callback);
} catch (IOException e) {
e.printStackTrace();
}catch (TimeoutException e) {
e.printStackTrace();
}
}
}
路由模式
代码实现
Product
package com.rabbit.direct;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Product {
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.31.124");
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare("direct01", true, false, false, null);
channel.queueDeclare("direct02", true, false, false, null);
channel.exchangeDeclare("direct", BuiltinExchangeType.DIRECT, true);
channel.queueBind("direct01", "direct", "error");
channel.queueBind("direct02", "direct", "error");
channel.queueBind("direct02", "direct", "info");
channel.queueBind("direct02", "direct", "debug");
for (int i = 0; i < 5; i++) {
String s = "狗王伪凯"+i;
channel.basicPublish("direct","error",null,s.getBytes());
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
Consumer01
package com.rabbit.direct;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer01 {
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.31.124");
try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("direct01:"+new String(body));
}
};
channel.basicConsume("direct01", true, defaultConsumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
Comsumer02
package com.rabbit.direct;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer02 {
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.31.124");
try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("direct02:"+new String(body));
}
};
channel.basicConsume("direct02", true, defaultConsumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
topic主体模式
*: 统配一个单词。
#: 统配n个单词
代码实现
Product
package com.rabbit.topics;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Product {
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.31.124");
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare("topic01", true, false, false, null);
channel.queueDeclare("topic02", true, false, false, null);
channel.exchangeDeclare("topic", BuiltinExchangeType.TOPIC, true);
channel.queueBind("topic01", "topic", "error*");
channel.queueBind("topic02", "topic", "#.error*");
channel.queueBind("topic02", "topic", "#.info.#");
channel.queueBind("topic02", "topic", "debug.#");
for (int i = 0; i < 5; i++) {
String s = "狗王伪凯"+i;
channel.basicPublish("topic","error.ssssssssssssssssssss",null,s.getBytes());
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
Consumer01
package com.rabbit.topic;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer01 {
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.31.124");
try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("topic01:"+new String(body));
}
};
channel.basicConsume("topic01", true, defaultConsumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
Consumer02
package com.rabbit.topic;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer02 {
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.31.124");
try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("topic02:"+new String(body));
}
};
channel.basicConsume("topic02", true, defaultConsumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
rabbitMQ整合springboot
springboot引入了相关的依赖后,提供一个工具类RabbitTemplate.使用这个工具类可以发送消息。
项目结构
建立父工程引入相关的依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
<modules>
<module>product</module>
<module>consumer</module>
</modules>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.ykq</groupId>
<artifactId>springboot-rabbit-parent</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springboot-rabbit-parent</name>
<description>springboot整合rabbitMQ</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!--rabbitMQ的依赖: 启动类加载。读取配置文件:
springboot自动装配原理: 引用starter启动依赖时,把对应的自动装配类加载进去,该自动装配类可以读取application配置文件中
内容。 DispatherServlet
-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.72</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Producter配置文件
server:
port: 8080
spring:
rabbitmq:
host: 192.168.31.124
datasource:
druid:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql:///test
username: root
password: root
Consumer配置文件
server:
port: 8081
spring:
rabbitmq:
host: 192.168.31.124
datasource:
druid:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql:///test
username: root
password: root
Product
package com.springboot.rabbit.controller;
import com.alibaba.fastjson.JSON;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
@RestController
public class HelloController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/hello/{id}/{number}")
public String hello(@PathVariable("id") Integer id,@PathVariable("number") Integer number) {
HashMap<String, Integer> map = new HashMap<>();
map.put("id", id);
map.put("number", number);
String s = JSON.toJSONString(map);
//发送消息
rabbitTemplate.convertAndSend("exc","",s);
return "购买成功";
}
}
Consumer01
package com.springboot.rabbit.consumer;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.springboot.rabbit.dao.GoodsMapper;
import com.springboot.rabbit.entity.Goods;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
public class Consumer01 {
@Autowired
private GoodsMapper goodsMapper;
@RabbitListener(queues = {"fanout01"})
public void consumer(String msg) {
Map<String,Integer> map = JSON.parseObject(msg, Map.class);
Goods goods = goodsMapper.selectById(map.get("id"));
if (goods.getCount() > map.get("number")) {
goods.setCount(goods.getCount() - map.get("number"));
goodsMapper.updateById(goods);
System.out.println(this.getClass().getName()+"出库成功");
}else{
System.out.println(this.getClass().getName()+"库存不足");
}
}
}
Consumer02
package com.springboot.rabbit.consumer;
import com.alibaba.fastjson.JSON;
import com.springboot.rabbit.dao.GoodsMapper;
import com.springboot.rabbit.entity.Goods;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
public class Consumer02 {
@Autowired
private GoodsMapper goodsMapper;
@RabbitListener(queues = {"fanout02"})
public void consumer(String msg) {
Map<String,Integer> map = JSON.parseObject(msg, Map.class);
Goods goods = goodsMapper.selectById(map.get("id"));
if (goods.getCount() > map.get("number")) {
System.out.println("需要支付:"+goods.getPrice() * map.get("number"));
}else{
System.out.println(this.getClass().getName()+"库存不足");
}
}
}