RabbitMQ详解

Maven

<dependencies>
		<dependency>
			<groupId>com.rabbitmq</groupId>
			<artifactId>amqp-client</artifactId>
			<version>3.6.5</version>
		</dependency>
	</dependencies>

RabbitMQ五种队列形式 

1.点对点(简单)的队列

封装Connection

public class MQConnectionUtils {

	public static Connection newConnection() throws IOException, TimeoutException {
		// 1.定义连接工厂
		ConnectionFactory factory = new ConnectionFactory();
		// 2.设置服务器地址
		factory.setHost("127.0.0.1");
		// 3.设置协议端口号
		factory.setPort(5672);
		// 4.设置vhost
		factory.setVirtualHost("/test001_host");
		// 5.设置用户名称
		factory.setUsername("test001");
		// 6.设置用户密码
		factory.setPassword("123456");
		// 7.创建新的连接
		Connection newConnection = factory.newConnection();
		return newConnection;
	}
}

 生产者

public class Producer {
	private static final String QUEUE_NAME = "test_queue";

	public static void main(String[] args) throws IOException, TimeoutException {
		// 1.获取连接
		Connection newConnection = MQConnectionUtils.newConnection();
		// 2.创建通道
		Channel channel = newConnection.createChannel();
		// 3.创建队列声明
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		String msg = "test_110";
		System.out.println("生产者发送消息:" + msg);
		// 4.发送消息
		channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
		channel.close();
		newConnection.close();
	}

}

消费者 

public class Customer {
	private static final String QUEUE_NAME = "test_queue";

	public static void main(String[] args) throws IOException, TimeoutException {
		System.out.println("002");
		// 1.获取连接
		Connection newConnection = MQConnectionUtils.newConnection();
		// 2.获取通道
		Channel channel = newConnection.createChannel();
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				String msgString = new String(body, "UTF-8");
				System.out.println("消费者获取消息:" + msgString);
			}
		};
		// 3.监听队列
		channel.basicConsume(QUEUE_NAME, true, defaultConsumer);

	}
}

2.工作(公平性)队列模式

生产者

public class Producer {
	private static final String QUEUE_NAME = "test_queue";

	public static void main(String[] args) throws IOException, TimeoutException {
		// 1.获取连接
		Connection newConnection = MQConnectionUtils.newConnection();
		// 2.创建通道
		Channel channel = newConnection.createChannel();
		// 3.创建队列声明
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		channel.basicQos(1);// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息
		for (int i = 1; i <= 50; i++) {
			String msg = "test_" + i;
			System.out.println("生产者发送消息:" + msg);
			// 4.发送消息
			channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
		}
		channel.close();
		newConnection.close();
	}
}

消费者 

public class Customer1 {
	private static final String QUEUE_NAME = "test_queue";
	public static void main(String[] args) throws IOException, TimeoutException {
		System.out.println("001");
		// 1.获取连接
		Connection newConnection = MQConnectionUtils.newConnection();
		// 2.获取通道
		final Channel channel = newConnection.createChannel();
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		channel.basicQos(1);// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息
		DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				String msgString = new String(body, "UTF-8");
				System.out.println("消费者获取消息:" + msgString);
				try {
					Thread.sleep(1000);
				} catch (Exception e) {

				} finally {
					// 手动回执消息
					channel.basicAck(envelope.getDeliveryTag(), false);
				}
			}
		};
		// 3.监听队列
		channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
	}
}

3.发布订阅模式

该模式就是Fanout Exchange(扇型交换机)将消息路由给绑定到它身上的所有队列 

交换机的四种类型:

Direct exchange(直连交换机)是根据消息携带的路由键(routing key)将消息投递给对应队列Fanout exchange(扇型交换机)将消息路由给绑定到它身上的所有队列

Topic exchange(主题交换机)队列通过路由键绑定到交换机上,然后,交换机根据消息里的路由值,将消息路由给一个或多个绑定队列

Headers exchange(头交换机)类似主题交换机,但是头交换机使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。

RabbitMQ详解

 生产者

public class ProducerFanout {
	private static final String EXCHANGE_NAME = "fanout_exchange";

	public static void main(String[] args) throws IOException, TimeoutException {
		// 1.创建新的连接
		Connection connection = MQConnectionUtils.newConnection();
		// 2.创建通道
		Channel channel = connection.createChannel();
		// 3.绑定的交换机 参数1交互机名称 参数2 exchange类型
		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
		String msg = "fanout_exchange_msg";
		// 4.发送消息
		channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
		// System.out.println("生产者发送msg:" + msg);
		// // 5.关闭通道、连接
		// channel.close();
		// connection.close();
		// 注意:如果消费没有绑定交换机和队列,则消息会丢失

	}
}

邮件消费者 

public class ConsumerEmailFanout {
	private static final String QUEUE_NAME = "consumerFanout_email";
	private static final String EXCHANGE_NAME = "fanout_exchange";

	public static void main(String[] args) throws IOException, TimeoutException {
		System.out.println("邮件消费者启动");
		// 1.创建新的连接
		Connection connection = MQConnectionUtils.newConnection();
		// 2.创建通道
		Channel channel = connection.createChannel();
		// 3.消费者关联队列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		// 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
		DefaultConsumer consumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				String msg = new String(body, "UTF-8");
				System.out.println("消费者获取生产者消息:" + msg);
			}
		};
		// 5.消费者监听队列消息
		channel.basicConsume(QUEUE_NAME, true, consumer);
	}
}

短信消费者 

public class ConsumerSMSFanout {
	private static final String QUEUE_NAME = "ConsumerFanout_sms";
	private static final String EXCHANGE_NAME = "fanout_exchange";

	public static void main(String[] args) throws IOException, TimeoutException {
		System.out.println("短信消费者启动");
		// 1.创建新的连接
		Connection connection = MQConnectionUtils.newConnection();
		// 2.创建通道
		Channel channel = connection.createChannel();
		// 3.消费者关联队列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		// 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
		DefaultConsumer consumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				String msg = new String(body, "UTF-8");
				System.out.println("消费者获取生产者消息:" + msg);
			}
		};
		// 5.消费者监听队列消息
		channel.basicConsume(QUEUE_NAME, true, consumer);
	}
}

4.路由模式Routing

生产者发送消息到交换机并指定一个路由key,消费者队列绑定到交换机时要指定路由key(key匹配就能接受消息,key不匹配就不能接受消息)

生产者

public class ProducerDirect {
	private static final String EXCHANGE_NAME = "direct_exchange";

	public static void main(String[] args) throws IOException, TimeoutException {
		// 1.创建新的连接
		Connection connection = MQConnectionUtils.newConnection();
		// 2.创建通道
		Channel channel = connection.createChannel();
		// 3.绑定的交换机 参数1交互机名称 参数2 exchange类型
		channel.exchangeDeclare(EXCHANGE_NAME, "direct");
		String routingKey = "info";
		String msg = "direct_exchange_msg" + routingKey;
		// 4.发送消息
		channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
		System.out.println("生产者发送msg:" + msg);
		// // 5.关闭通道、连接
		// channel.close();
		// connection.close();
		// 注意:如果消费没有绑定交换机和队列,则消息会丢失

	}
}

邮件消费者 

public class ConsumerEmailDirect {
	private static final String QUEUE_NAME = "consumer_direct_email";
	private static final String EXCHANGE_NAME = "direct_exchange";

	public static void main(String[] args) throws IOException, TimeoutException {
		System.out.println("邮件消费者启动");
		// 1.创建新的连接
		Connection connection = MQConnectionUtils.newConnection();
		// 2.创建通道
		Channel channel = connection.createChannel();
		// 3.消费者关联队列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		// 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
		DefaultConsumer consumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				String msg = new String(body, "UTF-8");
				System.out.println("消费者获取生产者消息:" + msg);
			}
		};
		// 5.消费者监听队列消息
		channel.basicConsume(QUEUE_NAME, true, consumer);
	}
}

短信消费者 

public class ConsumerSMSDirect {
	private static final String QUEUE_NAME = "consumer_direct_sms";
	private static final String EXCHANGE_NAME = "direct_exchange";

	public static void main(String[] args) throws IOException, TimeoutException {
		System.out.println("短信消费者启动");
		// 1.创建新的连接
		Connection connection = MQConnectionUtils.newConnection();
		// 2.创建通道
		Channel channel = connection.createChannel();
		// 3.消费者关联队列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		// 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
		DefaultConsumer consumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				String msg = new String(body, "UTF-8");
				System.out.println("消费者获取生产者消息:" + msg);
			}
		};
		// 5.消费者监听队列消息
		channel.basicConsume(QUEUE_NAME, true, consumer);
	}
}

5.通配符模式Topics

此模式是在路由key模式的基础上,使用了通配符来管理消费者接收消息。生产者P发送消息到交换机X,type=topic,交换机根据绑定队列的routing key的值进行通配符匹配;

符号#:匹配一个或者多个词lazy.# 可以匹配lazy.irs或者lazy.irs.cor

符号*:只能匹配一个词lazy.* 可以匹配lazy.irs或者lazy.cor

生产者

public class ProducerDirect {
	private static final String EXCHANGE_NAME = "topic_exchange";

	public static void main(String[] args) throws IOException, TimeoutException {
		// 1.创建新的连接
		Connection connection = MQConnectionUtils.newConnection();
		// 2.创建通道
		Channel channel = connection.createChannel();
		// 3.绑定的交换机 参数1交互机名称 参数2 exchange类型
		channel.exchangeDeclare(EXCHANGE_NAME, "topic");
		String routingKey = "log.info.error";
		String msg = "topic_exchange_msg" + routingKey;
		// 4.发送消息
		channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
		System.out.println("生产者发送msg:" + msg);
		// // 5.关闭通道、连接
		channel.close();
		connection.close();
		// 注意:如果消费没有绑定交换机和队列,则消息会丢失
	}
}

邮件消费者

public class ConsumerEmailDirect {
	private static final String QUEUE_NAME = "consumer_topic_email";
	private static final String EXCHANGE_NAME = "topic_exchange";

	public static void main(String[] args) throws IOException, TimeoutException {
		System.out.println("邮件消费者启动");
		// 1.创建新的连接
		Connection connection = MQConnectionUtils.newConnection();
		// 2.创建通道
		Channel channel = connection.createChannel();
		// 3.消费者关联队列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "log.#");
		// 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
		DefaultConsumer consumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				String msg = new String(body, "UTF-8");
				System.out.println("消费者获取生产者消息:" + msg);
			}
		};
		// 5.消费者监听队列消息
		channel.basicConsume(QUEUE_NAME, true, consumer);
	}
}

短信服务器

public class ConsumerSMSDirect {
	private static final String QUEUE_NAME = "consumer_topic_sms";
	private static final String EXCHANGE_NAME = "topic_exchange";

	public static void main(String[] args) throws IOException, TimeoutException {
		System.out.println("短信消费者启动");
		// 1.创建新的连接
		Connection connection = MQConnectionUtils.newConnection();
		// 2.创建通道
		Channel channel = connection.createChannel();
		// 3.消费者关联队列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		// 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "log.*");
		DefaultConsumer consumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				String msg = new String(body, "UTF-8");
				System.out.println("消费者获取生产者消息:" + msg);
			}
		};
		// 5.消费者监听队列消息
		channel.basicConsume(QUEUE_NAME, true, consumer);
	}

}

RabbitMQ消息确认机制

   生产者发送消息出去之后,不知道到底有没有发送到RabbitMQ服务器, 默认是不知道的。而且有的时候我们在发送消息之后,后面的逻辑出问题了,我们不想要发送之前的消息了,需要撤回该怎么做。

1.AMQP 事务机制

 txSelect  将当前channel设置为transaction模式

 txCommit  提交当前事务

 txRollback  事务回滚

2.Confirm 模式

就是生产者发送消息后,RabbitMQ服务器会发送一个确认给生产者

上一篇:java排序算法(九):归并排序


下一篇:Android—Gradle教程(四)