Rabbit MQ 直连交换机代码样例

Rabbit MQ 直连交换机代码样例

Consumer

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

public class Consumer {

	public static void main(String[] args) throws Exception {
		
		
       //1 创建一个ConnectionFactory, 并进行配置
		ConnectionFactory connectionFactory = new ConnectionFactory();
		//设置ip
		connectionFactory.setHost("127.0.0.1");
		//设置端口
		connectionFactory.setPort(5672);
		//设置虚拟主机路径
		connectionFactory.setVirtualHost("/");
		//相关于mq集群网络闪断重连的设置
		//是否重连
		connectionFactory.setAutomaticRecoveryEnabled(true);
		//重连间隔时间   每3秒钟重连一次
		connectionFactory.setNetworkRecoveryInterval(3000);
   
		//2 通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();
        //3 通过connection创建一个Channel
        Channel channel = connection.createChannel();  
		//4 声明
		//交换机名称
		String exchangeName = "test_direct_exchange";
		//类型
		String exchangeType = "direct";
		//队列名称
		String queueName = "test_direct_queue";
		//路由名称
		String routingKey = "test.direct";
		
		//表示声明了一个交换机
		/**
		* 1、exchangeName 交换机名称
		* 2、exchangeType交换机类型
		* 3、durable 是否持久化
		* 4、autoDelete 是否自动删除
		* 5、internal 为false
		* 6、扩展参数为null
		*/
		channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
		//表示声明了一个队列
		/**
		* 1、queueName 队列名字
		* 2、durable  是否持久化  如果设置为true 则表示持久化  就算服务器重启此队列也不会消失
		* 3、exclusive 表示独占  设置为true后 则表示此队列只有这一个channel能够监听,应用于顺序消费的场景
		* 4、autoDelete  自动删除 设置成ture后,如果队列长时间没有被使用,则会自动删除
		* 5、扩展参数 一般为null
		*/
		channel.queueDeclare(queueName, false, false, false, null);
		//建立一个绑定关系:通过路由名称将队列和交换机一一绑定
		channel.queueBind(queueName, exchangeName, routingKey);
		
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer);  
        //循环获取消息  
        while(true){  
            //获取消息,如果没有消息,这一步将会一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到消息:" + msg);  
        } 
	}
}

Producer

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {

	
	public static void main(String[] args) throws Exception {
		
		//1 创建ConnectionFactory
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("127.0.0.1");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		
		//2 创建Connection
		Connection connection = connectionFactory.newConnection();
		//3 创建Channel
		Channel channel = connection.createChannel();  
		//4 声明
		String exchangeName = "test_direct_exchange";
		String routingKey = "test.direct111";
		//5 发送
		
		String msg = "Hello World RabbitMQ 4  Direct Exchange Message 111 ... ";
		channel.basicPublish(exchangeName, routingKey , null , msg.getBytes()); 		
		
	}
	
}
上一篇:Java多线程从基础到并发模型统统帮你搞定!你还看不明白?


下一篇:常见Message Queue介绍