---------Maven依赖---------
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.1.0</version>
</dependency>
---------消息生产---------
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class TestSend
{
private final static String QUEUE_NAME = "testdurable";
private final static String QUEUE_IP = "你的服务器IP或域名";
private final static int QUEUE_PORT = 5672;//RabbitMQ对外服务端口
private final static String QUEUE_USER = "testuser";
private final static String QUEUE_PWD = "123456";
public static void main(String[] argv) throws java.io.IOException, TimeoutException
{
/**
* 创建连接连接到MabbitMQ
*/
ConnectionFactory factory = new ConnectionFactory();
//设置MabbitMQ所在主机ip或者主机名
factory.setHost(QUEUE_IP);
factory.setPort(QUEUE_PORT);// MQ端口
factory.setUsername(QUEUE_USER);// MQ用户名
factory.setPassword(QUEUE_PWD);// MQ密码
//创建一个连接
Connection connection = factory.newConnection();
//创建一个频道
Channel channel = connection.createChannel();
//指定一个队列
//channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//如QUEUE_NAME是一个transient的queue,第二个参数必须是false;重启rabbit后QUEUE_NAME会被删除掉
//如QUEUE_NAME是一个durability的queue,第二个参数必须是true;重启rabbit后QUEUE_NAME不会被删除掉
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//发送的消息
String message = "hello world!";
//往队列中发出一条消息
int j=0;
Long start = System.currentTimeMillis();
for(int i=j;i<j+10000;i++)
{
//将消息保存起来,重启rabbit后待消费的消息不会被删除
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, (message+i).getBytes());
//不保存消息,重启rabbit后待消费的消息都将丢失
//channel.basicPublish("", QUEUE_NAME, null, (message+i).getBytes());
}
System.out.println("发送完成:"+(System.currentTimeMillis() - start));
//关闭频道和连接
channel.close();
connection.close();
}
}
---------消息消费---------
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class TestRead
{
private final static String QUEUE_NAME = "testdurable";
private final static String QUEUE_IP = "你的服务器IP或域名";
private final static int QUEUE_PORT = 5672;//RabbitMQ对外服务端口
private final static String QUEUE_USER = "testuser";
private final static String QUEUE_PWD = "123456";
public static void main(String[] argv) throws java.io.IOException,
java.lang.InterruptedException, TimeoutException
{
//打开连接和创建频道,与发送端一样
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(QUEUE_IP);
factory.setPort(QUEUE_PORT);// MQ端口
factory.setUsername(QUEUE_USER);// MQ用户名
factory.setPassword(QUEUE_PWD);// MQ密码
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println("Waiting for messages. To exit press CTRL+C");
//创建队列消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
//指定消费队列
channel.basicConsume(QUEUE_NAME, true, consumer);
Long start = System.currentTimeMillis();
while (true)
{
//nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("Received '" + message + "' "+(System.currentTimeMillis() - start));
}
}
}