Rabbit MQ 直连交换机代码样例
Consumer
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Consumer {
public static void main(String[] args) throws Exception {
//1 创建一个ConnectionFactory, 并进行配置
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置ip
connectionFactory.setHost("127.0.0.1");
//设置端口
connectionFactory.setPort(5672);
//设置虚拟主机路径
connectionFactory.setVirtualHost("/");
//相关于mq集群网络闪断重连的设置
//是否重连
connectionFactory.setAutomaticRecoveryEnabled(true);
//重连间隔时间 每3秒钟重连一次
connectionFactory.setNetworkRecoveryInterval(3000);
//2 通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//3 通过connection创建一个Channel
Channel channel = connection.createChannel();
//4 声明
//交换机名称
String exchangeName = "test_direct_exchange";
//类型
String exchangeType = "direct";
//队列名称
String queueName = "test_direct_queue";
//路由名称
String routingKey = "test.direct";
//表示声明了一个交换机
/**
* 1、exchangeName 交换机名称
* 2、exchangeType交换机类型
* 3、durable 是否持久化
* 4、autoDelete 是否自动删除
* 5、internal 为false
* 6、扩展参数为null
*/
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
//表示声明了一个队列
/**
* 1、queueName 队列名字
* 2、durable 是否持久化 如果设置为true 则表示持久化 就算服务器重启此队列也不会消失
* 3、exclusive 表示独占 设置为true后 则表示此队列只有这一个channel能够监听,应用于顺序消费的场景
* 4、autoDelete 自动删除 设置成ture后,如果队列长时间没有被使用,则会自动删除
* 5、扩展参数 一般为null
*/
channel.queueDeclare(queueName, false, false, false, null);
//建立一个绑定关系:通过路由名称将队列和交换机一一绑定
channel.queueBind(queueName, exchangeName, routingKey);
//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, true, consumer);
//循环获取消息
while(true){
//获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
}
}
Producer
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) throws Exception {
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 创建Connection
Connection connection = connectionFactory.newConnection();
//3 创建Channel
Channel channel = connection.createChannel();
//4 声明
String exchangeName = "test_direct_exchange";
String routingKey = "test.direct111";
//5 发送
String msg = "Hello World RabbitMQ 4 Direct Exchange Message 111 ... ";
channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());
}
}