SptingBoot集成ActiveMQ

SptingBoot集成ActiveMQ

ActiveMQ安装

使用docker安装ActiveMQ

sudo docker pull rmohr/activemq
sudo docker run -d --name activeMQ -p 9001:61616 -p 9002:61617 -p 9003:61614 -p 9004:1883 -p 9005:8161 rmohr/activemq
# 端口映射自行改动,8161客户端连接、9001连接端口

安装完成可通过ip:9005访问ActiveMQ客户端

SpringBoot集成

pom.xml

        <!--activeMq-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-core</artifactId>
            <version>5.7.0</version>
        </dependency>

Queue

Product

public class ProductMain {

    private static final String QUEUE_URL = "tcp://192.168.1.40:9001";

    private static final String QUEUE_NAME = "TEST";

    public static void main(String[] args) throws JMSException {

        // ConnectionFactory :连接工厂,JMS 用它创建连接
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(QUEUE_URL);
        // JMS 客户端到JMS Provider 的连接
        Connection connection = connectionFactory.createConnection();
        connection.start();
        // Session: 一个发送或接收消息的线程
        Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        // Destination :消息的目的地;消息发送给谁.
        // 获取session注意参数值my-queue是Query的名字
        Destination destination = session.createQueue(QUEUE_NAME);
        // MessageProducer:消息生产者
        MessageProducer producer = session.createProducer(destination);
        // 设置不持久化
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        // 发送一条消息
        for (int i = 1; i <= 5; i++) {
            sendMsg(session, producer, i);
        }
        session.commit();
        connection.close();

    }

    /**
     * 在指定的会话上,通过指定的消息生产者发出一条消息
     *
     * @param session  消息会话
     * @param producer 消息生产者
     */
    public static void sendMsg(Session session, MessageProducer producer, int i) throws JMSException {
        // 创建一条文本消息
        TextMessage message = session.createTextMessage("Hello ActiveMQ!" + i);
        // 通过消息生产者发出消息
        producer.send(message);

    }
}

Receiver

public class ReceiverMain {

    public static void main(String[] args) throws JMSException {

        //Jms连接Mqtt
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.1.40:9001");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("test");
        MessageConsumer consumer = session.createConsumer(destination);
        while (true) {
            TextMessage message = (TextMessage) consumer.receive();
            if (!StringUtils.isEmpty(message)) {
                System.out.println("收到消息:" + message.getText());
                session.commit();
            } else {
                break;
            }
        }
        session.close();
        connection.close();
    }
}

Topic

Send

public class Send {

    private static final String QUEUE_URL = "tcp://192.168.1.40:9001";

    private static String TOPIC_NAME = "test-stefanie";

    public static void main(String[] args) throws JMSException {
        start();
    }

    private static void start() throws JMSException {
        log.info("==========生产者已启动==========");
        // 创建连接工厂,采用默认的用户密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(QUEUE_URL);
        // 连接访问
        Connection connection = activeMQConnectionFactory.createConnection();
        // 创建session会话
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(null);
        //事务
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        connection.start();
        send(producer, session);
        log.info("==========发送成功==========");
        connection.close();
    }

    private static void send(MessageProducer producer, Session session) throws JMSException {
        for (int i = 1; i <= 5; i++) {
            log.info("==========我是消息{}==========", i);
            TextMessage textMessage = session.createTextMessage("我是消息" + i);
            Destination destination = session.createTopic(TOPIC_NAME);
            producer.send(destination, textMessage);
        }
    }
}

Receiver

public class Receiver {

    private static final String QUEUE_URL = "tcp://192.168.1.40:9001";

    private static String TOPIC_NAME = "test-stefanie";

    public static void main(String[] args) throws JMSException {
        start();
    }

    private static void start() throws JMSException {
        log.info("==========消费者启动==========");

        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(QUEUE_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.setClientID("zzz");
        // 不开消息启事物,消息主要发送消费者,则表示消息已经签收
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        Topic topicTest = session.createTopic(TOPIC_NAME);
        TopicSubscriber topicSubscriber = session.createDurableSubscriber(topicTest, "remark");
        connection.start();
        Message message = topicSubscriber.receive();
//        方法一:
        while (true) {
            if (message != null) {
                TextMessage textMessage = (TextMessage) message;
                log.info("==========接收到消息{}==========", textMessage.getText());
            } else {
                break;
            }
        }
//        方法二:监听器
//		  MessageConsumer consumer = session.createConsumer(topicTest);
//        consumer.setMessageListener(new MessageListener() {
//            @Override
//            public void onMessage(Message message) {
//                if (null != message && message instanceof TextMessage) {
//                    TextMessage textMessage = (TextMessage)message;
//                }
//            }
//        });
//		consumer,close();
        session.close();
        connection.close();
    }

上一篇:kafka一些基本原理


下一篇:如何让消息队列达到最大吞吐量?