1.异步消息Jms及其JmsTemplate的源代码分析,消息代理ActiveMQ

一. 介绍

  借助Spring,有多种异步消息的可选方案,本章使用Jms。Jms的消息模型有两种,点对点消息模型(队列实现)和发布-订阅消息模型(主题)。

1.异步消息Jms及其JmsTemplate的源代码分析,消息代理ActiveMQ

图1.点对点消息模型(一对一)

1.异步消息Jms及其JmsTemplate的源代码分析,消息代理ActiveMQ

图2.发布-订阅消息模型(一对多)

二. 仅适用Jsm

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue; public class Testjsm {
public static void main(String[] args) throws JMSException {
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 连接
Connection conn = cf.createConnection();
// 创建回话
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目的地,队列的名称
Destination destination = new ActiveMQQueue("SimperConteoller.topic");
// 创建发送者
MessageProducer producer = session.createProducer(destination);
// 创建发送内容
TextMessage message = session.createTextMessage();
message.setText("hello,大家好");
// 发送
producer.send(message);
} }

  步骤为:

     1.创建工厂 2.创建连接 3.创建会话 4.创建目的地 5.创建发送者 6.创建发送内容 7.发送者进行发送内容的发送

  代码步骤多,且异常不好处理。

二. 使用模板

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage; import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator; public class Test {
public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring-jsm.xml");
JmsTemplate tem = (JmsTemplate) context.getBean("jmsTemplate");
Destination des = (Destination) context.getBean("topic");
tem.send(des, new MessageCreator() { public Message createMessage(Session arg0) throws JMSException { return arg0.createTextMessage("大家好!");
}
}); TextMessage ms = (TextMessage) tem.receive(des);
System.out.println("接受者是:" + ms); }
}

  步骤:只需要发送。使用JmsTemplate,可以创建连接,获得会话以及发送和接受消息,使代码专注于构建要发送的消息和处理接受到的消息。

  1.pom.xml
    <dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
<version>5.12.0</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.1.6.RELEASE</version>
<scope>compile</scope>
</dependency>
1.异步消息Jms及其JmsTemplate的源代码分析,消息代理ActiveMQ
  2. 链接,会话在xml管理
      1.连接activemq
      2.目的地
      3.格式转化器
      4.jsm模板发送
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:c="http://www.springframework.org/schema/c"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.2.xsd"> <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" p:brokerURL="tcp://localhost:61616">
</bean> <bean id="topic" class="org.apache.activemq.command.ActiveMQTopic" c:_="SimperConteoller.topic"></bean> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" c:_-ref="connectionFactory">
</bean> </beans>
1.异步消息Jms及其JmsTemplate的源代码分析,消息代理ActiveMQ
三.JmsTemplate的源码分析
 tem.send(des, new MessageCreator() {

            public Message createMessage(Session arg0) throws JMSException {

                return arg0.createTextMessage("大家好!");
}
});

源代码:

    @Override
public void send(final Destination destination, final MessageCreator messageCreator) throws JmsException {
execute(new SessionCallback<Object>() {
@Override
public Object doInJms(Session session) throws JMSException {
doSend(session, destination, messageCreator);
return null;
}
}, false);
} 

  send函数:1. execute:创建连接与会话

2.doSend:创建发送者和发送内容,并发送

execute源代码:创建连接与会话

    public <T> T execute(SessionCallback<T> action, boolean startConnection) throws JmsException {
Assert.notNull(action, "Callback object must not be null");
Connection conToClose = null;
Session sessionToClose = null;
try {
Session sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession(
getConnectionFactory(), this.transactionalResourceFactory, startConnection);
if (sessionToUse == null) {
conToClose = createConnection();
sessionToClose = createSession(conToClose);
if (startConnection) {
conToClose.start();
}
sessionToUse = sessionToClose;
}
if (logger.isDebugEnabled()) {
logger.debug("Executing callback on JMS Session: " + sessionToUse);
}
return action.doInJms(sessionToUse);
}
catch (JMSException ex) {
throw convertJmsAccessException(ex);
}
finally {
JmsUtils.closeSession(sessionToClose);
ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), startConnection);
}
}

doSend源代码:创建发送者和发送内容,并发送

protected void doSend(Session session, Destination destination, MessageCreator messageCreator)
throws JMSException { Assert.notNull(messageCreator, "MessageCreator must not be null");
MessageProducer producer = createProducer(session, destination);
try {
Message message = messageCreator.createMessage(session);
if (logger.isDebugEnabled()) {
logger.debug("Sending created message: " + message);
}
doSend(producer, message);
// Check commit - avoid commit call within a JTA transaction.
if (session.getTransacted() && isSessionLocallyTransacted(session)) {
// Transacted session created by this template -> commit.
JmsUtils.commitIfNecessary(session);
}
}
finally {
JmsUtils.closeMessageProducer(producer);
}
}
    protected void doSend(MessageProducer producer, Message message) throws JMSException {
if (this.deliveryDelay >= 0) {
if (setDeliveryDelayMethod == null) {
throw new IllegalStateException("setDeliveryDelay requires JMS 2.0");
}
ReflectionUtils.invokeMethod(setDeliveryDelayMethod, producer, this.deliveryDelay);
}
if (isExplicitQosEnabled()) {
producer.send(message, getDeliveryMode(), getPriority(), getTimeToLive());
}
else {
producer.send(message);
}
}

  

上一篇:Hibernate 关联关系(一对多)


下一篇:Ace of Aces