RabbitMQ-Java版本生产与消费

---------Maven依赖---------
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.1.0</version>
</dependency>


---------消息生产---------
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
import com.rabbitmq.client.MessageProperties;

public class TestSend
{
    private final static String QUEUE_NAME = "testdurable";
    private final static String QUEUE_IP = "你的服务器IP或域名";
    private final static int QUEUE_PORT = 5672;//RabbitMQ对外服务端口
    private final static String QUEUE_USER = "testuser";
    private final static String QUEUE_PWD = "123456";

    public static void main(String[] argv) throws java.io.IOException, TimeoutException
    {
        /**
         * 创建连接连接到MabbitMQ
         */
        ConnectionFactory factory = new ConnectionFactory();
        //设置MabbitMQ所在主机ip或者主机名
        factory.setHost(QUEUE_IP);
        factory.setPort(QUEUE_PORT);// MQ端口
        factory.setUsername(QUEUE_USER);// MQ用户名
        factory.setPassword(QUEUE_PWD);// MQ密码
        //创建一个连接
        Connection connection = factory.newConnection();
        //创建一个频道
        Channel channel = connection.createChannel();
        //指定一个队列
        //channel.queueDeclare(QUEUE_NAME, false, false, false, null);
         //如QUEUE_NAME是一个transient的queue,第二个参数必须是false;重启rabbit后QUEUE_NAME会被删除掉
         //如QUEUE_NAME是一个durability的queue,第二个参数必须是true;重启rabbit后QUEUE_NAME不会被删除掉
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        //发送的消息
        String message = "hello world!";
        //往队列中发出一条消息
        int j=0;
        Long start = System.currentTimeMillis();
        for(int i=j;i<j+10000;i++)
        {
         //将消息保存起来,重启rabbit后待消费的消息不会被删除
         channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, (message+i).getBytes());

         //不保存消息,重启rabbit后待消费的消息都将丢失
         //channel.basicPublish("", QUEUE_NAME, null, (message+i).getBytes());
        }
        System.out.println("发送完成:"+(System.currentTimeMillis() - start));
        //关闭频道和连接
        channel.close();
        connection.close();
     }
}

---------消息消费---------
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class TestRead
{
    private final static String QUEUE_NAME = "testdurable";
    private final static String QUEUE_IP = "你的服务器IP或域名";
    private final static int QUEUE_PORT = 5672;//RabbitMQ对外服务端口
    private final static String QUEUE_USER = "testuser";
    private final static String QUEUE_PWD = "123456";

    public static void main(String[] argv) throws java.io.IOException,
            java.lang.InterruptedException, TimeoutException
    {
        //打开连接和创建频道,与发送端一样
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(QUEUE_IP);
        factory.setPort(QUEUE_PORT);// MQ端口
        factory.setUsername(QUEUE_USER);// MQ用户名
        factory.setPassword(QUEUE_PWD);// MQ密码
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        System.out.println("Waiting for messages. To exit press CTRL+C");
        
        //创建队列消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //指定消费队列
        channel.basicConsume(QUEUE_NAME, true, consumer);
        Long start = System.currentTimeMillis();
        while (true)
        {
            //nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("Received '" + message + "'    "+(System.currentTimeMillis() - start));
        }
    }
}

上一篇:SpringData操作MongoDB


下一篇:CentOS7常用环境设置