前言:这是中间件一个系列的文章之一,有需要的朋友可以看看这个系列的其他文章:
消息中间件系列一、消息中间件的基本了解
消息中间件系列二、Windows下的activeMQ和rabbitMQ的安装
消息中间件系列三、JMS和activeMQ的简单使用
消息中间件系列四、认识AMQP和RabbiyMq的简单使用
消息中间件系列五、rabbit消息的确认机制
消息中间件系列六,rabbit与spring集成实战
AMQP
AMQP(advanced message queuing protocol)是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制 。
一、AMQP的一些技术术语
- AMQP模型(AMQP Model):一个由关键实体和语义表示的逻辑框架,遵从AMQP规范的服务器必须提供这些 实体和语义。为了实现本规范中定义的语义,客户端可以发送命令来控制AMQP服务器。
- 连接(Connection):一个网络连接,比如TCP/IP套接字连接。
- 会话(Session):端点之间的命名对话。在一个会话上下文中,保证“恰好传递一次”。
- 信道(Channel):多路复用连接中的一条独立的双向数据流通道。为会话提供物理传输介质。虚拟的连接,建立在真实的tcp连接之上的。信道的创建没有限制的。
- 交换器(Exchange):服务器中的实体,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
- 消息队列(Message Queue):一个命名实体,用来保存消息直到发送给消费者。
- 绑定器(Binding):消息队列和交换器之间的关联。
- 绑定器关键字(Binding Key):绑定的名称。一些交换器类型可能使用这个名称作为定义绑定器路由行为的模式。
- 路由关键字(路由键)(Routing Key):一个消息头,交换器可以用这个消息头决定如何路由某条消息。
- 持久存储(Durable):一种服务器资源,当服务器重启时,保存的消息数据不会丢失。
- 临时存储(Transient):一种服务器资源,当服务器重启时,保存的消息数据会丢失。
- 持久化(Persistent):服务器将消息保存在可靠磁盘存储中,当服务器重启时,消息不会丢失。
- 非持久化(Non-Persistent):服务器将消息保存在内存中,当服务器重启时,消息可能丢失。
- 消费者(Consumer):一个从消息队列中请求消息的客户端应用程序。
- 生产者(Producer):一个向交换器发布消息的客户端应用程序。
消息处理过程:
队列通过路由键绑定到交换器,生产者把消息发送到了交换器,交换器根据绑定的路由键将消息路由到特定的队列,订阅了队列的消费者进行接收。
说明(下文会证明这三点)
- 如果消息达到无人订阅的队列会一直在队列中等待,rabbitmq会默认队列是无限长度的。
- 如果多个消费者订阅到同一队列,消息会轮询的方式发送给消费者,每个消息只会发送给一个消费者
- 消息路由到了不存在的队列,消息会忽略,当消息不存在,消息丢失了。
二、创建自定义队列
生产者和消费者都可以调用declareQueue方法创建队列,当没有目标队列时才会创建,如果已经存在相同的队列了就不在重复创建。
相关参数:exclusive 队列为应用程序私有,auto-delete 最后一个消费者取消订阅时,队列会自动删除,durable 队列持久化。
如果消费者订阅了队列,就不能再声明队列了。
先贴上后面会说到的一段代码,演示是怎么创建队列的
Channel channel = connection.createChannel();//信道
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//交换器
String queueName = "direct_queue";
//创建队列,参数分别为:队列名,durable是否进行持久化,exlusive是否私有,auto-delete没有消费者是自动删除,arguments相关参数
channel.queueDeclare(queueName,false,false,false,null);
检测队列是否存在
可以通过调用方法QueueDeclarePassive判断队列是否存在,看方法名是“消极的声明创建”的意思,事实上它没有去声明队列,所谓消极,去看看有没有名为xxx的queue,如果有我就把名字什么的信息告诉你,没有就直接报错,用来确认queue是否存在。
三、四种交换器:
1、direct:
路由键完全匹配时,消息投放到对应队列。Amqp实现都必须有一个direct交换器(默认交换器),名称为空白字符。队列不声明交换器,会自动绑定到默认交换器,队列的名称作为路由键。
补充:一个direct队列可以绑定多个路由键,一条消息可以发给多个direct队列(都有绑定相同的路由键),一个direct队列(有多个路由键)也可以接收不同类型的消息。
2、Fanout:
可以理解为广播,绑定这中交换器的队列,可以接收该交换器上任何类型的消息。
3、Topic:
主题,使来自不同源头的消息到达同一个队列
路由键中的“*”和“#”
“.”会把路由键分为好几个标识符,“*”匹配一个标识符,“#”匹配一个或者多个;
例如:xxx.yyy.zzzz 可以: xxx.*. zzzz , xxx.# , #.zzzz
4、Headers:
匹配消息头,其余与direct一样,实用性不大
举例说明
日志处理场景:
1、有交换器(topic)log_exchange,日志级别有 error,info,warning,应用模块有 user,order,email,路由键的规则是 日志级别+“.”+应用模块名(例如info.user)
2、发送邮件失败,报告一个email的error,basicPublic(message,"log-exchange","error.email")
队列的绑定:queueBind("email-error-queue","log-exchange","error.email")
要监听email所有的日志怎么办?
queueBind("email-log-queue","log-exchange"," *.email")
监听所有模块所有级别日志?
queuebind(“all-log-queue”,"log-exchange","#")
“.”会把路由键分为好几个标识符,“*”匹配一个标识符,“#”匹配一个或者多个(xxx.yyy.zzzz 可以: xxx.*. zzzz , xxx.# , #.zzzz)。
补充:生产者要把消息发送到消费者,两者必须绑定同一个的交换器。
四、虚拟主机
Vhost,真实rabbitmq服务器上的mini型虚拟的mq服务器。有自己的权限机制。Vhost提供了一个逻辑上的分离,可以区分客户端,避免队列和交换器的名称冲突。RabbitMq包含了一个缺省的vhost :“/”,用户名guest,口令 guest(guest用户只能在本机访问)。
五、消息持久化
1、队列是必须持久化
2、交换器也必须是持久化
3、消息的投递模式必须(int型) 2
以上条件全部满足,消息才能持久化
问题:持久化会带来性能的严重下降(下降10倍)
六、Rabbit简要架构介绍
消息队列的使用过程大概如下:
(1)客户端连接到消息队列服务器,打开一个channel(信道)。(生产者和消费者都可操作)
(2)客户端声明一个exchange(交换器),并设置相关属性。(生产者和消费者都可操作)
(3)客户端声明一个queue(队列),并设置相关属性。(一般在消费者中操作)
(4)客户端使用routing key(路由键),在exchange(交换器)和queue(队列)之间建立好绑定关系。(一般在消费者中操作)
(5)客户端投递消息到exchange(交换器)。(一般在生产者中进行)
客户端先给指定交换器(exchange)发送消息,交换器再根据路由键把消息发送给相应的队列,而订阅了相应队列的客户端就能收到消息。
七、使用RabbitMq原生Java客户端进行消息通信
1、添加依赖
客户端Jar包和源码包下载地址:
http://repo1.maven.org/maven2/com/rabbitmq/amqp-client/5.0.0/amqp-client-5.0.0.jar
http://repo1.maven.org/maven2/com/rabbitmq/amqp-client/5.0.0/amqp-client-5.0.0-sources.jar
如果是引入jar包的形式还需要引入slf4j-api-1.6.1.jar。
如果是Maven工程加入:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.0.0</version>
</dependency>
注意:5系列的版本最好使用JDK8及以上, 低于JDK8可以使用4.x(具体的版本号到Maven的*仓库查)的版本。
2、生产者通过DIRECT类型的交换器发布消息
AMQP一样要连接工厂、连接,与JMS中间件不同的是,AMQP多了信道、交换器、路由键等这些概念。处理过程看下面代码:
package dongnaoedu.normal;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class DirectProducer {
private final static String EXCHANGE_NAME = "direct_logs";//交换器
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
//gust用户只能在本机访问
//非本机访问需要设置以下属性
/* factory.setUsername(..);
factory.setPort();
factory.setVirtualHost();*/
Connection connection = factory.newConnection();//连接
Channel channel = connection.createChannel();//信道
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//定义连接交换器
String[]serverities = {"error","info","warning"};//路由键
for(int i=0;i<3;i++){
String server = serverities[i];
String message = "Hello world_"+(i+1);
//basicProperties是设置一些消息属性的,不需要可以传null
//通过信道把消息、路由键传给交换器,交换器会根据路由键把消息传给相应的队列
channel.basicPublish(EXCHANGE_NAME,server,null,message.getBytes());
System.out.println("Sent "+server+":"+message);
}
channel.close();
connection.close();
}
}
3、生产者通过FANOUT类型的交换器发布消息
FANOUT和DIRECT在原生代码的实现方式上基本一样,只有在调用channel的exchangeDeclare方法声明交换器时把交换器类型改成FANOUT即可。
package dongnaoedu.normal;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class FanoutProducer {
private final static String EXCHANGE_NAME = "fanout_logs";//交换器
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
/* factory.setUsername(..);
factory.setPort();
factory.setVirtualHost();*/
Connection connection = factory.newConnection();//连接
Channel channel = connection.createChannel();//信道
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//交换器
String[]serverities = {"error","info","warning"};//路由键
for(int i=0;i<3;i++){
String server = serverities[i];
String message = "Hello world_"+(i+1);
channel.basicPublish(EXCHANGE_NAME,server,null,message.getBytes());
System.out.println("Sent "+server+":"+message);
}
channel.close();
connection.close();
}
}
4、指定路由键的消费者
package dongnaoedu.normal;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumerError {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
Connection connection = factory.newConnection();//连接
Channel channel = connection.createChannel();//信道
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//交换器
//声明随机队列
String queueName = channel.queueDeclare().getQueue();
String server = "error";
//队列和交换器的绑定
channel.queueBind(queueName,EXCHANGE_NAME,server);//把队列按路由键绑定到交换器上
System.out.println("Waiting message.......");
Consumer consumerB = new DefaultConsumer(channel){
//消息的回调监听
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body,"UTF-8");
System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);
}
};
//三个参数:队列名,是否自动确认,消息监听回调
channel.basicConsume(queueName,true,consumerB);//对消息进行消费
}
}
补充:这里涉及到rabbit的确认机制,相关内容我会在下一篇博客消息中间件系列五、rabbit消息的确认机制再做详细介绍。
5、绑定多个路由键的消费者
package dongnaoedu.normal;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumerAll {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws IOException,
InterruptedException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
Connection connection = factory.newConnection();//连接
Channel channel = connection.createChannel();//信道
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//交换器
//声明随机队列
String queueName = channel.queueDeclare().getQueue();
String[]serverities = {"error","info","warning"};
for(String server:serverities){
//队列和交换器的绑定
channel.queueBind(queueName,EXCHANGE_NAME,server);//server是路由建,一个队列可以绑定多个路由建
}
System.out.println("Waiting message.......");
Consumer consumerA = new DefaultConsumer(channel){
//消息的回调监听
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body,"UTF-8");
System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);//getRoutingKey拿到路由建
}
};
//三个参数:队列名,是否自动确认,消息监听回调
channel.basicConsume(queueName,true,consumerA);//对消息进行消费
}
}
启动消费者之后,两个消费者的打印信息都是一样的,会一直监听消息,当生产者发了消息的时候消费者会调用handleDelivery监听方法。
此时有两个消费者,连接,通道,交换器队列的信息如下
可以看到产生了两个队列,所以这两个消费者接收消息是互不影响的。
再启动生产者DirectProducer
这个简单的生产者执行完了就退出了,
ConsumerError通过路由键的匹配只能收到error类的消息
ConsumerAll收到了全部的消息:
6、自己创建队列,不同消费者订阅相同的队列
在上面例子的基础上进行改动,
调用通道的queueDeclare方法即可创建指定名字的队列,相关参数在目录二解释过了
package dongnaoedu.normal;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumerAll {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws IOException,
InterruptedException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
Connection connection = factory.newConnection();//连接
Channel channel = connection.createChannel();//信道
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//交换器
//声明随机队列
// String queueName = channel.queueDeclare().getQueue();
String queueName = "direct_queue";
//创建队列,参数分别为:队列名,durable是否进行持久化,exlusive是否私有,auto-delete没有消费者是自动删除,arguments相关参数
channel.queueDeclare(queueName,false,false,false,null);
String[]serverities = {"error","info","warning"};
for(String server:serverities){
//队列和交换器的绑定
channel.queueBind(queueName,EXCHANGE_NAME,server);//server是路由建,一个队列可以绑定多个路由建
}
System.out.println("Waiting message.......");
Consumer consumerA = new DefaultConsumer(channel){
//消息的回调监听
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body,"UTF-8");
System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);//getRoutingKey拿到路由建
}
};
//三个参数:队列,自动确认,消息监听回调
channel.basicConsume(queueName,true,consumerA);//对消息进行消费
}
}
package dongnaoedu.normal;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumerError {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
Connection connection = factory.newConnection();//连接
Channel channel = connection.createChannel();//信道
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//交换器
// channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//交换器
//声明随机队列
// String queueName = channel.queueDeclare().getQueue();
String queueName = "direct_queue";
// direct_queue这个队列已经在ConsumerAll 绑定了三个路由键,这里不用再绑定了,也不用重复创建了
//参数分别为:队列名,durable是否进行持久化,exlusive是否私有,auto-delete没有消费者是自动删除,arguments相关参数
// channel.queueDeclare(queueName,false,false,false,null);
// String server = "error";
// channel.queueBind(queueName,EXCHANGE_NAME,server);//把队列按路由键绑定到交换器上
System.out.println("Waiting message.......");
Consumer consumerB = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body,"UTF-8");
System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);
}
};
channel.basicConsume(queueName,true,consumerB);
}
}
两个类运行后的队列:
可以看到只有一个队列,两个消费者都是订阅这个队列。
生产者DirectProducer不用改变,运行DirectProducer后消费者接收信息的打印结果:
可以看到消息是轮询的方式发送给消费者,这时再把消费者关掉,可以看到队列已经没有消息了,由于创建队列的时候auto-delete设置为false,这时队列并没有删除。
再一次运行生产者,可以看到队列收到了3条新消息,没有被消费者消费。
在生产者这边,添加一个没有队列绑定的路由键,发现在direct交换器下队列和消费者和都没有受到任何影响。
String[]serverities = {"error","info","warning","test"};//路由键
for(int i=0;i<4;i++){
String server = serverities[i];
String message = "Hello world_"+(i+1);
//basicProperties是设置一些消息属性的,不需要可以传null
//通过信道把消息、路由键传给交换器,交换器会根据路由键把消息传给相应的队列
channel.basicPublish(EXCHANGE_NAME,server,null,message.getBytes());
System.out.println("Sent "+server+":"+message);
}
这证实了上文说到的:
- 如果多个消费者订阅到同一队列,消息会轮询的方式发送给消费者,每个消息只会发送给一个消费者。
- 如果消息达到无人订阅的队列会一直在队列中等待,rabbitmq会默认队列是无限长度的。
- 消息路由到了不存在的队列,消息会忽略,当消息不存在,消息丢失了。