一:介绍
1.优缺点
简单,但是耦合性较高。
这种模式是生产者与消费者一一对应,就是一个产生者,有一个消费者来消费。
如果,多个消费者想消费一个队列中的消息就不适合了。这种情况在后面会接着介绍。
2.进入官网
进入get start
然后进入Tutorials
发现简单消息队列
二:新建项目
1.新建maven项目
2.pomwenjian
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <groupId>mq</groupId>
<artifactId>rabbitmqTest</artifactId>
<version>1.0-SNAPSHOT</version> <dependencies>
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.10</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/log4j/log4j -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency> </dependencies> </project>
3.公共类
获取连接
package com.mq.utils; import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; public class ConnectionUtil {
/**
* 获取connection连接
*/
public static Connection getConnection()throws Exception{
//定义一个连接工厂
ConnectionFactory factory=new ConnectionFactory();
//设置服务地址
factory.setHost("127.0.0.1");
//设置AMQP端口
factory.setPort(5672);
//vhost
factory.setVirtualHost("/cjhost");
//用户名
factory.setUsername("caojun");
//密码
factory.setPassword("123456");
//返回连接
return factory.newConnection();
}
}
4.项目结构
三:生产者
1.程序
package com.mq.send; import com.mq.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection; public class SimpleSend {
private static final String QUENE_NAME="test_simple_queue";
public static void main(String[] args) throws Exception {
//获取一个连接
Connection connection= ConnectionUtil.getConnection();
//从连接中获取一个通道
Channel channel=connection.createChannel();
//创建队列声明
channel.queueDeclare(QUENE_NAME,false,false,false,null); //消息
String strBody="Hello Mq"; //发送
channel.basicPublish("",QUENE_NAME,null,strBody.getBytes());
System.out.println("send strBody:"+strBody); //关闭连接
channel.close();
connection.close();
}
}
2.运行
控制台:
管理平台:
3.使用管理平台获取消息
这个时候,队列中的消息就会被消费掉。
四:消费者
1.程序一
这个程序中的API是3.4之前的,现在还能用
package com.mq.receive; import com.mq.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery; public class SimpleReceive {
private static final String QUENE_NAME="test_simple_queue";
public static void main(String[] args) throws Exception {
//获取一个连接
Connection connection= ConnectionUtil.getConnection();
//创建通道
Channel channel=connection.createChannel();
//定义消费者
QueueingConsumer consumer=new QueueingConsumer(channel);
//监听队列
channel.basicConsume(QUENE_NAME,true,consumer);
while (true){
Delivery delivery=consumer.nextDelivery();
String strBody=new String(delivery.getBody());
System.out.println("receive strBody:"+strBody);
}
}
}
2.效果
3.程序二
这个是新的api
package com.mq.receive; import com.mq.utils.ConnectionUtil;
import com.rabbitmq.client.*;
import com.rabbitmq.client.QueueingConsumer.Delivery; import java.io.IOException; public class SimpleReceive {
private static final String QUENE_NAME = "test_simple_queue"; public static void main(String[] args) throws Exception {
newApi();
}
public static void newApi()throws Exception{
//获取一个连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//创建队列声明
channel.queueDeclare(QUENE_NAME,false,false,false,null);
//创建消费者
DefaultConsumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String strBody=new String(body,"utf-8");
System.out.println("receive new strBody:"+strBody);
}
};
//监听队列
channel.basicConsume(QUENE_NAME,true,consumer);
} /**
* 这个是老的API
* @throws Exception
*/
public static void oldApi() throws Exception {
//获取一个连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//定义消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
//监听队列
channel.basicConsume(QUENE_NAME, true, consumer);
while (true) {
Delivery delivery = consumer.nextDelivery();
String strBody = new String(delivery.getBody());
System.out.println("receive strBody:" + strBody);
}
}
}
4.效果