消息中间件-RabbitMq(搭建&消息分发)
【RabbitMq】是一个【AMQP】协议的实现。服务端使用的是Erlang语言进行编写,那也就是说,我们要运行它,就要安装相关Erlang环境。前面说了AMQP最初是为了解决金融行业的可用性问题,所以Rabbit在高可用方面表现不俗,并且在我看来他是这几种中间件中最容易上手的一个。而且它在并发方面表现十分出色,可以实现大概10w的吞吐量。他的特点是:【可靠性、消息集群、高可用、插件机制(可以让它支持别的协议)、支持多语言客户端、管理页面 so on】本篇主要聊聊如何安装、使用、以及关于他的一些名词方面的阐述。run。。
安装运行
- 我的环境是CentOS7
- http://www.rabbitmq.com/which-erlang.html 页面查看安装rabbitmq需要安装erlang对应的版本,前面是Rabbit的版本,后面是Erlang的对它支持的版本。这里前后要对应下载,版本必须符合他的要求,我这里使用的就是第一个。
- https://github.com/rabbitmq/erlang-rpm/releases 中复制对应的版本erlang下载地址
- https://github.com/rabbitmq/rabbitmq-server/tags 中复制对应的版本rabbitmq的下载地址
- 下载Erlang
- 【wget -P /home/download https://github.com/rabbitmq/erlang-rpm/releases/download/v23.3.4.3/erlang-23.3.4.3-1.el7.x86_64.rpm】
- 安装Erlang
- 【sudo rpm -Uvh /home/download/erlang-23.3.4.3-1.el7.x86_64.rpm】
- 安装socat
- 【sudo yum install -y socat】
- 下载RabbitMQ
- 【wget -P /home/download https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.15/rabbitmq-server-3.8.15-1.el7.noarch.rpm】
- 安装RabbitMQ
- 【sudo rpm -Uvh /home/download/rabbitmq-server-3.8.15-1.el7.noarch.rpm】
到目前为止我们的准备工作完毕,以下是一些启动和关闭命令。
【停止服务】:sudo systemctl stop rabbitmq-server 【查询状态】:sudo systemctl status rabbitmq-server 【启动】:sudo systemctl start rabbitmq-server 【设置开启自启】:sudo systemctl enable rabbitmq-server
使用启动命令启动后,我们查询状态发现状态为 dead,这是因为我们要启动他的插件 使用【rabbitmq-plugins list】可以查询所有他支持的插件,我们这里需要启动
【rabbitmq-plugins enable rabbitmq_management】
执行完成后使用【 cat /etc/rabbitmq/enabled_plugins】就可以知道是否启动插件成功,然后再次启动发现启动状态就为running,使用【netstat -nplt | grep 15672 】发现他的专用端口已经开启,至此,安装启动完毕。这个时候就可以对它进行访问了(你的ip:15672),出现下面的图,就证明搭建成功。这里注意开放一下端口,否则别的机器无法访问:
- sudo firewall-cmd --zone=public --add-port=4369/tcp --permanent
- sudo firewall-cmd --zone=public --add-port=5672/tcp --permanent
- sudo firewall-cmd --zone=public --add-port=25672/tcp --permanent
- sudo firewall-cmd --zone=public --add-port=15672/tcp --permanent
然而我们用他自己的gust是无法login in 进去的,因为这个支持在搭建的服务器本身*问,那我们就要创建自己的用户,并且赋予相应的权限。
- 【添加一个admin用户】:rabbitmqctl add_user admin admin
- 【分配操作权限】:rabbitmqctl set_user_tags admin administrator
- 【分配资源权限】:rabbitmqctl set_permissions -p / admin ".*" ".*" ".*
使用admin进行登录,至此,可以rabbitmq可以正常使用
使用
添加相关依赖
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.5.1</version> </dependency>生产一个消息
View Codepublic class Producer { public static void main(String[] args) { // 1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2、设置连接属性 factory.setHost("你的ip"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 3、从连接工厂获取连接 connection = factory.newConnection(); // 4、从链接中创建通道 channel = connection.createChannel(); /** * 5、声明(创建)队列 * 如果队列不存在,才会创建 * RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错 * * queueDeclare参数说明: * @param queue 队列名称 * @param durable 队列是否持久化 * @param exclusive 是否排他,即是否为私有的,如果为true,会对当前队列加锁,其它通道不能访问,并且在连接关闭时会自动删除,不受持久化和自动删除的属性控制 * @param autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除 * @param arguments 队列参数,设置队列的有效期、消息最大长度、队列中所有消息的生命周期等等 */ channel.queueDeclare("queue1", false, false, false, null); // 消息内容 String message = "Hello World!"; // 6、发送消息 channel.basicPublish("", "queue1", null, message.getBytes()); System.out.println("消息已发送!"); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { // 7、关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } // 8、关闭连接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
生产后你去管理页面查询,会发现一个消息还未读取。
消费一个消息(消费后再次查询,发现ready中没有东西了)
View Codepublic class Consumer { public static void main(String[] args) { // 1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2、设置连接属性 factory.setHost("你的ip"); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 3、从连接工厂获取连接 connection = factory.newConnection("消费者"); // 4、从链接中创建通道 channel = connection.createChannel(); channel.queueDeclare("queue1", false, false, false, null); // 6、定义收到消息后的回调 DeliverCallback callback = new DeliverCallback() { public void handle(String consumerTag, Delivery message) throws IOException { System.out.println("收到消息:" + new String(message.getBody(), "UTF-8")); } }; // 7、监听队列 channel.basicConsume("queue1", true, callback, new CancelCallback() { public void handle(String consumerTag) throws IOException { } }); System.out.println("开始接收消息"); System.in.read(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { // 8、关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } // 9、关闭连接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }至此,简单使用结束。
Rabbit名词介绍
Blocker:一个rabbit服务器就是一个Blocker
虚拟主机(virtual host):一个Blocker中可以有多个虚拟机,每个虚拟机类似于一个工作空间,每个虚拟主机中的消息和其他虚拟主机的消息不相互影响
connection:消费者和rabbit中间的连接,有了这个连接,双方才能通信。
RoutingKey:消息被发给交换机的时候,会携带它,这个是用来指定消息的路由规则(可以为空)
channel(信道):是在connection上建立的管道,一个connection上可以建立多个channel,消息通过他们进行传递。
BindingKey:Exchange和Queue绑定的关系,Exchange接收到的消息会带有RoutingKey这个字段。
交换机(exchanger):当rabbit接收到消息后,交换机对这些消息进行转换,他的类型决定哪个队列中应该拥有这些消息,
交换机类型:
- 【direct】:当发送消息的时候,我们会在消息体上携带一个路由键【routekey】,如果消息体上你的路由键和队列匹配则发送给对应的队列。
- 【fanout 】:发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。
- 【headers】:根据发送的消息内容中的【headers】属性进行匹配,当消息发送到RabbitMQ时会取到该消息的【headers】与Exchange绑定时指定的键值对进行匹配,如果匹配到,则对应队列可以接受到消息。
- 【topic】:将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”会匹配一个或多个词,比如【ok.#】--》【ok.1.1 or ok.1.1.2 so on】,只要队列可以匹配到,就可以接受消息
队列(queue):rabbit接收到的信息存储在这里,消费者也是从这里获取的消息。
binder: 队列和交换机之间的绑定
AMQP(advanced message queuing protocol):
他是应用层协议的一个开放标准,为面向消息的中间件协议。他分为三层:
【底层协议层】:主要传输二进制数据流,
【中间层】:将客户端的命令转发给服务器,然后将服务器的回复转给客户端。【将最高层的用户层传递的信息转化为二进制,传递给底层。把底层的信息转化为客户端可以知道的语言。】
【最高层】:提供用户调用的命令。
流转流程
生产者:建立连接->开启通道->发送消息->关闭资源
消费者:建立连接->开启通道->接受消息->发送确认消息(告诉rabbit,rabbit修改消息状态为已经读 and so on)->释放资源