1.Activemq安装
直接到官网下载:记住apache的官网是域名反过来,比如我们找activemq就是activemq.apache.org。
最新版本要求最低的JDK是8,所以最好在电脑装多个版本的JDK,用的时候切换就 可以了。
下载完成是个压缩包,解压目录如下:
1.启动:%activemq%\apache-activemq-5.15.6\bin\win64\activemq.bat启动即可,日志如下:(可以看出是集成就wrapper)
访问:http://localhost:8161/进行测试:
点击上面圈住的地方进入管理界面:(activemq的默认账号和密码都是admin)
补充:
1.activemq内置jetty服务器,后台管理端口在conf\jetty.xml中进行配置;,默认是8161:
2.接收消息的端口在conf\activemq.xml),默认是61616--而且只有openwire有用,其他可以注释掉
2.我们肯定不可能一直开在窗口,所以需要注册为服务:
以管理员方式运行: %activemq%\apache-activemq-5.15.6\bin\win64\InstallService.bat
查看源码发现是以wrapper的方式注册为服务。
2.入门程序
我将上一篇的JMS编程图放在这里便于理解:
依赖的jar包:activemq-all-5.15.6.jar下载的zip包里面带的有
开发过程基本如下:
JMS的消息队列有两种模式,一般队列模型又称为P2P模式,PUB\SUB又称为主题模式
1.队列模型---生产消息的程序:
package cn.qlq.activemq; import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /**
* 生产消息
*
* @author QiaoLiQiang
* @time 2018年9月18日下午11:04:41
*/
public class MsgProducer { // 默认端口61616
private static final String url = "tcp://localhost:61616/";
private static final String queueName = "myQueue"; public static void main(String[] args) throws JMSException {
// 1创建ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
// 2.由connectionFactory创建connection
Connection connection = connectionFactory.createConnection();
// 3.启动connection
connection.start();
// 4.创建Session===第一个参数是是否事务管理,第二个参数是应答模式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.创建Destination(Queue继承Queue)
Queue destination = session.createQueue(queueName);
// 6.创建生产者producer
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 100; i++) {
// 7.创建Message,有好多类型,这里用最简单的TextMessage
TextMessage tms = session.createTextMessage("textMessage:" + i);
// 8.生产者发送消息
producer.send(tms); System.out.println("send:" + tms.getText());
}
// 9.关闭connection
connection.close();
} }
结果:
.....
send:textMessage:93
send:textMessage:94
send:textMessage:95
send:textMessage:96
send:textMessage:97
send:textMessage:98
send:textMessage:99
到activemq的管理界面查看:
2.队列模型---消费者代码:
package cn.qlq.activemq; import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /**
* 消费消息
*
* @author QiaoLiQiang
* @time 2018年9月18日下午11:26:41
*/
public class MsgConsumer { // 默认端口61616
private static final String url = "tcp://localhost:61616/";
private static final String queueName = "myQueue"; public static void main(String[] args) throws JMSException {
// 1创建ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
// 2.由connectionFactory创建connection
Connection connection = connectionFactory.createConnection();
// 3.启动connection
connection.start();
// 4.创建Session===第一个参数是是否事务管理,第二个参数是应答模式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.创建Destination(Queue继承Queue)
Queue destination = session.createQueue(queueName);
// 6.创建消费者consumer
MessageConsumer consumer = session.createConsumer(destination);
// 7.给消费者绑定监听器(消息的监听是一个异步的过程,不可以关闭连接,绑定监听器线程是一直开启的,处于阻塞状态,所以可以在程序退出关闭)
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
// 7.1由于消费者接受的是TextMessage,所以强转一下
TextMessage tms = (TextMessage) message;
try {
System.out.println("接收消息:" + tms.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} }
当然接收消息也可以使用consumer.receive();进行接收,如下:
// 7.receive是一个阻塞方法
Message message = consumer.receive();
while (message != null) {
TextMessage txtMsg = (TextMessage) message;
session.commit();
System.out.println("收到消 息:" + txtMsg.getText());
message = consumer.receive(1000L);
}
启动结果:(接受完线程并不关闭,所以是异步接收,线程处于阻塞状态。)
再开一个消费者:再运行一次消费者程序。
后台管理查看:(两个消费者)
在启动生产者生产一百条消息之后查看消费者:(切记切换控制台按钮查看消费信息):两个消费者平均消费信息,满足上一篇介绍的队列模型
第一个消费者:
第二个消费者:
补充:关于事务的介绍。
如果connection.createSession(true, Session.AUTO_ACKNOWLEDGE);的第一个参数为true,也就是开启事务,在事务性会话中,事务需要被提交,也就是提交之后才能最最终的数据产生影响。否则不会发送消息或者消费消息。也就是虽然send(msg),即使你不commit也不会生成消息。
如果传的参数为false,也就是不开启事务,消息何时被确认取决于创建会话的应答模式。一般如果是非事务环境,应答模式是 Session.AUTO_ACKNOWLEDGE ,会自动确认,也就是生产者调用send或者接受者调用recrive的时候就会自动对最终的结果产生影响。
如下是带有事务的操作代码:
package cn.qlq.activemq; import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /**
* 生产消息
*
* @author QiaoLiQiang
* @time 2018年9月18日下午11:04:41
*/
public class MsgProducer { // 默认端口61616
private static final String url = "tcp://localhost:61616/";
private static final String queueName = "myQueue";
private static Session session = null; public static void main(String[] args) throws JMSException {
// 1创建ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
// 2.由connectionFactory创建connection
Connection connection = connectionFactory.createConnection();
// 3.启动connection
connection.start();
// 4.创建Session===第一个参数是是否事务管理,第二个参数是应答模式
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 5.创建Destination(Queue继承Queue)
Queue destination = session.createQueue(queueName);
// 6.创建生产者producer
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 10; i++) {
// 7.创建Message,有好多类型,这里用最简单的TextMessage
TextMessage tms = session.createTextMessage("textMessage:" + i); // 设置附加属性
tms.setStringProperty("str", "stringProperties" + i); // 8.生产者发送消息
producer.send(tms);
} // 9.提交事务
session.commit(); // 10.关闭connection
session.close();
connection.close();
} }
package cn.qlq.activemq; import java.util.Enumeration; import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /**
* 消费消息
*
* @author QiaoLiQiang
* @time 2018年9月18日下午11:26:41
*/
public class MsgConsumer { // 默认端口61616
private static final String url = "tcp://localhost:61616/";
private static final String queueName = "myQueue"; public static void main(String[] args) throws JMSException {
// 1创建ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
// 2.由connectionFactory创建connection
Connection connection = connectionFactory.createConnection();
Enumeration jmsxPropertyNames = connection.getMetaData().getJMSXPropertyNames();
while (jmsxPropertyNames.hasMoreElements()) {
String nextElement = (String) jmsxPropertyNames.nextElement();
System.out.println("JMSX name ===" + nextElement);
}
// 3.启动connection
connection.start();
// 4.创建Session===第一个参数是是否事务管理,第二个参数是应答模式
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 5.创建Destination(Queue继承Queue)
Queue destination = session.createQueue(queueName);
// 6.创建消费者consumer
MessageConsumer consumer = session.createConsumer(destination); int i = 0;
while (i < 10) {
TextMessage textMessage = (TextMessage) consumer.receive();
System.out.println("接收消息:" + textMessage.getText() + ";属性" + textMessage.getStringProperty("str"));
i++; // 提交事务,进行确认收到消息
session.commit();
} session.close();
connection.close();
} }
3.主题模型---生产消息的程序:
与队列模式一样,只是Destination是Topic。
package cn.qlq.activemq.topic; import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /**
* 主题模式的消息生产者
*
* @author QiaoLiQiang
* @time 2018年9月19日下午10:10:36
*/
public class MsgProducer { // 默认端口61616
private static final String url = "tcp://localhost:61616/";
private static final String topicName = "myTopic"; public static void main(String[] args) throws JMSException {
// 1创建ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
// 2.由connectionFactory创建connection
Connection connection = connectionFactory.createConnection();
// 3.启动connection
connection.start();
// 4.创建Session===第一个参数是是否事务管理,第二个参数是应答模式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.创建Destination(Queue继承Queue,Topic也继承Destination==这三个都是接口)
Destination destination = session.createTopic(topicName);
// 6.创建生产者producer
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 100; i++) {
// 7.创建Message,有好多类型,这里用最简单的TextMessage
TextMessage tms = session.createTextMessage("textMessage:" + i);
// 8.生产者发送消息
producer.send(tms); System.out.println("send:" + tms.getText());
}
// 9.关闭connection
connection.close();
} }
4.主题模式---消息消费者:
与队列模式一样,只是Destination是Topic。
package cn.qlq.activemq.topic; import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /**
* 主题模式的消费消息
*
* @author QiaoLiQiang
* @time 2018年9月18日下午11:26:41
*/
public class MsgConsumer { // 默认端口61616
private static final String url = "tcp://localhost:61616/";
private static final String topicName = "myTopic"; public static void main(String[] args) throws JMSException {
// 1创建ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
// 2.由connectionFactory创建connection
Connection connection = connectionFactory.createConnection();
// 3.启动connection
connection.start();
// 4.创建Session===第一个参数是是否事务管理,第二个参数是应答模式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.创建Destination(Queue继承Queue,Topic也继承Destination==这三个都是接口)
Destination destination = session.createTopic(topicName);
// 6.创建消费者consumer
MessageConsumer consumer = session.createConsumer(destination);
// 7.给消费者绑定监听器(消息的监听是一个异步的过程,不可以关闭连接,绑定监听器线程是一直开启的,处于阻塞状态,所以可以在程序退出关闭)
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
// 7.1由于消费者接受的是TextMessage,所以强转一下
TextMessage tms = (TextMessage) message;
try {
System.out.println("接收消息:" + tms.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} }
补充:上面的步骤7接收消息可以用receive()进行接收,该方法也是阻塞方法。
// 7.receive是一个阻塞方法
Message message = consumer.receive();
while (message != null) {
TextMessage txtMsg = (TextMessage) message;
session.commit();
System.out.println("收到消 息:" + txtMsg.getText());
message = consumer.receive(1000L);
}
测试:
先启动生产者生成100条消息:
启动两个消费者,不会消费消息,满足消费者不能消费订阅之前就发送到主题中的消息:
再启动生产者发布100条消息,查看后台管理界面和消费者控制台信息:
两个消费者控制台一样:(满足主题中的消息被所有订阅者消费)
3.JMS的消息结构
JMS由下面三部分组成:消息头、属性、消息体
1.消息头 : 包含消息的识别信息和路由信息,
消息头包含一些标准的属性如下:
下面是进一步的解释:
1.JMSDestination:消息发送的目的地,主要是指Queue和Topic,自动分配,由下面程序识别:
// 5.创建Destination(Queue继承Queue)
Queue destination = session.createQueue(queueName);
2.JMSDeliveryMode:传送模式。两种:持久模式和非持久模式。一条持久性消息应该被传送"一次仅仅一次",这就意味着如果JMS系统出现故障,该消息并不会丢失,它会在服务器恢复之后再次传递。一条非持久的消息最多会传送一次,这意味着服务器出现故障,该消息将丢失。自动分配
3.JMSExpiration:消息过期时间,等于send方法中的timeToLive的值加上发送时刻的GMT时间值。如果值为0表示永不过期。如果发送后,在消息过期时间之后还没有被发送到目的地,则该消息会被清除。自动分配
4.JMSPriority:优先级,0-9十个级别。默认是4。0-4是普通消息,5-9是加急消息。自动分配
5.JMSMessageID:每个消息的唯一标识。由JMS Provider产生。自动生成
6.JMSTimestamp:JMS Provider在调用send()方法时自动设置的。它是消息被发送和消费者实际接收的时间差。自动分配
7.JMSCorrelationID:用来连接到另一个消息,典型的应用是在回复消息中连接到原消息。JMSCorrelationID一般用于将一条消息标记为对JMSMessageID标识的上一条消息的应答,不过它的值可以是任何值。由开发者设置
8.JMSReplyTo:本消息回复消息的目的地,开发者设置
9.JMSType:消息类型的识别符。开发者设置
10.JMSRedelivered:(重新投递)如果一个客户端收到了一个消息的此参数设置为true,则表示可能客户端在早些时候收到过消息,但是并没有签收(Ack)。如果该消息被重新传送,JMSRedelivered=true。反之,JMSRedelivered=false。自动生成
2.消息体:
JMS定义的五种类型的消息体格式,也叫消息类型,包括:TextMessage、MapMessage、ObjectMessage、BytesMessage、和StreamMessage等五种
参考:https://www.cnblogs.com/qlqwjy/p/10463660.html
3.消息属性
包含下面三种类型的消息属性:
应用程序设置和添加的属性,如下:
// 设置附加属性
tms.setStringProperty("str", "stringProperties" + i);
- JMS定义的属性
使用"JMSX"作为属性名的前缀。如下方法可以返回所有连接支持的JMSX属性的名字。
connection.getMetaData().getJMSXPropertyNames();
- JMS供应商特定的属性
如下:生产者代码:
package cn.qlq.activemq; import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /**
* 生产消息
*
* @author QiaoLiQiang
* @time 2018年9月18日下午11:04:41
*/
public class MsgProducer { // 默认端口61616
private static final String url = "tcp://localhost:61616/";
private static final String queueName = "myQueue";
private static Session session = null; public static void main(String[] args) throws JMSException {
// 1创建ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
// 2.由connectionFactory创建connection
Connection connection = connectionFactory.createConnection();
// 3.启动connection
connection.start();
// 4.创建Session===第一个参数是是否事务管理,第二个参数是应答模式
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.创建Destination(Queue继承Queue)
Queue destination = session.createQueue(queueName);
// 6.创建生产者producer
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 1; i++) {
// 7.创建Message,有好多类型,这里用最简单的TextMessage
TextMessage tms = session.createTextMessage("textMessage:" + i);
tms.setJMSDeliveryMode(1);
tms.setJMSExpiration(10 * 1000);
tms.setJMSPriority(5);
tms.setJMSMessageID(i + "MessageId");
tms.setJMSTimestamp(5 * 1000);
tms.setJMSCorrelationID("JMSCorrelationID" + i);
tms.setJMSReplyTo(destination);
tms.setJMSType("JMSType" + i); // 设置附加属性
tms.setStringProperty("str", "stringProperties" + i); // 8.生产者发送消息
producer.send(tms);
}
// 9.关闭connection
connection.close();
} }
查看发送的消息:
消费者:
package cn.qlq.activemq; import java.util.Enumeration; import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /**
* 消费消息
*
* @author QiaoLiQiang
* @time 2018年9月18日下午11:26:41
*/
public class MsgConsumer { // 默认端口61616
private static final String url = "tcp://localhost:61616/";
private static final String queueName = "myQueue"; public static void main(String[] args) throws JMSException {
// 1创建ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
// 2.由connectionFactory创建connection
Connection connection = connectionFactory.createConnection();
Enumeration jmsxPropertyNames = connection.getMetaData().getJMSXPropertyNames();
while (jmsxPropertyNames.hasMoreElements()) {
String nextElement = (String) jmsxPropertyNames.nextElement();
System.out.println("JMSX name ===" + nextElement);
}
// 3.启动connection
connection.start();
// 4.创建Session===第一个参数是是否事务管理,第二个参数是应答模式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.创建Destination(Queue继承Queue)
Queue destination = session.createQueue(queueName);
// 6.创建消费者consumer
MessageConsumer consumer = session.createConsumer(destination); int i = 0;
while (i < 1) {
i++;
TextMessage textMessage = (TextMessage) consumer.receive();
System.out.println("接收消息:" + textMessage.getText() + ";属性" + textMessage.getStringProperty("str"));
}
} }
结果:
JMSX name ===JMSXUserID
JMSX name ===JMSXGroupID
JMSX name ===JMSXGroupSeq
JMSX name ===JMSXDeliveryCount
JMSX name ===JMSXProducerTXID
接收消息:textMessage:0;属性stringProperties0