消息中间件--ActiveMQ&JMS消息服务

### 消息中间件 ###

----------

**消息中间件**

1. 消息中间件的概述
2. 消息中间件的应用场景
  * 异步处理
  * 应用解耦
  * 流量削峰
  * 消息通信
 
----------

### JMS消息服务 ###

----------

**JMS的概述**

1. JMS消息服务的概述
2. JMS消息模型
  * P2P模式
  * Pub/Sub模式
 
3. 消息消费的方式
  * 同步的方式---手动
  * 异步的方式---listener监听
 
4. JMS编程模型
----------

### 消息中间件:ActiveMQ ###

----------

**ActiveMQ的下载与安装**

1. ActiveMQ的下载与安装
  * 下载ActiveMQ的压缩文件,解压apache-activemq-5.14.5-bin.zip文件
  * 双击运行:activemq.bat文件,启动服务
 
2. 测试ActiveMQ是否安装成功
  * 打开浏览器,输入:http://localhost:8161
 
3. 点击Manage ActiveMQ broker连接,可以查看ActiveMQ中已经发布的消息等
  * 用户名密码都是:admin
----------

**ActiveMQ的消息队列方式入门**(P2P模式)

1. 在父工程的pom.xml文件中引入ActiveMQ和Spring整合JMS的坐标依赖
<!-- activemq start -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.2.0</version>
</dependency>
<!-- activemq end -->
<!-- spring 与 mq整合 start -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.2.4.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.7</version>
</dependency>
<!-- spring 与 mq整合 end -->
 
2. ActiveMQ的向消息队列中发送消息的入门程序(没有使用Spring整合JMS的方式)
@Test
public void sendQueueMessage() throws JMSException {
// 1 创建连接工厂
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); // 2 使用工厂,创建连接
Connection connection = factory.createConnection(); // 3 启动连接
connection.start(); // 4 使用连接,创建会话,true表示开始事务,代码执行后需要提供事务
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 5 创建队列队形(myQueue--队列的名字)/topic-----------session创建
Queue queue = session.createQueue("myQueue");
// 6 创建生产者-----------session创建
MessageProducer producer = session.createProducer(queue);
// 7 创建消息----文本消息-------session创建
TextMessage message = session.createTextMessage();
message.setText("helloworld!!!"); // 8 发送消息
producer.send(message); // 9 提交事务
session.commit();
session.close();
connection.close();
}
3. ActiveMQ从消息队列中获取消息
@Test
public void receiverQueueMessage() throws JMSException {
// 1 创建连接工厂
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
// 2 使用工厂,创建连接
Connection connection = factory.createConnection();
// 3 启动连接
connection.start();
// 4 使用连接,创建会话,true表示开始事务,代码执行后需要提供事务
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 5 创建队列队形(hello--队列的名字)/topic-----------session创建
Queue queue = session.createQueue("myQueue");
// 6 创建消费者-----------session创建
MessageConsumer consumer = session.createConsumer(queue); // 7 接收消息----text格式
TextMessage receive = (TextMessage) consumer.receive();
String text = receive.getText();
System.out.println("接收到的消息====" + text); // 8 提交事务
session.commit();
session.close();
connection.close(); }
4. 使用监听器的方式,从队列中消费消息
/**
*异步方式
Queue接受用Listener方式接受,多用
如果有多个监听listener,则交替执行
* @throws Exception
*/
@Test
public void receiverQueueListener() throws Exception{
// 1 创建连接工厂
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
// 2 使用工厂,创建连接
Connection connection = factory.createConnection();
// 3 启动连接
connection.start();
// 4 使用连接,创建会话,true表示开始事务,代码执行后需要提供事务//死循环的不能用事物
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5 创建队列队形(hello--队列的名字)/topic-----------session创建
Queue queue = session.createQueue("myQueue");
// 6 创建消费者-----------session创建
MessageConsumer consumer = session.createConsumer(queue); //7 // 给消费者添加监听器
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message msg) {
TextMessage message = (TextMessage) msg;
try {
System.out.println("Listener1111111111接收到的消息是=="+message.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}); while(true){}
// 使用监听器的方式不能关闭,需要监听器一直工作
// session.commit();
// session.close();
// connection.close();
}

**ActiveMQ的消息订阅方式入门**(Pub/Sub模式)

/**
* Topic发送
* @throws JMSException
*/
@Test
public void sendTopicMessage() throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 创建消息订阅
Topic topic = session.createTopic("myTopic");
// 创建生产者
MessageProducer producer = session.createProducer(topic);
// 创建消息,一组可以存储key value的消息
MapMessage message = session.createMapMessage();
message.setString("username", "cgx");
message.setString("password", "123456");
// 发送消息
producer.send(message);
// 提交事务
session.commit();
session.close();
connection.close();
}
/**
* Topic接受
*
* @throws JMSException
*/
@Test
public void testReceiverMessage() throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 创建消息订阅
Topic topic = session.createTopic("myTopic");
// 创建消费者
MessageConsumer consumer = session.createConsumer(topic);
// 接收消息
MapMessage message = (MapMessage) consumer.receive();
System.out.println(message.getString("username"));
System.out.println(message.getString("password")); session.commit();
session.close();
connection.close();
}
/**
* Topic接受Listener监听方式
*
* @throws Exception
*/
@Test
public void receiverQueueListener() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建消息订阅
Topic topic = session.createTopic("myTopic");
// 创建消费者
MessageConsumer consumer = session.createConsumer(topic); // 给消费者添加监听器consumer添加监听
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message msg) {
MapMessage message = (MapMessage) msg;
try {
System.out.println(message.getString("username"));
System.out.println(message.getString("password"));
} catch (JMSException e) {
e.printStackTrace();
}
}
}); while (true) { } }

### Spring整合ActiveMQ ###★★★★★

----------
 
**Spring整合ActiveMQ**★★★★★
 
1. 创建applicationContext-mq.xml的配置文件,导入约束★★★★★
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:jpa="http://www.springframework.org/schema/data/jpa" xmlns:task="http://www.springframework.org/schema/task"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
http://www.springframework.org/schema/data/jpa
http://www.springframework.org/schema/data/jpa/spring-jpa.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd"> </beans>
2. 具体的配置如下★★★★★
applicationContext-mq.xml===================mq的消息发送(消息生产者)
<!-- 配置连接工厂 -->
<!-- ActiveMQ 连接工厂 -->
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
<!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码-->
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="tcp://localhost:61616" userName="admin" password="admin" /> <!-- Spring Caching连接工厂 -->
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<!-- Session缓存数量和链接数有关 -->
<property name="sessionCacheSize" value="100" />
</bean> <!-- 定义JmsTemplate的Queue类型★★★★★ -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<constructor-arg ref="connectionFactory" />
<!-- 非pub/sub模型(发布/订阅),即队列模式 -->
<property name="pubSubDomain" value="false" />
</bean> <!-- 定义JmsTemplate的Topic类型★★★★★ -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate" >
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<constructor-arg ref="connectionFactory" />
<!-- pub/sub模型(发布/订阅) -->
<property name="pubSubDomain" value="true" />
</bean>
 
3. 发送消息的代码如下★★★★★
  3.1.Queue方式:★★★★★
@Autowired
@Qualifier(value="jmsQueueTemplate")
private JmsTemplate queueTemplate;//Queue /**
* Queue发送消息---spring框架
*/
@Test
public void sendQueueMessage() {
// 发送消息 构造参数指定目标,因为配置文件中的队列和订阅模式是通过id与false和true进行区分
queueTemplate.send("myQueue", new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
// 使用session创建消息,发送
TextMessage textMessage = session.createTextMessage("测试结合spring框架发送queue消息");
return textMessage;
}
});
}
  3.2.Topic方式:★★★★★
@Autowired
@Qualifier(value = "jmsTopicTemplate")
private JmsTemplate topicTemplate;//Topic /**
* Topic发送消息---spring框架
*/
@Test
public void sendTopicMessage() {
topicTemplate.send("spring_topic", new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("username", "mdzz");
return mapMessage;
}
});
}
4. 接收消息的代码如下==========不提倡手动,要用监听器异步获取
/**
* Queue接收消息---spring框架
* 同步手动:不提倡
* receive("myQueue")要写目标,不写目标的话会报找不到目标的错误NO defaultDestination
*/
@Test
public void receiverMessage() {
//接收消息textMessage类型
TextMessage textMessage = (TextMessage) queueTemplate.receive("myQueue"); try {
System.out.println(textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
 
**Spring配置监听器**★★★★★★★★★★★★★★★
 
1. 自定义监听器代码的编写----接收消息---spring框架---实现MessageListener接口★★★★★
  1.1.Queue:★★★★★
@Component(value="queueConsumer1")
public class QueueListener implements MessageListener { @Override
public void onMessage(Message arg0) {
// 把arg0强转
TextMessage textMessage = (TextMessage) arg0;
try {
// 输出消息
System.out.println(textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
} }
 
  1.2.Topic:发送一个,两个都会接受★★★★★topic特点:有几个监听几个都会同时收到
@Component
public class TopicConsumer1 implements MessageListener { @Override
public void onMessage(Message arg0) {
MapMessage mapMessage = (MapMessage) arg0;
try {
System.out.println("TopicConsumer1===="+mapMessage.getString("username"));
} catch (JMSException e) {
e.printStackTrace();
}
} } @Component
public class TopicConsumer2 implements MessageListener {
//...
}
 
2. 编写配置文件
applicationContext-mq-consumer.xml=============mq的消息接受(负责监听接受消息)
<!-- 扫描包 -->
<context:component-scan base-package="com.my.jms.consumer" /> <!-- ActiveMQ 连接工厂 -->
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="tcp://localhost:61616" userName="admin" password="admin" /> <!-- Spring Caching连接工厂 -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<property name="sessionCacheSize" value="100" />
</bean> <!-- Spring JmsTemplate 的消息生产者 start-->
<jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory">
<jms:listener destination="myQueue" ref="queueConsumer1"/>
</jms:listener-container> <jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory">
<jms:listener destination="spring_topic" ref="topicConsumer1"/>
<jms:listener destination="spring_topic" ref="topicConsumer2" />
</jms:listener-container>
 
3.不用启动项目,把spring配置文件applicationContext-mq-consumer.xml启动起来,可以用采用下面方法
新建一个test类,让他一直启动着,这样就一直加载spring的配置文件
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:applicationContext-mq-consumer.xml")
public class SpringQueueListenerTest { @Test
public void test(){
while(true);
}
}
4.只要发送端(发送消息---spring框架)一启动,监听器就会监听到,就会输出:测试结合spring框架发送queue消息★★★★★

spring整合总结:

消息发送
  1. 创建spring容器
  2. 从容器中获取JMSTemplate对象,发送消息
  3. 定义Destination
  4. 使用JMSTemplate对象发送消息
消息接受
  1. 创建一个类实现MessageListener 接口。业务处理在此类中实现。
  2.在spring容器中配置DefaultMessageListenerContainer对象,引用MessageListener 实现类对象接收消息。

项目整合ActiveMQ:

1. 消息生产者整合ActiveMQ
  消息生产者只需要发送消息
  需要把JMSTemplate和Destination交给spring进行管理

 部分代码:
/**===========================activeMQ消息发送========================================*/
// 发送消息!!!
this.send("save", item.getId());
} @Autowired
private JmsTemplate jmsTemplate; @Autowired
private Destination destination; /**
* 此方法就是用来发送消息的
* 考虑:1、发送什么数据?2、我需要什么数据?
* 在消息中需要:1、消息的标识:save,delete,update;2、商品的ID
*/
private void send(final String type, final Long itemId) {
// TODO Auto-generated method stub
jmsTemplate.send(destination, new MessageCreator() { @Override
public Message createMessage(Session session) throws JMSException {
//创建消息体
TextMessage textMessage = new ActiveMQTextMessage();
//设置消息内容
Map<String, Object> map = new HashMap<>();
map.put("type", type);
map.put("itemId", itemId);
try {
ObjectMapper mapper = new ObjectMapper();
textMessage.setText(mapper.writeValueAsString(map));
} catch (Exception e) {
e.printStackTrace();
}
return textMessage;
}
});
}

2. 消息消费改造
  在search-service添加
  ItemMessageListener:

/**===========================activeMQ消息发送========================================*/
@Autowired
private SearchService searchService; @Override
public void onMessage(Message message) {
//先判断此消息类型是否是TextMessage
if(message instanceof TextMessage){
//如果是,强转
TextMessage textMessage = (TextMessage)message;
try {
//获取消息:json
String json = textMessage.getText();
//杰克逊第三作用:直接解析json数据
ObjectMapper mapper = new ObjectMapper();
JsonNode jsonNode = mapper.readTree(json);
String type = jsonNode.get("type").asText();
Long itemId = jsonNode.get("itemId").asLong();
//根据解析出来的type,判断此type=save的时候我应该调用indexSearch方法
if("save".equals(type)){
searchService.indexItem(itemId);
} } catch (Exception e) {
e.printStackTrace();
}
} }

索引库增加商品会触发mq:

SearchServiceImpl:

@Override
public void indexItem(Long itemId) throws Exception {
Item item = this.itemMapper.selectByPrimaryKey(itemId); SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", item.getId());
doc.addField("item_title", item.getTitle());
doc.addField("item_image", item.getImage());
doc.addField("item_cid", item.getCid());
doc.addField("item_price", item.getPrice());
doc.addField("item_status", item.getStatus()); this.cloudSolrServer.add(doc); this.cloudSolrServer.commit(); }
 
上一篇:商城08——activeMQ 使用消息队列同步索引库


下一篇:climbing stairs(爬楼梯)(动态规划)