1、pom.xml引用active相关包
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.8</version> </dependency>
2、编写生产者代码
package com.du.jms.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * 消息的提供者 * Created by only-dream on 2019/3/8 19:21 */ public class AppProducer { private static final String url = "tcp://127.0.0.1:61616"; private static final String queueName = "queue-test";//队列名称 public static void main(String[] args) throws JMSException { //1.创建连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); //2.创建了连接 Connection connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); //4.创建会话 /* (第一个参数表示是否在事务中处理,一般为true) 第二个参数表示印打模式,选用的自动印打*/ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建目标 Destination destination = session.createQueue(queueName); //6.创建消息的生产者 MessageProducer messageProducer = session.createProducer(destination); //7创建and发送消息 for (int i = 0; i < 100; i++) { TextMessage textMessage = session.createTextMessage("test" + i); messageProducer.send(textMessage); System.out.println("消息发送成功"+textMessage); } //8.关闭连接 connection.close(); } }
3、编写消费者代码(队列模式)
package com.du.jms.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * Created by only-dream on 2019/3/8 20:24 */ public class AppConsumer { private static final String url = "tcp://127.0.0.1:61616"; private static final String queueName = "queue-test";//队列名称 public static void main(String[] args) throws JMSException { //1.创建连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); //2.创建了连接 Connection connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); //4.创建会话 /* (第一个参数表示是否在事务中处理,一般为true) 第二个参数表示印打模式,选用的自动印打*/ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建目标 Destination destination = session.createQueue(queueName); //6.创建消息的消费者 MessageConsumer consumer = session.createConsumer(destination); //7.创建一个监听器(监听是异步的,需要在监听到消息后再关闭连接) consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("接收消息"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); //8.关闭连接 // connection.close(); } }
4、主题模式创建的接收消息的时候只需把
Destination destination = session.createQueue(queueName);
改为
Destination destination = session.createTopic(topicName);
两者的区别是
1.队列消息是平均分配消息,主题消息是获取所有的消息
2.主题消息消费者不能获取未接收之前发布的消息