我的团队和我正在创建一个由一组RESTful JSON服务组成的服务平台,该平台中的每个服务在平台中的作用就是分别提供一些独特的功能和/或数据。由于平台中产生的日志四散各处,所以我们想,要是能将这些日志集中化处理一下,并提供一个能够让我们查看、过滤、排序和搜索我们所有的日志的基本型的日志查看工具就好了。我们还想让我们的日志是异步式的,因为我们可不想在写日志的时候(比方说,可能会将日志直接写入数据库),让我们提供的服务因为写日志而暂时被阻挡住。
实现这个目标的策略非常简单明了。
- 安装ActiveMQ
- 创建一个log4j的日志追加器,将日志写入队列(log4j自带了一个这样的追加器,不过现在让我们自己来写一个吧。)
- 写一个消息侦听器,从MQ服务器上所设置的JMS队列中读取日志并将日志持久化
下面让我们分步来看这个策略是如何得以实现的。
安装ActiveMQ
创建一个Lo4j的JMS日志追加器
首先,我们来创建一个log4j的JMS日志追加器。log4j自带了一个这样的追加器(该追加器没有将日志写入一个队列,而是写给了一个话题)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
import
import
import
import
import
import
import
import
import
import
import
/**
*
*
*
*/
public
extends
implements
private
"JMSQueueAppender" );
private
private
@Override
public
}
@Override
public
return
;
}
@Override
protected
try
ActiveMQConnectionFactory new
this .brokerUri);
//
javax.jms.Connection
connection.start();np
//
Session false ,Session.AUTO_ACKNOWLEDGE);
//
Destination this .queueName);
//
MessageProducer
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
ObjectMessage new
//
producer.send(message);
//
session.close();
connection.close();
} catch
e.printStackTrace();
}
}
public
this .brokerUri
}
public
return
}
public
this .queueName
}
public
return
}
}
|
下面让我们看看这里面发生了什么事情。
第19行:We我们实现了的Log4J日志追加器接口,该接口要求我们实现三个方法:requiresLayout, close和append。我们将暂时简化处理过程,实现所需的append方法。在对logger进行调用时这个方法就会被调用。
第37行: log4j将一个LoggingEvent对象作为参数对append方法进行调用,这个LoggingEvent对象表示了对logger的一次调用,它封装了每一个日志项的所有信息。
第41和42行:将指向JMS的uri作为参数,创建一个连接工厂对象,在我们的情况下,该uri指向的是我们的ActiveMQ服务器。
第45, 46和49行: 我们同JMS服务器建立一个连接和会话。会话有多种打开模式。在Auto_Acknowledge模式的会话中,消息的应答会自动发生。Client_Acknowledge 模式下,客户端需要对消息的接收和/或处理进行显式地应答。另外还有两种其它的模式。有关细节,请参考文档http://download.oracle.com/javaee/1.4/api/javax/jms/Session.html
第52行: 创建一个队列。将队列的名字作为参数发送给连接
第56行: 我们将发送模式设置为Non_Persistent。另一个可选的模式是Persistent ,在这种模式下,消息会持久化到一个持久性存储系统中。持久化模式会降低系统速度,但能增加了消息传递的可靠性。
第58行: 这行我们做了很多事。首先我将一个LoggingEvent对象封装到了一个LoggingEventWrapper对象之中。这么做是因为LoggingEvent对象有一些属性不支持序列化,另外还有一个原因是我想记录一些额外的信息,比如IP地址和主机名。接下来,使用JMS的会话对象,我们把一个对象(LoggingEventWrapper对象)做好了发送前的准备。
第61行: 我将该对象发送到了队列中。
下面所示是LoggingEventWrapper的代码。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
import
import
import
import
import
/**
*
*
*
*
*
*
*/
public
implements
private
"%throwable" ;
private
private
private
private
private
private
private
private
private
public
this .loggingEvent
//Format
EnhancedPatternLayout new
layout.setConversionPattern(ENHANCED_PATTERN_LAYOUT);
this .detail this .loggingEvent);
}
public
return
.loggingEvent.timeStamp;
}
public
return
.loggingEvent.getLevel().toString();
}
public
return
.loggingEvent.getLoggerName();
}
public
return
.loggingEvent.getRenderedMessage();
}
public
return
.detail;
}
public
return
}
public
try
return
} catch
return
;
}
}
public
try
return
} catch
return
;
}
}
}
|
消息侦听器
消息侦听器会对队列(或话题)进行“侦听”。一旦有新消息添加到了队列中,onMessage 方法就会得到调用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
import
import
import
import
import
import
import
@Component
public
implements
{
public
class );
@Autowired
private
public
final
{
if
instanceof
{
try {
final
loggingService.saveLog(loggingEventWrapper);
}
catch
final
{
logger.error(e.getMessage(),
} catch
logger.error(e.getMessage(),e);
}
}
}
}
|
第23行: 检查从队列中拿到的对象是否是ObjectMessage的实例
第26行: 从消息中提取出LoggingEventWrapper对象
第27行: 调用服务方法将日志持久化
Spring配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
<? xml
= "1.0"
= "UTF-8" ?>
< beans
= "http://www.springframework.org/schema/beans"
= "http://www.w3.org/2001/XMLSchema-instance"
= "http://www.springframework.org/schema/p"
= "http://www.springframework.org/schema/context"
= "http://www.springframework.org/schema/jms"
= "http://activemq.apache.org/schema/core"
= "http://www.springframework.org/schema/aop"
= "http://www.springframework.org/schema/beans >
<!--
<!--
<!--
<amq:transportConnectors>
<amq:transportConnector
</amq:transportConnectors>
</amq:broker-->
<!--
< amq:queue
= "destination"
= "logQueue"
<!--
< amq:connectionFactory
= "jmsFactory"
= "tcp://localhost:61616"
< bean
= "connectionFactory"
= "org.springframework.jms.connection.CachingConnectionFactory" >
< constructor-arg
= "jmsFactory"
< property
= "exceptionListener"
= "JMSExceptionListener"
< property
= "sessionCacheSize"
= "100"
</ bean >
<!--
< bean
= "jmsTemplate"
= "org.springframework.jms.core.JmsTemplate" >
< constructor-arg
= "connectionFactory"
</ bean >
<!--
is
< jms:listener-container
= "10" >
< jms:listener
= "QueueListener"
= "logQueue"
= "logQueueListener"
</ jms:listener-container >
</ beans >
|
第5到9行: 使用代理标签建立一个嵌入式消息代理。既然我用的是外部消息代理,所以我就不需要它了。
第12行: 给出你想要连接的队列的名字
第14行: 代理服务器的URI
第15到19行: 连接工厂的设置
第26到28行: 消息侦听器的设置,这里可以指定用于从队列中读取消息的并发现线程的个数
当然,上面的例子做不到让你能够拿来就用。你还需要包含所有的JMS依赖库并实现完成日志持久化任务的服务。但是,我希望本文能够为你提供一个相当不错的思路。