1.简单模式:
P : 生产者
Queue(hello) : 队列
C : 消费者
步骤:
1.创建工程
2.分别添加RabbitMQ依赖
3.编写生产者发送消息
4.编写消费者获取消息
pom.xml
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
producer(生产者):
public class Producer_helloworld {
private static final String QUEUE_NAME="queue";
public static void main(String[] args) throws IOException, TimeoutException {
//1获取连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2设置参数
factory.setHost("127.0.0.1");//主机号
factory.setPort(5672);//端口号
factory.setVirtualHost("/test");//队列虚拟机
factory.setUsername("test");//用户名
factory.setPassword("test");//密码
//3创建连接Connection
Connection connection = factory.newConnection();
//4创建Channel
Channel channel = connection.createChannel();
//5创建队列
/*
queueDeclare(
String queue, 队列名称
boolean durable, 是否持久化到本地
boolean exclusive, (1)是否独占连接,只能有一个消费者监听这个队列(2)当connection关闭时,是否删除队列
boolean autoDelete, 是否自动删除,没有消费之自动删除
Map<String, Object> arguments) 配置参数
* */
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
/*
basicPublish(
String exchange, 交换机名称,简单模式使用默认交换机,空字符串即为默认交换机
String routingKey, 路由名称
BasicProperties props, 配置信息
byte[] body 发送的消息数据
)
*/
String body="傻逼java面试官";
//6发送消息
channel.basicPublish("",QUEUE_NAME,null,body.getBytes());
//7释放资源
/* if(channel != null){
channel.close();
}
if(connection != null){
connection.close();
}*/
}
}
consumer(消费者):
public class Hello01 {
private static final String QUEUE_NAME="queue";
public static void main(String[] args) throws IOException, TimeoutException {
//1获取连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2设置参数
factory.setHost("127.0.0.1");//主机号
factory.setPort(5672);//端口号
factory.setVirtualHost("/test");//队列虚拟机
factory.setUsername("test");//用户名
factory.setPassword("test");//密码
//3创建连接Connection
Connection connection = factory.newConnection();
//4创建Channel
Channel channel = connection.createChannel();
//5创建队列
/*
queueDeclare(
String queue, 队列名称
boolean durable, 是否持久化到本地
boolean exclusive, (1)是否独占连接,只能有一个消费者监听这个队列(2)当connection关闭时,是否删除队列
boolean autoDelete, 是否自动删除,没有消费之自动删除
Map<String, Object> arguments) 配置参数
* */
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
/*
basicConsume(
String queue, 队列名
boolean autoAck, 是否自动确认
Consumer callback 回调函数
)
*/
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println(new String(body));
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
2.work queues工作队列模式:
p: 生产者
queue': 队列
C1: 消费者1
C2: 消费者2
work queues:与入门的简单模式相比,多了一个消费者,多个消费者共同消费同一个队列,消费者之间为竞争关系
应用场景:对于任务过重或任务较多的工作队列可以提高任务处理的速度.
producer:
public class Producer_helloworld {
private static final String QUEUE_NAME="queue";
public static void main(String[] args) throws IOException, TimeoutException {
//1获取连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2设置参数
factory.setHost("127.0.0.1");//主机号
factory.setPort(5672);//端口号
factory.setVirtualHost("/test");//队列虚拟机
factory.setUsername("test");//用户名
factory.setPassword("test");//密码
//3创建连接Connection
Connection connection = factory.newConnection();
//4创建Channel
Channel channel = connection.createChannel();
//5创建队列
/*
queueDeclare(
String queue, 队列名称
boolean durable, 是否持久化到本地
boolean exclusive, (1)是否独占连接,只能有一个消费者监听这个队列(2)当connection关闭时,是否删除队列
boolean autoDelete, 是否自动删除,没有消费之自动删除
Map<String, Object> arguments) 配置参数
* */
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
/*
basicPublish(
String exchange, 交换机名称,简单模式使用默认交换机,空字符串即为默认交换机
String routingKey, 路由名称
BasicProperties props, 配置信息
byte[] body 发送的消息数据
)
*/
String body="傻逼java面试官";
//6发送消息
for (int i = 0; i < 100; i++) {
channel.basicPublish("",QUEUE_NAME,null,body.getBytes());
}
//7释放资源
channel.close();
connection.close();
}
}
consumer1/consumer2:
public class Hello01 {/// Hello02
private static final String QUEUE_NAME="queue";
public static void main(String[] args) throws IOException, TimeoutException {
//1获取连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2设置参数
factory.setHost("127.0.0.1");//主机号
factory.setPort(5672);//端口号
factory.setVirtualHost("/test");//队列虚拟机
factory.setUsername("test");//用户名
factory.setPassword("test");//密码
//3创建连接Connection
Connection connection = factory.newConnection();
//4创建Channel
Channel channel = connection.createChannel();
//5创建队列
/*
queueDeclare(
String queue, 队列名称
boolean durable, 是否持久化到本地
boolean exclusive, (1)是否独占连接,只能有一个消费者监听这个队列(2)当connection关闭时,是否删除队列
boolean autoDelete, 是否自动删除,没有消费之自动删除
Map<String, Object> arguments) 配置参数
* */
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
/*
basicConsume(
String queue, 队列名
boolean autoAck, 是否自动确认
Consumer callback 回调函数
)
*/
Consumer consumer = new DefaultConsumer(channel){
int number=0;
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
number++;
System.out.println(number+(new String(body)));
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
运行过程及结果:
首先开启两个consumer消费者,此时该队列无消息,启动完成后再启动producer生产者发送消息,
此时两个消费者竞争获取消息
3.Pub/Sub订阅模式
p(producer): 生产者
x(exchange): 交换机,接受生产者发送的消息,知道如何处理消息,例如交给某个特定的消息队列,递交给所有队列......
exchange三种模式:
Fanout: 广播,将消息发送到所有绑定交换机的队列
Direct: 定向,把消息发哦是那个到指定的routing key的队列
Topic: 通配符,把消息交给符合routing pattern(路由模式)的队列
queue: 消息对列
c1/c2(consumer): 消费者
producer(生产者):
public class Producer_PubSub {
private static final String QUEUE_NAME1="pubsub_queue1";
private static final String QUEUE_NAME2="pubsub_queue2";
private static final String EXCHANGE_NAME="fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/test");
factory.setUsername("test");
factory.setPassword("test");
//获取新连接
Connection connection = factory.newConnection();
//创建channel通道
Channel channel = connection.createChannel();
//创建Exchange交换机
/*
exchangeDeclare(
String exchange, 交换机名称
BuiltinExchangeType type, 交换机类型
type:
DIRECT("direct"), 定向
FANOUT("fanout"), 广播,发送到每一与之绑定的队列
TOPIC("topic"), 通配符模式
HEADERS("headers"); 参数匹配
boolean durable, 是否持久化
boolean autoDelete, 是否自动删除
boolean internal, 内部(erlang语言内部开发),一般置为false
Map<String, Object> arguments)参数.置为null
* */
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true, false, false, null);
//创建队列
channel.queueDeclare(QUEUE_NAME1, true, false, false, null);
channel.queueDeclare(QUEUE_NAME2, true, false, false, null);
/*
queueBind(
String queue, 队列名称
String exchange, 交换机名称
String routingKey 路由键,绑定队则
如果交换机类型是fanout,那么routingKey设置为"",发送到所有绑定的队列
)
* */
//队列绑定交换机
channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"");
channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"");
String body="日志:张三调用了findAll方法,日志级别:info....";
//发送消息
channel.basicPublish(EXCHANGE_NAME,"",null,body.getBytes());
//释放连接
channel.close();
connection.close();
}
}
consumer01/consumer02:
public class consumer01 {
private static final String QUEUE_NAME1="pubsub_queue1";//private static final String QUEUE_NAME2="pubsub_queue2";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/test");
factory.setUsername("test");
factory.setPassword("test");
Connection connection = factory.newConnection();
//获取channel
Channel channel = connection.createChannel();
//获取消息
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.
BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
};
channel.basicConsume(QUEUE_NAME1,true,consumer);//channel.basicConsume(QUEUE_NAME2,true,consumer);
}
}
4.Routing模式(路由模式)
p(produmer): 生产者
x(exchange): 交换机
type=direct 交换机类型
Q1/Q2(queue): 队列
C1/C2(consumer): 消费者
producer(生产者):
public class RoutingProducer {
private static final String QUEUE_ROUTING1 = "queue_routing1";
private static final String QUEUE_ROUTING2 = "queue_routing2";
private static final String EXCHANGE_NAME = "exchange_Routing";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("test");
factory.setVirtualHost("/test");
factory.setPassword("test");
Connection connection = factory.newConnection();
//获取连接connection
Channel channel = connection.createChannel();
//创建队列queue
channel.queueDeclare(QUEUE_ROUTING1,true,false,false,null);
channel.queueDeclare(QUEUE_ROUTING2,true,false,false,null);
//创建交换机exchange
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,true,false,false,null);
//消息主体
String err_body="error: 数据插入错误error...";
String info_body="info: 初始化.....";
String warning_body="warning: 加载中....";
//队列与交换机绑定,queueBind方法的第三个参数为routing key,指定路由模式
channel.queueBind(QUEUE_ROUTING1,EXCHANGE_NAME,"error",null);
channel.queueBind(QUEUE_ROUTING2,EXCHANGE_NAME,"info",null);
channel.queueBind(QUEUE_ROUTING2,EXCHANGE_NAME,"warning",null);
channel.queueBind(QUEUE_ROUTING2,EXCHANGE_NAME,"error",null);
//发送消息
channel.basicPublish(EXCHANGE_NAME,"info",null,info_body.getBytes());
channel.basicPublish(EXCHANGE_NAME,"error",null,err_body.getBytes());
channel.basicPublish(EXCHANGE_NAME,"warning",null,warning_body.getBytes());
//关闭资源
channel.close();
connection.close();
}
}
consumer01(消费者):
public class Consumer_Routing_Err {
private static final String QUEUE_ROUTING1 = "queue_routing1";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/test");
factory.setUsername("test");
factory.setPassword("test");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
};
channel.basicConsume(QUEUE_ROUTING1,true,consumer);
}
}
consumer02(消费者):
public class Consumer_Routing_Err {
private static final String QUEUE_ROUTING1 = "queue_routing1";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/test");
factory.setUsername("test");
factory.setPassword("test");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
};
channel.basicConsume(QUEUE_ROUTING1,true,consumer);
}
}
5.Topics通配符模式
p(producer): 生产者
X(exchange): 交换机
type: topics
Q1/Q2(queue): 队列
C1/C2(consumer): 消费者
topics中:
*通配符: 一个单词
#通配符: 一个或多个单词
producer:
public class Producer_Topics {
private static final String QUEUE1="Queue_Topics1";
private static final String QUEUE2="Queue_Topics2";
private static final String EXCHANGE="exchange_topics";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/test");
factory.setUsername("test");
factory.setPassword("test");
//获取连接
Connection connection = factory.newConnection();
//创建channel
Channel channel = connection.createChannel();
//创建队列
channel.queueDeclare(QUEUE1,true,false,false,null);
channel.queueDeclare(QUEUE2,true,false,false,null);
//创建交换机
channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);
//交换机绑定队列
channel.queueBind(QUEUE1,EXCHANGE,"*.orange.*");
channel.queueBind(QUEUE2,EXCHANGE,"*.*.rabbite");
channel.queueBind(QUEUE2,EXCHANGE,"lazy.#");
//发送消息
String orange_body="orange_body: orange_body.....";
String rabbite_body="rabbite: rabbite.....";
String lazy_body="lazy: lazy......";
channel.basicPublish(EXCHANGE,"xxx.orange.xxx",null,orange_body.getBytes());
channel.basicPublish(EXCHANGE,"xxx.xxx.rabbite",null,rabbite_body.getBytes());
channel.basicPublish(EXCHANGE,"lazy.xxx",null,lazy_body.getBytes());
//关闭资源
channel.close();
connection.close();
}
}
consumer01:
public class Consumer01 {
private static final String QUEUE1="Queue_Topics1";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/test");
factory.setUsername("test");
factory.setPassword("test");
//获取连接
Connection connection = factory.newConnection();
//创建channel
Channel channel = connection.createChannel();
//创建consumer
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
};
//获取消息
channel.basicConsume(QUEUE1,true,consumer);
}
}
consumer02:
public class Consumer02 {
private static final String QUEUE2="Queue_Topics2";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/test");
factory.setUsername("test");
factory.setPassword("test");
//获取连接
Connection connection = factory.newConnection();
//创建channel
Channel channel = connection.createChannel();
//创建consumer
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
};
//获取消息
channel.basicConsume(QUEUE2,true,consumer);
}
}