一、队列
1.1 队列生产者
1.1.1 pom.xml文件
依赖:
<dependencies>
<!--springboot和activemq整合-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
<version>2.5.3</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
1.1.2 application.yml文件
# 端口
server:
port: 8081
spring:
activemq:
broker-url: tcp://192.168.10.100:61616
user: admin
password: admin
jms:
pub-sub-domain: false # false = queue(不写默认) true = topic
# 自定义队列名称
myqueue: boot-activemq-queue
1.1.3 配置Bean
@Component
@EnableJms //开启jms注解
public class ConfigBean {
@Value("${myqueue}")
private String myQueue;
@Bean
public Queue queue(){
return new ActiveMQQueue(myQueue);
}
}
1.1.4 QueueProducer生产者
@Component
public class QueueProducer {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queue;
public void produceMsg(){
jmsMessagingTemplate.convertAndSend(queue,"***生产者:"+ UUID.randomUUID().toString().substring(0,7));
}
}
1.1.5 主启动类MainAppProducer
@SpringBootApplication
public class ActiveMq05SpringbootApplication {
public static void main(String[] args) {
SpringApplication.run(ActiveMq05SpringbootApplication.class, args);
}
}
1.1.6 测试
@SpringBootTest(classes = ActiveMq05SpringbootApplication.class)
@RunWith(SpringJUnit4ClassRunner.class)
@WebAppConfiguration
public class ActiveMq05SpringbootApplicationTests {
@Resource // 这个是java 的注解,而Autowried 是 spring 的
private QueueProducer queue_produce ;
@Test
public void testSend() throws Exception{
queue_produce.produceMessage();
}
}
1.1.7 新需求
每个3秒往MQ推送消息,以下定时发送Case,案列修改
- 修改QueueProducer,新增定时投递方法
@Component
public class QueueProducer {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate ;
@Autowired
private Queue queue ;
// 调用一次一个信息发出
public void produceMessage(){
jmsMessagingTemplate.convertAndSend(queue,"springboot-mq-生产者:"+ UUID.randomUUID().toString().substring(0,6));
}
//间隔时间3秒定投
@Scheduled(fixedDelay = 3000)
public void produceMsgScheduled(){
jmsMessagingTemplate.convertAndSend(queue,"定时发送:"+UUID.randomUUID().toString().substring(0,6));
}
}
- 修改主启动类
@SpringBootApplication
@EnableScheduling
public class ActiveMq05SpringbootApplication {
public static void main(String[] args) {
SpringApplication.run(ActiveMq05SpringbootApplication.class, args);
}
}
- 直接开启主启动类,间隔发消息OK
1.2 队列消费者
1.2.1 pom.xml文件
依赖:
<dependencies>
<!--springboot和activemq整合-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
<version>2.5.3</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
1.2.2 application.yml文件
server:
port: 8888
spring:
activemq:
broker-url: tcp://192.168.10.100:61616
user: admin
password: admin
jms:
pub-sub-domain: false # false = queue true = topic
# 自定义队列名称
myqueue: boot-activemq-queue
1.2.3 QueueConsumer类
@Component
public class QueueConsumer {
@JmsListener(destination = "${myqueue}")
public void receive(TextMessage textMessage) throws JMSException {
System.out.println("====>消费者收到消息:"+textMessage.getText());
}
}
1.2.4 主启动类
@SpringBootApplication
public class ActiveMq05SpringbootConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ActiveMq05SpringbootConsumerApplication.class, args);
}
}
二、发布订阅
2.1 Topic生产者
2.1.1 pom.xml文件
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
<version>2.5.3</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
2.1.2 application.yml文件
server:
port: 6666
spring:
activemq:
broker-url: tcp://192.168.10.100:61616
user: admin
password: admin
jms:
pub-sub-domain: true # false = queue true = topic
# 自定义队列名称
myTopic: boot-activemq-topic
2.1.3 配置Bean
@Component
public class ConfigBean {
@Value("${myTopic}")
private String topicName;
@Bean
public Topic topic(){
return new ActiveMQTopic(topicName);
}
}
2.1.4 TopicProducer
@Component
public class TopicProducer {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Topic topic;
@Scheduled(fixedDelay = 3000)
public void produceTopic(){
jmsMessagingTemplate.convertAndSend(topic,"主题消息:"+ UUID.randomUUID().toString().substring(0,6));
}
}
2.1.5 主启动类
@SpringBootApplication
@EnableScheduling
public class ActiveMq06BootTopicProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ActiveMq06BootTopicProducerApplication.class, args);
}
}
2.2 Topic消费者
2.2.1 pom.xml文件
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
<version>2.5.3</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
2.2.2 application.yml文件
server:
port: 5555
spring:
activemq:
broker-url: tcp://192.168.10.100:61616
user: admin
password: admin
jms:
pub-sub-domain: true # false = queue true = topic
# 自定义队列名称
myTopic: boot-activemq-topic
2.2.3 TopicConsumer
@Component
public class TopicConsumer {
@JmsListener(destination = "${myTopic}")
public void receive(TextMessage textMessage){
try {
System.out.println("消费者收到订阅主题:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
2.2.4 主启动类
@SpringBootApplication
public class ActiveMq06BootTopicConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ActiveMq06BootTopicConsumerApplication.class, args);
}
}