ActiveMQ 在java中的使用,通过单例模式、工厂实现
Jms规范里的两种message传输方式Topic和Queue,两者的对比如下表():
Topic | Queue | |
概要 | Publish Subscribe messaging 发布订阅消息 | Point-to-Point 点对点 |
有无状态 | topic数据默认不落地,是无状态的。 |
Queue数据默认会在mq服务器上以文件形式保存,比如Active MQ一般保存在$AMQ_HOME\data\kr-store\data下面。也可以配置成DB存储。 |
完整性保障 | 并不保证publisher发布的每条数据,Subscriber都能接受到。 | Queue保证每条数据都能被receiver接收。 |
消息是否会丢失 | 一般来说publisher发布消息到某一个topic时,只有正在监听该topic地址的sub能够接收到消息;如果没有sub在监听,该topic就丢失了。 | Sender发送消息到目标Queue,receiver可以异步接收这个Queue上的消息。Queue上的消息如果暂时没有receiver来取,也不会丢失。 |
消息发布接收策略 | 一对多的消息发布接收策略,监听同一个topic地址的多个sub都能收到publisher发送的消息。Sub接收完通知mq服务器 | 一对一的消息发布接收策略,一个sender发送的消息,只能有一个receiver接收。receiver接收完后,通知mq服务器已接收,mq服务器对queue里的消息采取删除或其他操作。 |
一、导jar包
activemq的依赖包
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
<version>1.6.1</version>
</dependency> <dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.13.3</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.3.1.RELEASE</version>
<exclusions>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
</exclusion>
</exclusions>
</dependency>
二、java代码
创建一下四个java文件,成为mq的公共数据连接池
1、连接工厂 配置
package com.broadsense.iov.base.jms; import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.jms.connection.CachingConnectionFactory;
/**
* 连接工厂 配置
*
* @author flm
* 2017年10月13日
*/
public class ConnectionFactory
{
private static final String URL = "tcp://10.10.1.1:61616";
private static final String USERNAME = "hkadmin";
private static final String PASSWORD = "hk667";
private static final int SESSIONCACHESIZE = 20;
private javax.jms.ConnectionFactory factory; public static synchronized javax.jms.ConnectionFactory getInstance()
{
if (SingletonHolder.INSTANCE.factory == null) {
SingletonHolder.INSTANCE.build();
}
return SingletonHolder.INSTANCE.factory;
} private void build()
{
AMQConfigBean bean = loadConfigure();
this.factory = buildConnectionFactory(bean);
} private javax.jms.ConnectionFactory buildConnectionFactory(AMQConfigBean bean) {
javax.jms.ConnectionFactory targetFactory = new ActiveMQConnectionFactory(bean.getUserName(), bean.getPassword(), bean.getBrokerURL()); CachingConnectionFactory connectoryFacotry = new CachingConnectionFactory();
connectoryFacotry.setTargetConnectionFactory(targetFactory);
connectoryFacotry.setSessionCacheSize(bean.getSessionCacheSize()); return connectoryFacotry;
} private AMQConfigBean loadConfigure() {
if ("tcp://10.10.1.1:61616" != null) {
try {
return new AMQConfigBean("tcp://10.10.1.1:61616", "hkadmin", "hk667", 20);
} catch (Exception e) {
throw new IllegalStateException("load amq config error!");
}
}
throw new IllegalStateException("load amq config error!");
} private static class AMQConfigBean
{
private String brokerURL;
private String userName;
private String password;
private int sessionCacheSize; public AMQConfigBean() {
} public AMQConfigBean(String brokerURL, String userName, String password, int sessionCacheSize) {
this.brokerURL = brokerURL;
this.userName = userName;
this.password = password;
this.sessionCacheSize = sessionCacheSize;
} public String getBrokerURL() {
return this.brokerURL;
} public void setBrokerURL(String brokerURL) {
this.brokerURL = brokerURL;
} public String getUserName() {
return this.userName;
} public void setUserName(String userName) {
this.userName = userName;
} public String getPassword() {
return this.password;
} public void setPassword(String password) {
this.password = password;
} public int getSessionCacheSize() {
return this.sessionCacheSize;
} public void setSessionCacheSize(int sessionCacheSize) {
this.sessionCacheSize = sessionCacheSize;
}
} private static class SingletonHolder
{
static ConnectionFactory INSTANCE = new ConnectionFactory(null);
}
}
2、模版
package com.broadsense.iov.base.jms; import org.springframework.jms.core.JmsTemplate;
/**
* 模板厂
*
* @author flm
* 2017年10月13日
*/
public class JmsTemplateFactory
{
private final javax.jms.ConnectionFactory factory;
private JmsTemplate topicJmsTemplate;
private JmsTemplate queueJmsTemplate; public static JmsTemplateFactory getInstance()
{
return SingletonHolder.INSTANCE;
} private JmsTemplateFactory()
{
this.factory = ConnectionFactory.getInstance();
} public synchronized JmsTemplate getTopicJmsTemplate() {
if (this.topicJmsTemplate == null) {
this.topicJmsTemplate = createTemplate(this.factory, true);
}
return this.topicJmsTemplate;
} public synchronized JmsTemplate getQueueJmsTemplate() {
if (this.queueJmsTemplate == null) {
this.queueJmsTemplate = createTemplate(this.factory, false);
}
return this.queueJmsTemplate;
} private JmsTemplate createTemplate(javax.jms.ConnectionFactory factory, boolean pubSubDomain) {
JmsTemplate template = new JmsTemplate(factory);
template.setPubSubDomain(pubSubDomain);
return template;
} public static class SingletonHolder
{
static JmsTemplateFactory INSTANCE = new JmsTemplateFactory(null);
}
}
3、消费者 模版
package com.broadsense.iov.base.jms; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.jms.Destination;
import javax.jms.MessageListener;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.listener.SimpleMessageListenerContainer;
/**
* JMS监听器 创建消费者
*
* @author flm
* 2017年10月13日
*/
public class JMSListener
{
private static final Logger LOGGER = LoggerFactory.getLogger(JMSListener.class);
private static final Map<String, Destination> MQDESTS = new ConcurrentHashMap();
/**
* 开启一个 点对点的 消息队列监听 的消费者
*
* @param queueName 队列名称
* @param subName 订阅者的名字
* @param listener 监听
*/
public static synchronized void startJmsQueueListener(String queueName, MessageListener listener)
{
startJmsQueueListener(queueName, null, listener);
}
public static synchronized void startJmsQueueListener(String queueName, String subName, MessageListener listener) {
Destination dst = (Destination)MQDESTS.get("QUEUE_" + queueName);
if (dst == null) {
ActiveMQQueue mq = new ActiveMQQueue(queueName);
startJmsListener(mq, subName, listener);
MQDESTS.put("QUEUE_" + queueName, mq);
} else {
LOGGER.warn(queueName + " already started");
}
}
/**
* 开启 一对多 主题的 消息监听的消费者
*
* @param topicName 主题消息名称
* @param subName 订阅者的名字
* @param listener 监听
*/
public static synchronized void startJmsTopicListener(String topicName, MessageListener listener)
{
startJmsTopicListener(topicName, null, listener);
} public static synchronized void startJmsTopicListener(String topicName, String subName, MessageListener listener) {
ActiveMQTopic mq = new ActiveMQTopic(topicName);
startJmsListener(mq, subName, listener);
MQDESTS.put("QUEUE_" + topicName, mq);
}
/**
* 开始 消息监听器 消费者
*
* @param dest 目的地
* @param subName 持久订阅的名字
* @param msgListener 消息监听器
*/
private static void startJmsListener(Destination dest, String subName, MessageListener msgListener)
{
javax.jms.ConnectionFactory factory = ConnectionFactory.getInstance(); SimpleMessageListenerContainer listener = new SimpleMessageListenerContainer();
listener.setConnectionFactory(factory);
listener.setDestination(dest);
listener.setMessageListener(msgListener);
if ((subName != null) && (subName != "")) {
listener.setDurableSubscriptionName(subName);
}
listener.start();
}
}
4、生产者 模版
package com.broadsense.iov.base.jms; import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator; /**
* 创建 jms生产者
*
* @author flm
* 2017年10月13日
*/
public class JMSPublisher
{
/**
* 发送消息
* Topic 生产者
*
* @param dest 目的地
* @param msg 消息内容
*/
public static void sendTopicMessage(String dest, String msg)
{
JmsTemplateFactory.getInstance().getTopicJmsTemplate().send(dest, new MessageCreator(msg)
{
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(this.val$msg);
}
});
}
/**
* 发送消息
* Queue 生产者
*
* @param dest 目的地
* @param msg 消息内容
*/
public static void sendQueueMessage(String dest, String msg)
{
JmsTemplateFactory.getInstance().getQueueJmsTemplate().send(dest, new MessageCreator(msg)
{
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(this.val$msg);
}
});
}
}
三、activemq的使用
1、创建一个junit测试,@Test 发布、接受、即可看到消息,mq管理后台也可以看到
package com.broadsense.iov.base.jms; import com.broadsense.iov.base.jms.JMSListener;
import com.broadsense.iov.base.jms.JMSPublisher;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.junit.Test; /**
*
* @author flm
*/
public class JMSPublisherTest { public JMSPublisherTest() {
} /**
* 生产者 发布消息
* @throws
*/
@Test
public void testSendMessage() throws InterruptedException {
for (int idx = 1; idx < 3; idx++) { /*
* 生产者 发布 消息到 queue/queue_b 的队列中
*/
JMSPublisher.sendQueueMessage("queue/queue_b", String.valueOf(idx * 1111)); /*
* 生产者 发布消息 到 topic/send 的Topic 主题中
*/
//JMSPublisher.sendTopicMessage("topic/send", String.valueOf(idx * 1111));
}
} /**
* 消费者 订阅接受消息
*/
@Test
public void receiver() {
/*
* 消费者 订阅主题 topic/send 是否有消息发布,有侧打印出来 (通过 onMessage 监听)
*/
/*JMSListener.startJmsTopicListener("topic/send", new MessageListener() {
@Override
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
TextMessage msg = (TextMessage) message;
System.out.println("== 收到一个JMS消息..." + msg.getText());
}
} catch (JMSException e) {
e.printStackTrace();
}
}
});*/ /*
* 消费者 订阅队列 queue/queue_b 是否有消息发布,有侧打印出来 (通过 onMessage 监听)
*/
JMSListener.startJmsQueueListener("queue/queue_b" ,new MessageListener() {
@Override
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
TextMessage msg = (TextMessage) message;
System.out.println("== 收到一个JMS消息..." + msg.getText());
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}); try {
System.in.read();
} catch (IOException ex) {
Logger.getLogger(JMSPublisherTest.class.getName()).log(Level.SEVERE, null, ex);
}
} }
2、真正的项目实现
在项目的中具体实现,是加载一个类来实现订阅消息
加载启动一个订阅的主题,给一个类MQ()处理
package com.ifengSearch.track.dao; import org.springframework.stereotype.Repository; import com.broadsense.iov.base.jms.JMSListener; /**
* 项目启动即 开启
* 通过 spring 依赖加载 Lister 订阅topic/send
* @author flm
* @2017年10月16日
*/
@Repository
public class Lister {
public Lister(){
try {
JMSListener.startJmsTopicListener("topic/send",new QM());// QM() 订阅 主题 topic/send
} catch (Exception e) {
}
}
}
MQ()订阅消息的处理类,通过实现
package com.ifengSearch.track.dao; import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage; /**
* 通过 实现 MessageListener 的 onMessage 来监听消息
* 接受、处理消息
* @author flm
* @2017年10月16日
*/
public class MQ implements MessageListener { @Override
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
TextMessage msg = (TextMessage) message;
System.out.println("== 收到一个JMS消息..." + msg.getText());
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}