Message高级特性 & 内嵌Jetty实现文件服务器

1. Messaage Properties  常见属性

  更多的属性以及介绍参考:http://activemq.apache.org/activemq-message-properties.html

  消息属性,这个在之前刚学习ActiveMQ的时候已经介绍过,常见的如下:

  1. queue消息默认是持久化
  2. 消息得优先级默认是4.
  3. 消息发送时设置了时间戳。
  4. 消息的过期时间默认是永不过期,过期的消息进入DLQ,可以配置DLQ及其处理策略。
  5. 如果消息是重发的,将会被标记出来。
  6. JMSReplyTo标识响应消息发送到哪个queue.
  7. JMSCorelationID标识此消息相关联的消息id,可以用这个标识把多个消息连接起来。
  8. JMS同时也记录了消息重发的次数。默认是6次
  9. 如果有一组相关联的消息需要处理,可以分组;只需要设置消息组的名字和这个消息的第几个消息。
  10. 如果消息中一个事务环境,则TXID将会被设置。
  11. 此外ActiveMQ在服务器端额外设置了消息入队和出队的时间戳。
  12. ActiveMQ里消息属性的值,不仅可以用基本类型,还可以用List或Map类型

2. Advisory Message  监听ActiveMQ自己的消息

  Advisory Message是ActiveMQ自身的系统消息地址,可以监听该地址来获取activemq的系统消息。目前支持获取如下信息:

consumers, producers和connections的启动和停止
创建和销毁temporary destinations
opics 和queues 的消息过期
brokers发送消息给destination,但是没有consumers
connections启动和停止

说明:

1. 所有advisory的topic,前缀是:ActiveMQ.Advisory
2. 所有Advisory的消息类型是:‘Advisory’,所有的Advisory都有的消息属性有:originBrokerId,originBrokerName,originBrokerURL
3. 具体支持的topic和queue,请参照:
   http://activemq.apache.org/advisory-message.html
Advisory功能默认是关闭的,打开Advisorie的具体实现如下:

    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="brokerName" dataDirectory="${activemq.data}" schedulePeriodForDestinationPurge="1000">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">" advisoryForConsumed="true" />
</policyEntries>
</policyMap>
</destinationPolicy>
...
</broker>

配置启动之后我们向主题chatTopic发送一条消息可以查看到如下activemq增加的主题:

Message高级特性 & 内嵌Jetty实现文件服务器

我们订阅上面主题:

package cn.qlq.activemq.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQMessage; /**
* 主题模式的消费消息
*
* @author QiaoLiQiang
* @time 2018年9月18日下午11:26:41
*/
public class MsgConsumer { // 默认端口61616
private static final String url = "tcp://localhost:61616/";
private static final String topicName = "ActiveMQ.Advisory.Producer.Topic.chatTopic"; public static void main(String[] args) throws JMSException {
// 1创建ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
// 2.由connectionFactory创建connection
Connection connection = connectionFactory.createConnection();
// 设置链接的ID
// 3.创建Session===第一个参数是是否事务管理,第二个参数是应答模式
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 4.创建Destination(Queue继承Queue,Topic也继承Destination==这三个都是接口)
Topic destination = session.createTopic(topicName);
// 创建TopicSubscriber来订阅;需要在连接上设置消费者id,用来识别消费者;设置好了过后再start 这个 connection
// 5.启动connection
connection.start();
// 6.创建消费者consumer
MessageConsumer consumer = session.createConsumer(destination);
Message message = consumer.receive();
while (message != null) {
ActiveMQMessage txtMsg = (ActiveMQMessage) message;
session.commit();
System.out.println("收到消 息:" + txtMsg.getMessage());
message = consumer.receive(1000L);
}
session.close();
connection.close();
} }

第二种获取某一队列的ActiveMQ自身的生产者主题和消费者主题的方法如下:

     String topicName = "chatTopic";
Topic topic = session.createTopic(topicName);
Destination destination = AdvisorySupport.getProducerAdvisoryTopic(topic);
Destination destination2 = AdvisorySupport.getConsumerAdvisoryTopic(topic);

再次发送消息发现该消费者接收到的消息如下:

收到消 息:ActiveMQMessage {commandId = 0, responseRequired = false, messageId = ID:MicroWin10-1535-57537-1554965648551-1:1:0:0:16, originalDestination = null, originalTransactionId = null, producerId = ID:MicroWin10-1535-57537-1554965648551-1:1:0:0, destination = topic://ActiveMQ.Advisory.Producer.Topic.chatTopic, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 1554966377407, brokerOutTime = 1554966377410, correlationId = null, replyTo = null, persistent = false, type = Advisory, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@1324409e, dataStructure = ProducerInfo {commandId = 4, responseRequired = true, producerId = ID:MicroWin10-1535-59019-1554966377239-1:1:1:1, destination = topic://chatTopic, brokerPath = null, dispatchAsync = false, windowSize = 0, sentCount = 0}, redeliveryCounter = 0, size = 0, properties = {producerCount=1, originBrokerName=brokerName, originBrokerURL=tcp://MicroWin10-1535:61616, originBrokerId=ID:MicroWin10-1535-57537-1554965648551-0:1}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false}
收到消 息:ActiveMQMessage {commandId = 0, responseRequired = false, messageId = ID:MicroWin10-1535-57537-1554965648551-1:1:0:0:17, originalDestination = null, originalTransactionId = null, producerId = ID:MicroWin10-1535-57537-1554965648551-1:1:0:0, destination = topic://ActiveMQ.Advisory.Producer.Topic.chatTopic, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 1554966377439, brokerOutTime = 1554966377439, correlationId = null, replyTo = null, persistent = false, type = Advisory, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@246ae04d, dataStructure = RemoveInfo {commandId = 0, responseRequired = true, objectId = ID:MicroWin10-1535-59019-1554966377239-1:1:1:1, lastDeliveredSequenceId = -2}, redeliveryCounter = 0, size = 0, properties = {producerCount=0, originBrokerName=brokerName, originBrokerURL=tcp://MicroWin10-1535:61616, originBrokerId=ID:MicroWin10-1535-57537-1554965648551-0:1}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false}

总结Advisory的使用方式:
  1. 要在配置文件里面开启Advisories.
  2. 消息发送端没什么变化,不做多余改变或配置,
  3. 消息接收端:
    1)根据你要接收的消息类型,来设置不同的topic,可以直接从界面查询主题的name之后订阅,也可以借助AdvisorySupport类来获取
    2)由于这个topic默认不是持久化的,所有要先看起接收端,然后再发送消息。

    3) 接收消息的时候,接收到的消息类型是ActiveMQMessage,所以需要先转换成ActiveMQMessage再获取消息(也就是这是ActiveMQ特有的消息类型)

3.延迟和定时消息传递(Delay and schedule Message Delivery)

  ActimeMQ也可以实现延时或者定时投递消息,类似于quartz定时任务等。

一共四个属性
AMQ_SCHEDULED_DELAY: 延迟投递的时间
AMQ_SCHEDULED_PERIOD: 重复投递的时间间隔
AMQ_SCHEDULED_REPEAT:重复投递次数
AMQ_SCHEDULED_CRON: Cron表达式

ActiveMQ也提供了一个封装的消息类型:org.apache.activemq.ScheduledMessage,可以使用这个类来辅助设置。

(1)首先在broker上设置schedulerSupport="true"

(2)程序上设置延迟以及定时效果,如下设置延迟30秒,重发3次,重发间隔是5秒的效果

            TextMessage tms = session.createTextMessage("textMessage:" + i);
long time = 30 * 1000;
tms.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
tms.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 3);
tms.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 5 * 1000);
// 9.生产者发送消息
producer.send(tms);

消费者结果:

  消费者不会马上接收到消息,而是在30秒后第一次接受消息,并且间隔5秒后再次接受消息,总共会接收4次。

(3)使用CRON表达式

            // 8.创建Message,有好多类型,这里用最简单的TextMessage
TextMessage tms = session.createTextMessage("textMessage:" + i);
tms.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON,"* * * * *");
// 9.生产者发送消息
producer.send(tms);

  CRON表达式的优先级高于另外三个参数,如果在设置了延时时间,也有repeat和period参数,则会在每次CRON执行的时候,重复投递repeat次,每次间隔period,就是说设置的是叠加效果,例如每小时都会发生消息被投递10次,延迟0秒开始,每次间隔1秒。

4.Blob Message(传输大文件,一般大于100m)

  也就是发送文件消息。ActiveMQ传输文件的方式有byteMessage、StreamMessage、BlobMessage。其中bytemessage和streammessage如果不加工处理的话,只能传输小文件,小于100M的文件应该都可以传,blobmessage可以传输较大的文件。对于比较小的文件,简单的处理方式是先读取所有的文件成byte[],然后使用ByteMessage,把文件数据发送到broker,像正常的message一样处理。对于大文件,例如1GB以上的文件,这么搞直接把client或是broker给oom掉了。有些时候,我们需要传递Blob(Binary Large Objects)消息,在5.14之前,(5.12和5.13需要在jetty.xml中手动开启)可以按照如下的方式配置使用fileserver:

  配置BLOB Tansfer Policy,可以在发送方的连接URI上设置,如:

tcp://localhost:61616?jms.blobTransferPolicy.uploadUrl=http://localhost:8161/fileserver

  在5.14之后,就只能通过使用ftp协议来发送blobmessage,或自己将文件传到某个服务器上(通过FTP或其他方式),而后将该文件的url放在BlobMessage中再发送这条BlobMessage。不过,5.15好像又提供了http方式,不过需要自己实现文件上传服务器。

  由于我使用的版本你是5.15的,所以我需要先搭建服务器。

1.使用内嵌netty搭建文件服务器 (重要)

  也就是对文件进行管理,下面我的例子是使用jetty对G:/files/文件夹进行管理,会在浏览器检测此文件夹下面的文件,浏览器能解析的可以通过浏览器访问,不能解析(例如doc文件)的查看会下载。

  文件处理的类参考的下面三个类:   http://svn.apache.org/repos/asf/activemq/trunk/activemq-fileserver/  中三个类的的实现方式。

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.qlq</groupId>
<artifactId>FileServer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>war</packaging> <dependencies>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>2.5</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>jsp-api</artifactId>
<version>2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.aggregate</groupId>
<artifactId>jetty-all-server</artifactId>
<version>7.6.4.v20120524</version>
</dependency>
<!-- slf4j 依赖包 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.0-rc1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.0-rc1</version>
</dependency>
<!-- 文件上传的测试包httpclient -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpmime</artifactId>
<version>4.3.1</version>
</dependency>
</dependencies> <build>
<!-- 配置了很多插件 -->
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build> </project>

服务器启动类:

import org.eclipse.jetty.server.DispatcherType;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder; public class StartServer {
public static void main(String[] args) throws Exception {
Server server = new Server(8080); ServletContextHandler handler = new ServletContextHandler();
// 相当于设置项目名称
handler.setContextPath("/fileserver");
// 设置资源文件所在目录,工具类中会以这个目录作为文件服务目录存储文件
handler.setResourceBase("G:/files/");
// handler.setResourceBase(".");
System.out.println(handler.getServletContext().getRealPath("/")); handler.addFilter(FilenameGuardFilter.class, "/*", DispatcherType.FORWARD.ordinal()); handler.addFilter(RestFilter.class, "/*", DispatcherType.FORWARD.ordinal());
ServletHolder defaultServlet = new ServletHolder();
defaultServlet.setName("DefaultServlet");
defaultServlet.setClassName("org.eclipse.jetty.servlet.DefaultServlet"); handler.addServlet(defaultServlet, "/*"); server.setHandler(handler);
server.start();
}
}

重要的文件处理在下面的过滤器中:(PUT类型上传文件,请求method必须是PUT,GET请求资源,DELETE是删除文件,且请求类似于Restful风格,path最后一部分是文件的名称)

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Enumeration; import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.UnavailableException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; public class RestFilter implements Filter {
private static final Logger LOG = LoggerFactory.getLogger(RestFilter.class); private static final String HTTP_HEADER_DESTINATION = "Destination";
private static final String HTTP_METHOD_MOVE = "MOVE";
private static final String HTTP_METHOD_PUT = "PUT";
private static final String HTTP_METHOD_GET = "GET";
private static final String HTTP_METHOD_DELETE = "DELETE"; private String readPermissionRole;
private String writePermissionRole;
private FilterConfig filterConfig; public void init(FilterConfig filterConfig) throws UnavailableException {
this.filterConfig = filterConfig;
readPermissionRole = filterConfig.getInitParameter("read-permission-role");
writePermissionRole = filterConfig.getInitParameter("write-permission-role");
} private File locateFile(HttpServletRequest request) {
return new File(filterConfig.getServletContext().getRealPath(request.getServletPath()), request.getPathInfo());
} public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {
if (!(request instanceof HttpServletRequest && response instanceof HttpServletResponse)) {
if (LOG.isDebugEnabled()) {
LOG.debug("request not HTTP, can not understand: " + request.toString());
}
chain.doFilter(request, response);
return;
} HttpServletRequest httpRequest = (HttpServletRequest) request;
HttpServletResponse httpResponse = (HttpServletResponse) response; System.out.println(httpRequest.getRequestURL());
System.out.println(httpRequest.getMethod()); if (httpRequest.getMethod().equals(HTTP_METHOD_MOVE)) {
doMove(httpRequest, httpResponse);
} else if (httpRequest.getMethod().equals(HTTP_METHOD_PUT)) {
doPut(httpRequest, httpResponse);
} else if (httpRequest.getMethod().equals(HTTP_METHOD_GET)) {
if (checkGet(httpRequest, httpResponse)) {
chain.doFilter(httpRequest, httpResponse); // actual processing
// done elsewhere
}
} else if (httpRequest.getMethod().equals(HTTP_METHOD_DELETE)) {
doDelete(httpRequest, httpResponse);
} else {
chain.doFilter(httpRequest, httpResponse);
}
} protected void doMove(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("RESTful file access: MOVE request for " + request.getRequestURI());
} if (writePermissionRole != null && !request.isUserInRole(writePermissionRole)) {
response.sendError(HttpURLConnection.HTTP_FORBIDDEN);
return;
} File file = locateFile(request);
String destination = request.getHeader(HTTP_HEADER_DESTINATION); if (destination == null) {
response.sendError(HttpURLConnection.HTTP_BAD_REQUEST, "Destination header not found");
return;
} try {
URL destinationUrl = new URL(destination);
IOHelper.copyFile(file, new File(destinationUrl.getFile()));
IOHelper.deleteFile(file);
} catch (IOException e) {
response.sendError(HttpURLConnection.HTTP_INTERNAL_ERROR); // file
// could
// not
// be
// moved
return;
} response.setStatus(HttpURLConnection.HTTP_NO_CONTENT); // we return no
// content
} protected boolean checkGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("RESTful file access: GET request for " + request.getRequestURI());
} if (readPermissionRole != null && !request.isUserInRole(readPermissionRole)) {
response.sendError(HttpURLConnection.HTTP_FORBIDDEN);
return false;
} else {
return true;
}
} protected void doPut(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("RESTful file access: PUT request for " + request.getRequestURI());
} if (writePermissionRole != null && !request.isUserInRole(writePermissionRole)) {
response.sendError(HttpURLConnection.HTTP_FORBIDDEN);
return;
} File file = locateFile(request); if (file.exists()) {
boolean success = file.delete(); // replace file if it exists
if (!success) {
response.sendError(HttpURLConnection.HTTP_INTERNAL_ERROR); // file
// existed
// and
// could
// not
// be
// deleted
return;
}
} FileOutputStream out = new FileOutputStream(file);
try {
IOHelper.copyInputStream(request.getInputStream(), out);
} catch (IOException e) {
LOG.warn("Exception occured", e);
throw e;
} finally {
out.close();
} response.setStatus(HttpURLConnection.HTTP_NO_CONTENT); // we return no
// content
} protected void doDelete(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("RESTful file access: DELETE request for " + request.getRequestURI());
} if (writePermissionRole != null && !request.isUserInRole(writePermissionRole)) {
response.sendError(HttpURLConnection.HTTP_FORBIDDEN);
return;
} File file = locateFile(request); if (!file.exists()) {
response.sendError(HttpURLConnection.HTTP_NOT_FOUND); // file not
// found
return;
} boolean success = IOHelper.deleteFile(file); // actual delete operation if (success) {
response.setStatus(HttpURLConnection.HTTP_NO_CONTENT); // we return
// no
// content
} else {
response.sendError(HttpURLConnection.HTTP_INTERNAL_ERROR); // could
// not
// be
// deleted
// due
// to
// internal
// error
}
} public void destroy() {
// nothing to destroy
}
}

过滤器中处理servlet文件上传,参考:https://www.cnblogs.com/qlqwjy/p/8722267.html

Httpclient上传文件代码如下:(PUT请求上传文件)

package upload;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader; import org.apache.http.HttpEntity;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.mime.HttpMultipartMode;
import org.apache.http.entity.mime.MultipartEntityBuilder;
import org.apache.http.entity.mime.content.FileBody;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils; /**
* httpclient上传文件(测试没问题)
*
* @author Administrator
*
*/
public class HttpClientUploadFile {
public static void main(String[] args) throws ClassNotFoundException, ClientProtocolException, IOException {
CloseableHttpClient httpclient = HttpClientBuilder.create().build();
CloseableHttpResponse response = null;
try {
HttpPut httpput = new HttpPut(
"http://localhost:8080/fileserver/ID:MicroWin10-1535-54829-1554981858740-1:1:1:1:1");
// 可以选择文件,也可以选择附加的参数
HttpEntity req = MultipartEntityBuilder.create().setMode(HttpMultipartMode.BROWSER_COMPATIBLE)
.addPart("file", new FileBody(new File("G:/Exam.log")))// 上传文件,如果不需要上传文件注掉此行
.build();
httpput.setEntity(req); System.out.println("executing request: " + httpput.getRequestLine());
response = httpclient.execute(httpput); HttpEntity re = response.getEntity();
System.out.println(response.getStatusLine());
if (re != null) {
System.out.println(
"Response content: " + new BufferedReader(new InputStreamReader(re.getContent())).readLine());
}
EntityUtils.consume(re);
} finally {
try {
response.close();
} catch (Exception e) {
e.printStackTrace();
}
}
} }

效果如下:

Message高级特性 & 内嵌Jetty实现文件服务器

git地址:https://github.com/qiao-zhi/JettyFileServer.git

内嵌Jetty的用法参考:http://wiki.eclipse.org/Jetty/Tutorial/Embedding_Jetty

2.生产者和消费者

 生产者

package cn.qlq.activemq.blob;

import java.io.File;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage; public class Producer { public static void main(String[] args) throws JMSException {
// 创建链接工厂
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
"tcp://localhost:61616?jms.blobTransferPolicy.uploadUrl=http://localhost:8080/fileserver/");
Connection connection = null;
ActiveMQSession session = null;
try {
// 创建链接
connection = factory.createConnection();
// 启动链接
connection.start();
// 获取会话
session = (ActiveMQSession) connection.createSession(Boolean.TRUE, session.AUTO_ACKNOWLEDGE);
// 创建队列
Destination queue = session.createQueue("blobQueue");
// 创建生产者对象
MessageProducer messageProducer = session.createProducer(queue);
// 创建blob消息
BlobMessage blobMessage = session.createBlobMessage(new File("pom.xml"));
messageProducer.send(blobMessage);
session.commit();
session.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
} }
}

消费者代码:

package cn.qlq.activemq.blob;

import java.io.InputStream;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.BlobMessage; public class Consumer {
/**
* @param args
* @throws JMSException
*/
public static void main(String[] args) throws JMSException { // 获取 ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 创建 Connection
Connection connection = connectionFactory.createConnection();
connection.start(); // 创建 Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建 Destinatione
Destination destination = session.createQueue("blobQueue"); // 创建 Consumer
MessageConsumer consumer = session.createConsumer(destination); //监听消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (message instanceof BlobMessage) {
//监听BlobMessage
BlobMessage blobMessage = (BlobMessage) message;
try {
InputStream in = blobMessage.getInputStream();
byte[] bytes = new byte[in.available()];
in.read(bytes);
System.out.println(new String(bytes));
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
} }

4. Message Transformation 消息类型转换

  有时候需要JMS Producer内部进行message转换,从4.2版本起,ActiveMQ提供了一个Message Transform接口用于进行消息转换,也就是对消息的类型进行转换,可以在如下对象上调用:
    ActiveMQConnectionFactory,ActiveMQConnection,ActiveMQSession,ActiveMQMessageConsumer,ActiveMQMessageProducer.
  在消息被发送之前发送到JMS producer的消息总线前进行转换,通过producerTransform方法,在消息到达总线后,但是在consumer接收消息之前进行转换,通过consumerTransform方法,当然MessageTransfoemer接口的实现需要你自己来提供。

例如:下面的一个在生产的时候转换的例子

生产者:

package cn.qlq.activemq.topic;

import java.util.concurrent.CountDownLatch;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.MessageTransformer;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.leveldb.replicated.dto.Transfer; /**
* 主题模式的消息生产者
*
* @author QiaoLiQiang
* @time 2018年9月19日下午10:10:36
*/
public class MsgProducer { // 默认端口61616
private static final String url = "tcp://localhost:61616/";
private static final String topicName = "transTopic"; public static void main(String[] args) throws JMSException, InterruptedException {
// 1创建ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
// 2.由connectionFactory创建connection
Connection connection = connectionFactory.createConnection(); // 3.创建Session===第一个参数是是否事务管理,第二个参数是应答模式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4.创建Destination(Queue继承Queue,Topic也继承Destination==这三个都是接口)
Destination destination = session.createTopic(topicName);
// 5.创建生产者producer
ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
// 6设置为持久模式(这个必须在下面开启connection之前)
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 7.启动connection
connection.start(); // 8.设置消息转换类型
producer.setTransformer(new MessageTransformer() {
// 生产者实现这个方法
@Override
public Message producerTransform(Session session, MessageProducer producer, Message message)
throws JMSException {
if (message instanceof TextMessage) {
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("key", ((TextMessage) message).getText()); return mapMessage;
}
return message;
} // 消费者换实现这个方法
@Override
public Message consumerTransform(Session session, MessageConsumer consumer, Message message)
throws JMSException {
return null;
}
}); for (int i = 0; i < 3; i++) {
// 9.创建Message,有好多类型,这里用最简单的TextMessage
TextMessage tms = session.createTextMessage("textMessage:" + i);
// 10.生产者发送消息
producer.send(tms); System.out.println("send:" + tms.getText());
}
// 11.关闭connection
connection.close(); } }

消费者:

package cn.qlq.activemq.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; /**
* 主题模式的消费消息
*
* @author QiaoLiQiang
* @time 2018年9月18日下午11:26:41
*/
public class MsgConsumer { // 默认端口61616
private static final String url = "tcp://localhost:61616/";
private static final String topicName = "transTopic"; public static void main(String[] args) throws JMSException {
// 1创建ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
// 2.由connectionFactory创建connection
Connection connection = connectionFactory.createConnection();
// 设置链接的ID
// 3.创建Session===第一个参数是是否事务管理,第二个参数是应答模式
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 4.创建Destination(Queue继承Queue,Topic也继承Destination==这三个都是接口)
Topic destination = session.createTopic(topicName);
// 5.启动connection
connection.start();
// 6.创建消费者consumer
MessageConsumer consumer = session.createConsumer(destination);
// 监听消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (message instanceof MapMessage) {
try {
MapMessage message2 = (MapMessage) message;
System.out.println(message2.getString("key"));
} catch (JMSException e) {
e.printStackTrace();
}
} else {
System.out.println(message.getClass());
}
}
});
/* session.close();
connection.close();*/
} }

结果:

WARN | path isn't a valid local location for TcpTransport to use
textMessage:0
textMessage:1
textMessage:2

上一篇:Linux/Windows 应用程序开发


下一篇:mybatis generator maven插件自动生成代码