Spring AMQP consists of a handful of modules, each represented by a JAR in the distribution. These modules are: spring-amqp, and spring-rabbit. The spring-amqp module contains the
org.springframework.amqp.core package.
Within that package, you will find the classes that represent the core AMQP "model".
RabbitMq系统架构,及工作流程
RabbitMq系统分为三个部分,producer, broker和consumer。 producer产生消息,将消息通过网络信道发送给部署在网络上的Amqp服务器broker的Message queue。consumer通过侦听消息队列来获取信息。
Amqp服务器Broker主要由Exchane和Message Queue组成,主要功能是Message的路由和缓存。
Exchange接受producer发送的message并根据不同的算法将Message路由给不同的queue。Message queue会在message不能被正常消费的时候缓存起来,当Consumer和Message Queue建立连接时,Message Queue会将消息传给consumer
Exchange和Message Queue之间通过Binding进行关联,Exchange和多个Queue会形成一张路由表,Exchange根据Routing Key和Exchange Type将Message路由到MessageQueue。
Binding Key由Consumer在Binding Exchange与Message Queue时指定,而Routing Key由Producer发送Message时指定,两者的匹配方式由Exchange Type决定。
Exchange Type分为Direct(单播)、Topic(组播)、Fanout(广播)。当为Direct(单播)时,Routing Key必须与Binding Key相等时才能匹配成功,当为Topic(组播)时,Routing Key与Binding Key符合一种模式关系即算匹配成功,当为Fanout(广播)时,不受限制。默认Exchange Type是Direct(单播)。
Springmq几个重要的类
Message
Spring AMQP定义的Message类是AMQP域模型中代表之一。Message类封装了body(消息BODY)和properties(消息属性),这使得API看起来很简单。 Message由Header和Body组成,Header是由Producer添加的各种属性的集合,包括Message是否被缓存、由哪个Message Queue接受、优先级是多少等。而Body是真正需要传输的APP数据。
public class Message {
private final MessageProperties messageProperties;
private final byte[] body;
public Message(byte[] body, MessageProperties
messageProperties) {
this.body = body;
this.messageProperties = messageProperties;
}
public byte[] getBody() {
return this.body;
}
public MessageProperties getMessageProperties() {
return this.messageProperties;
}
}
Exchange
Exchange代表一个AMQP交换,一个消息producer会将消息发送给Exchange。 每个Exchange都有一个唯一的名字和其他的属性。
public interface Exchange {
String getName();
String getExchangeType();
boolean isDurable();
boolean isAutoDelete();
Map<String, Object> getArguments();
}
ExchangeTypes: 基本类型包括 Direct(单播), Topic(组播), Fanout(广播), and Headers。默认为Direct Exchange
Queue
Message Consumer通过监听消息队列来获取数据
public class Queue {
private final String name;
private volatile boolean durable;
private volatile boolean exclusive;
private volatile boolean autoDelete;
private volatile Map<String, Object> arguments;
/**
* The queue is durable, non-exclusive and non auto-delete.
*
* @param name the name of the queue.
*/
public Queue(String name) {
this(name, true, false, false);
}
Binding
考虑到producer发送消息给Exchange一个consumer通过queue来接受消息,binding的作用是链接queue和exchange。
ConnectionFactory
管理到RabbitMQ broker的连接接口,具体的实现是CachingConnectionFactory类,CachingConnectionFactory为每个channel保持分离的cach。hostname可以通过构造器传入,同时我们需要提供username和pasword。ConnectionFactory可以使用rabbit namespace快速创建。 ConnectionFactory创建的channel 默认cach-size是1,我们可以使用rabbit namespace配置。
AmqpAdmin
需要为AmqpAdmin绑定指定的ConnectionFactory, 当配置了这个参数,本地的配置会在远端server生效。
RabbitTemplate
需要为RabbitTemplate绑定指定的ConnectionFactory,通过AmqpTemplate对象发送和接受消息。
Reference
http://blog.chinaunix.net/uid-22312037-id-3458208.html RabbitMQ源码解析前奏--AMQP协议
http://docs.spring.io/spring-amqp/reference/html/_reference.html#_introduction_3 Spring-amqp Reference