消息中间件-RabbitMq(搭建&消息分发)

消息中间件-RabbitMq(搭建&消息分发)

RabbitMq】是一个【AMQP】协议的实现。服务端使用的是Erlang语言进行编写,那也就是说,我们要运行它,就要安装相关Erlang环境。前面说了AMQP最初是为了解决金融行业的可用性问题,所以Rabbit在高可用方面表现不俗,并且在我看来他是这几种中间件中最容易上手的一个。而且它在并发方面表现十分出色,可以实现大概10w的吞吐量。他的特点是:【可靠性、消息集群、高可用、插件机制(可以让它支持别的协议)、支持多语言客户端、管理页面 so on本篇主要聊聊如何安装、使用、以及关于他的一些名词方面的阐述。run。。

安装运行

  • 我的环境是CentOS7
  • http://www.rabbitmq.com/which-erlang.html 页面查看安装rabbitmq需要安装erlang对应的版本,前面是Rabbit的版本,后面是Erlang的对它支持的版本。这里前后要对应下载,版本必须符合他的要求,我这里使用的就是第一个。
  • https://github.com/rabbitmq/erlang-rpm/releases 中复制对应的版本erlang下载地址
  • https://github.com/rabbitmq/rabbitmq-server/tags 中复制对应的版本rabbitmq的下载地址

消息中间件-RabbitMq(搭建&消息分发)

  • 下载Erlang
    • wget -P /home/download https://github.com/rabbitmq/erlang-rpm/releases/download/v23.3.4.3/erlang-23.3.4.3-1.el7.x86_64.rpm
  • 安装Erlang
    • sudo rpm -Uvh /home/download/erlang-23.3.4.3-1.el7.x86_64.rpm
  • 安装socat
    • sudo yum install -y socat
  • 下载RabbitMQ
    •  wget -P /home/download https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.15/rabbitmq-server-3.8.15-1.el7.noarch.rpm
  • 安装RabbitMQ
    • 【sudo rpm -Uvh /home/download/rabbitmq-server-3.8.15-1.el7.noarch.rpm】

到目前为止我们的准备工作完毕,以下是一些启动和关闭命令。

停止服务】:sudo systemctl stop rabbitmq-server 【查询状态】:sudo systemctl status rabbitmq-server 【启动】:sudo systemctl start rabbitmq-server 【设置开启自启】:sudo systemctl enable rabbitmq-server 

使用启动命令启动后,我们查询状态发现状态为 dead,这是因为我们要启动他的插件 使用【rabbitmq-plugins list】可以查询所有他支持的插件,我们这里需要启动

【rabbitmq-plugins  enable rabbitmq_management】

消息中间件-RabbitMq(搭建&消息分发)

 执行完成后使用【 cat /etc/rabbitmq/enabled_plugins】就可以知道是否启动插件成功,然后再次启动发现启动状态就为running,使用【netstat -nplt | grep 15672 】发现他的专用端口已经开启,至此,安装启动完毕。这个时候就可以对它进行访问了(你的ip:15672),出现下面的图,就证明搭建成功。这里注意开放一下端口,否则别的机器无法访问:

  • sudo firewall-cmd --zone=public --add-port=4369/tcp --permanent
  • sudo firewall-cmd --zone=public --add-port=5672/tcp --permanent
  • sudo firewall-cmd --zone=public --add-port=25672/tcp --permanent
  • sudo firewall-cmd --zone=public --add-port=15672/tcp --permanent 

消息中间件-RabbitMq(搭建&消息分发)

 然而我们用他自己的gust是无法login in 进去的,因为这个支持在搭建的服务器本身*问,那我们就要创建自己的用户,并且赋予相应的权限。

  • 【添加一个admin用户】:rabbitmqctl add_user admin admin 
  • 【分配操作权限】:rabbitmqctl set_user_tags admin administrator
  • 【分配资源权限】:rabbitmqctl set_permissions -p / admin ".*" ".*" ".*

使用admin进行登录,至此,可以rabbitmq可以正常使用

消息中间件-RabbitMq(搭建&消息分发)

使用

添加相关依赖

<dependency>
   <groupId>com.rabbitmq</groupId>
   <artifactId>amqp-client</artifactId>
   <version>5.5.1</version>
 </dependency>

生产一个消息

消息中间件-RabbitMq(搭建&消息分发)
public class Producer {

    public static void main(String[] args) {
        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2、设置连接属性
        factory.setHost("你的ip");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");

        Connection connection = null;
        Channel channel = null;

        try {
            // 3、从连接工厂获取连接
            connection = factory.newConnection();

            // 4、从链接中创建通道
            channel = connection.createChannel();

            /**
             * 5、声明(创建)队列
             * 如果队列不存在,才会创建
             * RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错
             *
             * queueDeclare参数说明:
             * @param queue 队列名称
             * @param durable 队列是否持久化
             * @param exclusive 是否排他,即是否为私有的,如果为true,会对当前队列加锁,其它通道不能访问,并且在连接关闭时会自动删除,不受持久化和自动删除的属性控制
             * @param autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除
             * @param arguments 队列参数,设置队列的有效期、消息最大长度、队列中所有消息的生命周期等等
             */
            channel.queueDeclare("queue1", false, false, false, null);
            // 消息内容
            String message = "Hello World!";
            // 6、发送消息
            channel.basicPublish("", "queue1", null, message.getBytes());
            System.out.println("消息已发送!");

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            // 7、关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            // 8、关闭连接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
View Code

 

生产后你去管理页面查询,会发现一个消息还未读取。

消息中间件-RabbitMq(搭建&消息分发)

消费一个消息(消费后再次查询,发现ready中没有东西了)

消息中间件-RabbitMq(搭建&消息分发)
public class Consumer {

    public static void main(String[] args) {
        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2、设置连接属性
        factory.setHost("你的ip");
        factory.setUsername("admin");
        factory.setPassword("admin");

        Connection connection = null;
        Channel channel = null;

        try {
            // 3、从连接工厂获取连接
            connection = factory.newConnection("消费者");

            // 4、从链接中创建通道
            channel = connection.createChannel();

            channel.queueDeclare("queue1", false, false, false, null);

            // 6、定义收到消息后的回调
            DeliverCallback callback = new DeliverCallback() {
                public void handle(String consumerTag, Delivery message) throws IOException {
                    System.out.println("收到消息:" + new String(message.getBody(), "UTF-8"));
                }
            };
            // 7、监听队列
            channel.basicConsume("queue1", true, callback, new CancelCallback() {
                public void handle(String consumerTag) throws IOException {
                }
            });

            System.out.println("开始接收消息");
            System.in.read();

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            // 8、关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }

            // 9、关闭连接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
View Code

至此,简单使用结束。

Rabbit名词介绍

Blocker:一个rabbit服务器就是一个Blocker

虚拟主机(virtual host一个Blocker中可以有多个虚拟机,每个虚拟机类似于一个工作空间,每个虚拟主机中的消息和其他虚拟主机的消息不相互影响

connection:消费者和rabbit中间的连接,有了这个连接,双方才能通信。

RoutingKey:消息被发给交换机的时候,会携带它,这个是用来指定消息的路由规则(可以为空)

channel(信道):是在connection上建立的管道,一个connection上可以建立多个channel,消息通过他们进行传递。

BindingKey:Exchange和Queue绑定的关系,Exchange接收到的消息会带有RoutingKey这个字段。

交换机(exchanger):当rabbit接收到消息后,交换机对这些消息进行转换,他的类型决定哪个队列中应该拥有这些消息,

交换机类型:

  • 【direct】:当发送消息的时候,我们会在消息体上携带一个路由键【routekey】,如果消息体上你的路由键和队列匹配则发送给对应的队列。
  • 【fanout 】:发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。
  • 【headers】:根据发送的消息内容中的【headers】属性进行匹配,当消息发送到RabbitMQ时会取到该消息的【headers】与Exchange绑定时指定的键值对进行匹配,如果匹配到,则对应队列可以接受到消息。
  • 【topic】:将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”会匹配一个或多个词,比如【ok.#】--》【ok.1.1 or ok.1.1.2 so on】,只要队列可以匹配到,就可以接受消息

队列(queue):rabbit接收到的信息存储在这里,消费者也是从这里获取的消息。

binder: 队列和交换机之间的绑定

AMQP(advanced message queuing protocol):

他是应用层协议的一个开放标准,为面向消息的中间件协议。他分为三层:

【底层协议层】:主要传输二进制数据流,

【中间层】:将客户端的命令转发给服务器,然后将服务器的回复转给客户端。【将最高层的用户层传递的信息转化为二进制,传递给底层。把底层的信息转化为客户端可以知道的语言。】

【最高层】:提供用户调用的命令。

流转流程

生产者:建立连接->开启通道->发送消息->关闭资源

消费者:建立连接->开启通道->接受消息->发送确认消息(告诉rabbit,rabbit修改消息状态为已经读 and so on)->释放资源

消息中间件-RabbitMq(搭建&消息分发)

消息中间件-RabbitMq(搭建&消息分发)

上一篇:C3P0数据库连接池使用


下一篇:C# sql语句拼接时 like情况的防sql注入的用法