2021-11-10

ActiveMQ学习笔记一

文章目录

一、Linux安装ActiveMQ及应用

https://activemq.apache.org/官网下载apache-activemq-5.15.9-bin.tar.gz 用xftp上传到/opt目录解压在bin目录中启动./activemq start ,前提是需要对应jdk版本(我安装是对应需要jdk1.8)

启动命令:
启动 bin下面的命令
./activemq start

默认后台端口61616 前台端口8161
查看是否启动三种方式
1 netstat -anp |grep 61616
2 ps -ef | grep activemq | grep -v grep
3 lsof -i:61616

启动完成后访问路径:
(自己服务器的IP地址)xxx.xxx.xxx.xxx:8161/activemq
默认用户名密码 admin admin

2021-11-10

它有两大模式:queue 和 topic

2021-11-10
两大模式的比较
2021-11-10

二、JMS

组成

JMS 组成结构和特点
provide 服务器
produce 生产者
consumer 消费者
message :消息头,消息体,消息属性

JMS message
消息头:几个重要的设置
JMSDestination 目的地:发送消息的目的地主要指Queue和Topic
JMSDeliveryModel 持久和非持久
JMSExpiration 消息过期设置
JMSPriority 消息优先级
JMSMessageID 消息唯一标志id:唯一识别每个消息的标志由mq产生

2021-11-10
2021-11-10

2021-11-10

消息体 :封装具体的消息数据
5种消息体格式:
TextMessage 普通的字符串包含一个String
MapMessage 一个Map类型消息,key是String 而值是java基本类型
BytesMessage 二进制数组消息,包含一个byte[]
StreamMessage java数据流消息,用标准的流操作来顺序填充和读取
ObjectMess age 对象消息,包含一个可序列化的java对象
发送和接收消息体类型必须一致对应

消息属性:
如果需要除消息头字段以外的值,那么可以使用消息属性
识别/去重/重点标注等操作非常有用的方法
是一对kv对:如 textMessage.setStringProperty(“zs”,“vip”);

消息可靠性 : (持久性 ,事务, 签收)

1 persisent 持久性:messageProducer.setDeliverymodel(DeliveryModel.NON_PERSISENT);//非持久化
DeliveryModel.PERSISENT持久化 默认策略就是持久化
注意:
持久化topic:一定要先运行一次消费者,也就是需要先订阅这个主题,然后再运行生产者发送消息
此时无论消费者是否在线都会接收到消息,不在线的话下次连接时候会把没有接收过的消息都接收回来。
2021-11-10

队列:
1 非持久性:当服务器宕机 ,消息丢失
2 持久性:当服务器宕机,消息依然存在
3 队列默认策略就是持久化

//非持久化演示
生产者

public class JmsProduce {

    public static final String ACTIVEMQ_URL = "tcp://xxx.xxx.xxx.xxx:61616";
    public static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws JMSException {
        //1 创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2 通过连接工厂,获得connection 并启动访问
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();

        // 创建会话session
        // 两个参数,第一个叫事务,第二个叫签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //创建目的地
        Queue queue = session.createQueue(QUEUE_NAME);
        //创建消息的生产者
        MessageProducer messageProducer = session.createProducer(queue);
        //设置消息为非持久化
        messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISENT);
        //messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); 持久化
        //通过使用messageProducer生产3条消息发送到mq队列
        for (int i = 1; i <=3; i++) {
            //创建消息
            TextMessage textMessage = session.createTextMessage("message" + i);
            //通过messageProducer生产者发送消息给mq
            messageProducer.send(textMessage);
        }
        //关闭资源
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("***********MQ消息发送完成");

    }

}

消费者

public class JmsConsumer {

    public static final String ACTIVEMQ_URL = "tcp://192.168.24.132:61616";
    public static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws JMSException, IOException {

        System.out.println("******我是2号消费者");

        //1 创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2 通过连接工厂,获得connection 并启动访问
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();

        // 创建会话session
        // 两个参数,第一个叫事务,第二个叫签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //创建目的地
        Queue queue = session.createQueue(QUEUE_NAME);

        //创建消息的消费者
        MessageConsumer messageConsumer = session.createConsumer(queue);



        //通过监听的方式来接收消息
        messageConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if (null != message && message instanceof TextMessage) {
                    try {
                        TextMessage textMessage = (TextMessage) message;
                        System.out.println("**********MessageListener收到消息"+textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
        });
        System.in.read();
        messageConsumer.close();
        session.close();
        connection.close();

    }

–事务偏生产者,签收偏消费者

2 事务
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

如果是false:不开启事务,只要执行send就进入到队列中去,那么第二个签收参数设置需要有效
如果是true,开启事务,先执行send再执行commit,消息才被真正提交到队列中,消息需要批量发送需要缓冲区处理

生产者--------如果是false 则send的时候就提交成功到目的地,如果是true,一定需要提交commit,出错可以回滚rollback,如果没有commit那么消息就没有发送到目的地。

消费者 ---------如果设置为false,则消息只被消费一次,如果是true,一定需要设置commit,如果没有写commit那么消息会被重复消费,

3、签收
2021-11-10
非事务下的签收:
1 自动签收(默认)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

2 手动签署 :需要显示的调用acknowledge方法
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
接受完消息后 调用textMessage.acknowledge();
如果没有收到消息后没有签收会出现重复消费问题

3 可重复签收:了解即可

事务下的签收:
生产事务开始,只有commit后才能将全部消费变为已消费
当一个事务被成功提交则消息被自动签收,如果事务回滚,则消息会被再次传送,

事务与签收的关系:事务比签收更大,只要是开始事务,不管有没有显示调用acknowledge方法,只要提交就默认是自动签收
如果开启事务但是没有commit调用,即使写了acknowledge方法签收也不会成功。

非事务会话中,消息何时被确认取决于创建会话时候的应答模式(acknowledgement model),如果自动签收就自动,如果手动签收就需要调用textMessage.acknowledge();

三、总结

队列2021-11-10

topic
2021-11-10
jms

2021-11-10

持久非持久
2021-11-10
2021-11-10

队列持久订阅和非持久订阅

2021-11-10

2021-11-10

上一篇:WebSocket


下一篇:RabbitMQ入门篇