Rabbitmq基本概念
RabbitMQ是一种消息中间件,用于处理来自客户端的异步消息。服务端将要发送的消息放入到队列池中。接收端可以根据RabbitMQ配置的转发机制接收服务端发来的消息。RabbitMQ依据指定的转发规则进行消息的转发、缓冲和持久化操作,主要用在多服务器间或单服务器的子系统间进行通信,是分布式系统标准的配置。
搭建Rabbitmq服务器:
rabbitmq安装
1.使用docker环境,下载rabbitmq:management镜像
有压缩包的直接使用即可
docker pull rabbitmq:management
将压缩包放入root目录下并进行导入镜像:
docker load -i rabbit-image.gz #导入rabbit镜像
docker images #查看
2.关闭防火墙
systemctl stop firewalld
systemctl disable firewalld
# 重启 docker 系统服务
systemctl restart docker
3.配置管理员用户名和密码
mkdir /etc/rabbitmq
vim /etc/rabbitmq/rabbitmq.conf
# 添加两行配置:
default_user = admin
default_pass = admin
4.启动Rabbitmq
docker run -d --name rabbit \
-p 5672:5672 \
-p 15672:15672 \
-v /etc/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-e RABBITMQ_CONFIG_FILE=/etc/rabbitmq/rabbitmq.conf \
--restart=always \
rabbitmq:management
访问管理控制台 http://192.168.64.140:15672
用户名密码是 admin
Rabbitmq 六中工作模式 在idea中应用
添加依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.4.3</version>
</dependency>
简单模式:只有一个消费者
生产者发送消息:
package rabbitmq.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Test1 {
public static void main(String[] args) throws Exception {
//创建连接工厂,并设置连接信息
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.141");
f.setPort(5672);//可选,5672是默认端口
f.setUsername("admin");
f.setPassword("admin");
/*
* 与rabbitmq服务器建立连接,
* rabbitmq服务器端使用的是nio,会复用tcp连接,
* 并开辟多个信道与客户端通信
* 以减轻服务器端建立连接的开销
*/
Connection c = f.newConnection();
//建立信道
Channel ch = c.createChannel();
/*
* 声明队列,会在rabbitmq中创建一个队列
* 如果已经创建过该队列,就不能再使用其他参数来创建
*
* 参数含义:
* -queue: 队列名称
* -durable: 队列持久化,true表示RabbitMQ重启后队列仍存在
* -exclusive: 排他,true表示限制仅当前连接可用
* -autoDelete: 当最后一个消费者断开后,是否删除队列
* -arguments: 其他参数
*/
ch.queueDeclare("helloworld", false,false,false,null);
/*
* 发布消息
* 这里把消息向默认交换机发送.
* 默认交换机隐含与所有队列绑定,routing key即为队列名称
*
* 参数含义:
* -exchange: 交换机名称,空串表示默认交换机"(AMQP default)",不能用 null
* -routingKey: 对于默认交换机,路由键就是目标队列名称
* -props: 其他参数,例如头信息
* -body: 消息内容byte[]数组
*/
ch.basicPublish("", "helloworld", null, "Hello world!".getBytes());
System.out.println("消息已发送");
c.close();
}
}
消费者接收消息:
package rabbitmq.simple;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
public class Test2 {
public static void main(String[] args) throws Exception {
//连接工厂
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.141");
f.setUsername("admin");
f.setPassword("admin");
//建立连接
Connection c = f.newConnection();
//建立信道
Channel ch = c.createChannel();
//声明队列,如果该队列已经创建过,则不会重复创建
ch.queueDeclare("helloworld",false,false,false,null);
System.out.println("等待接收数据");
//收到消息后用来处理消息的回调对象
DeliverCallback callback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
String msg = new String(message.getBody(), "UTF-8");
System.out.println("收到: "+msg);
}
};
//消费者取消时的回调对象
CancelCallback cancel = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
ch.basicConsume("helloworld", true, callback, cancel);
}
}
FANOUT群发模式
生产者发送消息时会发送给每一个接收者
生产者:
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//当你生产者有一个队列时,消费者也必须一个队列进行接收。
//通过交换机进行发送消息,可以定义消息是否持久。交换机不存储消息需要队列接收消息
ConnectionFactory f = new ConnectionFactory(); //通过连接工厂进行连接
f.setHost("192.168.64.141"); //进行连接对应服务的ip
f.setPort(5672); //访问的消息服务的端口号
f.setUsername("admin"); //账号密码
f.setPassword("admin");
Connection connection = f.newConnection();
Channel c = connection.createChannel(); //通信通道
//创建Fanout交换机: logs为消息队列的名字 发布和订阅模式 群发机制
c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
while (true){
System.out.println("输入传递的信息:");
String s = new Scanner(System.in).nextLine();
c.basicPublish("logs","",null,s.getBytes());
}
}
}
消费者:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory f = new ConnectionFactory(); //通过连接工厂进行连接
f.setHost("192.168.64.141"); //进行连接对应服务的ip
f.setPort(5672); //访问的消息服务的端口号
f.setUsername("admin"); //账号密码
f.setPassword("admin");
Connection connection = f.newConnection();
Channel c = connection.createChannel(); //通信通道
//1.创建队列 2.创建交换机 3.进行消息队列的绑定
String queue = UUID.randomUUID().toString();//创建随机队列 保存得到的消息
//队列名 非持久 独占 自动删除
c.queueDeclare(queue,false,true,true,null);
//创建交换机 生产者是什么消息模式就创造什么消息模式 fanout交换机
c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
//进行绑定 对fanout交换机来说 第三个参数是无效的
c.queueBind(queue,"logs","");
//正常接收消息 创建回调对象
DeliverCallback deliverCallback =(consumerTag,message) -> {
//此处处理消息
String s = new String(message.getBody());
System.out.println("收到:"+s);
};
CancelCallback cancelCallback =consumerTag -> {
};
//开始接受消息,把消息传递给一个回调对象进行处理
c.basicConsume(queue,true,deliverCallback,cancelCallback);
}
}
路由模式 Direct
路由模式,设定关键词,会根据生产者发送的关键词进行接收消息。如果关键词不匹配是不会接收消息的。
生产者:
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class Producer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//当你生产者有一个队列时,消费者也必须一个队列进行接收。
//通过交换机进行发送消息,可以定义消息是否持久。交换机不存储消息需要队列接收消息
ConnectionFactory f = new ConnectionFactory(); //通过连接工厂进行连接
f.setHost("192.168.64.141"); //进行连接对应服务的ip
f.setPort(5672); //访问的消息服务的端口号
f.setUsername("admin"); //账号密码
f.setPassword("admin");
Connection connection = f.newConnection();
Channel c = connection.createChannel(); //通信通道
//创建路由模式交换机 : Direct
c.exchangeDeclare("direct_logs",BuiltinExchangeType.DIRECT);
//向交换机发送消息,并携带路由关键词
while (true){
System.out.println("输入消息:");
String s = new Scanner(System.in).nextLine();
System.out.println("输入路由键:");
String k = new Scanner(System.in).nextLine();
//对默认交换机“”,会自动使用队列名作为路由键
c.basicPublish("direct_logs",k,null,s.getBytes());
}
}
}
消费者:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Scanner;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory f = new ConnectionFactory(); //通过连接工厂进行连接
f.setHost("192.168.64.141"); //进行连接对应服务的ip
f.setPort(5672); //访问的消息服务的端口号
f.setUsername("admin"); //账号密码
f.setPassword("admin");
Connection connection = f.newConnection();
Channel c = connection.createChannel(); //通信通道
//1.创建队列 2.创建交换机 3.进行消息队列的绑定
String queue = UUID.randomUUID().toString();//创建随机队列 保存得到的消息
//队列名 非持久 独占 自动删除
c.queueDeclare(queue,false,true,true,null);
c.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);//创建路由交换机
System.out.println("输入绑定的关键词,用空格隔开");
String s = new Scanner(System.in).nextLine();
String[] a = s.split("\\s+"); // \s是空白格 + 指一到多个
for (String k : a){
c.queueBind(queue,"direct_logs",k); //进行遍历绑定
}
//从队列接收消息
DeliverCallback deliverCallback =(consumerTag, message) -> {
//此处处理消息
String msg = new String(message.getBody());
String key = message.getEnvelope().getRoutingKey();//得到路由键
System.out.println(key+"----"+msg);
};
CancelCallback cancelCallback =consumerTag -> {
};
//开始接受消息,把消息传递给一个回调对象进行处理
c.basicConsume(queue,true,deliverCallback,cancelCallback);
}
}
主题模式:
发送到Topic交换机的消息,它的的routingKey,必须是由点分隔的多个单词。单词可以是任何东西,但通常是与消息相关的一些特性。几个有效的routingKey示例:“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”。routingKey可以有任意多的单词,最多255个字节。
bindingKey也必须采用相同的形式。Topic交换机的逻辑与直连交换机类似——使用特定routingKey发送的消息将被传递到所有使用匹配bindingKey绑定的队列。bindingKey有两个重要的特殊点:
* 可以通配单个单词。
# 可以通配零个或多个单词
生产者:
public class Producer5 {
public static void main(String[] args) throws IOException, TimeoutException {
//生产者创建交换机 消费者创建队列
//连接
//连接
ConnectionFactory f = new ConnectionFactory();//连接工厂
f.setHost("192.168.64.129"); //进行连接对应服务的ip
f.setPort(5672); //访问的消息服务端口号
f.setUsername("admin"); //账号
f.setPassword("admin"); //密码
Connection con = f.newConnection();
Channel c = con.createChannel();//通信通道
//创建Topic交换机 :topic_logs 会自动使用队列作为关键词
c.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
//向交换机发送消息,并携带路由关键词
while (true) {
System.out.println("输入信息:");
String s = new Scanner(System.in).nextLine();
System.out.println("输入路由键:");
String k = new Scanner(System.in).nextLine();
//对默认交换机“”,会自动使用队列名作为路由键
c.basicPublish("topic_logs", k, null, s.getBytes());
}
}
}
消费者:
public class Consumer5 {
public static void main(String[] args) throws IOException, TimeoutException {
//连接
ConnectionFactory f = new ConnectionFactory();//连接工厂
f.setHost("192.168.64.141"); //进行连接对应服务的ip
f.setPort(5672); //访问的消息服务端口号
f.setUsername("admin"); //账号
f.setPassword("admin"); //密码
Connection con = f.newConnection();
Channel c = con.createChannel();//通信通道
//1.创建随机队列 2.创建交换机 3.使用绑定建关键词绑定
String queue = UUID.randomUUID().toString();
//非持久,独占,自动删除
c.queueDeclare(queue, false, true, true, null);
c.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);//创建交换机
System.out.println("输入绑定建关键词,用空格隔开:");
String s = new Scanner(System.in).nextLine();
String[] a = s.split("\\s+"); // \s是空白字符 + 指一到多个
for (String k : a) {
c.queueBind(queue, "topic_logs", k); //进行循环遍历绑定
}
//正常接收消息
//正常从队列接收消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
//此处处理消息
String msg = new String(message.getBody());
String key = message.getEnvelope().getRoutingKey();//得到路由键
System.out.println(key + "---" + msg);
};
CancelCallback cancelCallback = consumerTag -> {
};
c.basicConsume(queue, true, deliverCallback, cancelCallback);
}
}
队列的持久化,消息持久化
将创建的队列第二个false改成true即可变成持久操作。
已经创建的队列参数是不能修改的,因为已经上传到服务器中了,可以创建一个新的队列再进行修改后面的参数。生产者与消费者必须一直。