SptingBoot集成ActiveMQ
ActiveMQ安装
使用docker安装ActiveMQ
sudo docker pull rmohr/activemq
sudo docker run -d --name activeMQ -p 9001:61616 -p 9002:61617 -p 9003:61614 -p 9004:1883 -p 9005:8161 rmohr/activemq
# 端口映射自行改动,8161客户端连接、9001连接端口
安装完成可通过ip:9005访问ActiveMQ客户端
SpringBoot集成
pom.xml
<!--activeMq-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
Queue
Product
public class ProductMain {
private static final String QUEUE_URL = "tcp://192.168.1.40:9001";
private static final String QUEUE_NAME = "TEST";
public static void main(String[] args) throws JMSException {
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(QUEUE_URL);
// JMS 客户端到JMS Provider 的连接
Connection connection = connectionFactory.createConnection();
connection.start();
// Session: 一个发送或接收消息的线程
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// Destination :消息的目的地;消息发送给谁.
// 获取session注意参数值my-queue是Query的名字
Destination destination = session.createQueue(QUEUE_NAME);
// MessageProducer:消息生产者
MessageProducer producer = session.createProducer(destination);
// 设置不持久化
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// 发送一条消息
for (int i = 1; i <= 5; i++) {
sendMsg(session, producer, i);
}
session.commit();
connection.close();
}
/**
* 在指定的会话上,通过指定的消息生产者发出一条消息
*
* @param session 消息会话
* @param producer 消息生产者
*/
public static void sendMsg(Session session, MessageProducer producer, int i) throws JMSException {
// 创建一条文本消息
TextMessage message = session.createTextMessage("Hello ActiveMQ!" + i);
// 通过消息生产者发出消息
producer.send(message);
}
}
Receiver
public class ReceiverMain {
public static void main(String[] args) throws JMSException {
//Jms连接Mqtt
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.1.40:9001");
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("test");
MessageConsumer consumer = session.createConsumer(destination);
while (true) {
TextMessage message = (TextMessage) consumer.receive();
if (!StringUtils.isEmpty(message)) {
System.out.println("收到消息:" + message.getText());
session.commit();
} else {
break;
}
}
session.close();
connection.close();
}
}
Topic
Send
public class Send {
private static final String QUEUE_URL = "tcp://192.168.1.40:9001";
private static String TOPIC_NAME = "test-stefanie";
public static void main(String[] args) throws JMSException {
start();
}
private static void start() throws JMSException {
log.info("==========生产者已启动==========");
// 创建连接工厂,采用默认的用户密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(QUEUE_URL);
// 连接访问
Connection connection = activeMQConnectionFactory.createConnection();
// 创建session会话
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(null);
//事务
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
send(producer, session);
log.info("==========发送成功==========");
connection.close();
}
private static void send(MessageProducer producer, Session session) throws JMSException {
for (int i = 1; i <= 5; i++) {
log.info("==========我是消息{}==========", i);
TextMessage textMessage = session.createTextMessage("我是消息" + i);
Destination destination = session.createTopic(TOPIC_NAME);
producer.send(destination, textMessage);
}
}
}
Receiver
public class Receiver {
private static final String QUEUE_URL = "tcp://192.168.1.40:9001";
private static String TOPIC_NAME = "test-stefanie";
public static void main(String[] args) throws JMSException {
start();
}
private static void start() throws JMSException {
log.info("==========消费者启动==========");
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(QUEUE_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.setClientID("zzz");
// 不开消息启事物,消息主要发送消费者,则表示消息已经签收
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Topic topicTest = session.createTopic(TOPIC_NAME);
TopicSubscriber topicSubscriber = session.createDurableSubscriber(topicTest, "remark");
connection.start();
Message message = topicSubscriber.receive();
// 方法一:
while (true) {
if (message != null) {
TextMessage textMessage = (TextMessage) message;
log.info("==========接收到消息{}==========", textMessage.getText());
} else {
break;
}
}
// 方法二:监听器
// MessageConsumer consumer = session.createConsumer(topicTest);
// consumer.setMessageListener(new MessageListener() {
// @Override
// public void onMessage(Message message) {
// if (null != message && message instanceof TextMessage) {
// TextMessage textMessage = (TextMessage)message;
// }
// }
// });
// consumer,close();
session.close();
connection.close();
}