1.client的作用:
针对代码执行的过程出现错误日志,打印的logger.error()里的日志进行统一拦截,按照一定的规则封装成数据发送到服务端;
2.原理介绍
利用log4j2提供的扩张功能,主要是扩展log4j2的相关扩展点
@Plugin(name = "LogTracker", category = Core.CATEGORY_NAME, elementType = Appender.ELEMENT_TYPE, printObject = true)
public class LogTrackerAppender extends AbstractAppender {
public static final int CCT_HOURS = 8;
public final String NORM_DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
protected final Boolean enabled;
protected final String project;
protected final String senderClient;
protected final String sendUrl;
protected final String hostIp;
protected final Integer retries;
protected Integer totalSizeInBytes;
protected Integer maxBlockMs;
protected Integer ioThreadCount;
protected Integer batchSizeThresholdInBytes;
protected Integer batchCountThreshold;
protected Integer lingerMs;
protected Integer baseRetryBackoffMs;
protected Integer maxRetryBackoffMs;
protected LogSender sender;
private final ProducerConfig producerConfig = new ProducerConfig();
protected LogTrackerAppender(String name, Filter filter, Layout<? extends Serializable> layout,
String project,
String hostIp,
Integer retries,
String sendUrl,
String senderClient,
Integer totalSizeInBytes,
Integer maxBlockMs,
Integer ioThreadCount,
Integer batchSizeThresholdInBytes,
Integer batchCountThreshold,
Integer lingerMs,
Integer baseRetryBackoffMs,
Integer maxRetryBackoffMs,
Boolean enabled) {
super(name, filter, layout);
this.project = project;
this.hostIp = hostIp;
this.retries = retries;
this.sendUrl = sendUrl;
this.senderClient = senderClient;
this.totalSizeInBytes = totalSizeInBytes;
this.maxBlockMs = maxBlockMs;
this.ioThreadCount = ioThreadCount;
this.batchSizeThresholdInBytes = batchSizeThresholdInBytes;
this.batchCountThreshold = batchCountThreshold;
this.lingerMs = lingerMs;
this.baseRetryBackoffMs = baseRetryBackoffMs;
this.maxRetryBackoffMs = maxRetryBackoffMs;
this.enabled = enabled;
}
@Override
public void append(LogEvent event) {
if (enabled) {
// 获取错误信息
String throwableStr = event.getThrown() == null ? event.getMessage().getFormat() : getThrowableStr(event.getThrown());
// 获取traceId
String traceId = TraceContext.traceId();
long timeMillis = event.getTimeMillis();
LocalDateTime localDateTime = Instant.ofEpochMilli(timeMillis).atZone(ZoneOffset.ofHours(CCT_HOURS)).toLocalDateTime();
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(NORM_DATETIME_PATTERN);
String time = localDateTime.format(formatter);
LogTrackerRecord trackerRecord = new LogTrackerRecord();
trackerRecord.setLogLevel(event.getLevel().toString());
trackerRecord.setProject(project);
trackerRecord.setHostIp(hostIp);
trackerRecord.setTraceId(traceId);
trackerRecord.setClassName(event.getLoggerName());
if (event.getSource() != null) {
trackerRecord.setResourceName(event.getSource().toString());
}
trackerRecord.setMessage(throwableStr);
trackerRecord.setThreadName(event.getThreadName());
trackerRecord.setTime(time);
String s = JSONObject.toJSONString(trackerRecord, SerializerFeature.PrettyFormat);
LOGGER.error("succ trackerRecord:{}", s);
try {
sender.send(project, Collections.singletonList(trackerRecord));
} catch (Exception e) {
this.error(
"Failed to send log, project=" + project
+ ", logRecordList=" + Collections.singletonList(trackerRecord), e);
}
}
}
@Override
public void start() {
super.start();
if (batchCountThreshold != null && batchCountThreshold > 0 && batchCountThreshold <= ProducerConfig.MAX_BATCH_COUNT) {
producerConfig.setBatchCountThreshold(batchCountThreshold);
}
if (batchSizeThresholdInBytes != null && batchSizeThresholdInBytes > 0 && batchSizeThresholdInBytes <= ProducerConfig.MAX_BATCH_SIZE_IN_BYTES) {
producerConfig.setBatchSizeThresholdInBytes(batchSizeThresholdInBytes);
}
if (ioThreadCount != null && ioThreadCount > 0) {
producerConfig.setIoThreadCount(ioThreadCount);
}
if (baseRetryBackoffMs != null && baseRetryBackoffMs > 0) {
producerConfig.setBaseRetryBackoffMs(baseRetryBackoffMs);
}
if (retries != null) {
producerConfig.setRetries(retries);
}
if (lingerMs != null && lingerMs >= ProducerConfig.LINGER_MS_LOWER_LIMIT) {
producerConfig.setLingerMs(lingerMs);
}
if (maxBlockMs != null) {
producerConfig.setMaxBlockMs(maxBlockMs);
}
if (maxRetryBackoffMs != null && maxRetryBackoffMs > 0) {
producerConfig.setMaxRetryBackoffMs(maxRetryBackoffMs);
}
if (totalSizeInBytes != null && totalSizeInBytes > 0) {
producerConfig.setTotalSizeInBytes(totalSizeInBytes);
}
sender = new LogSender(producerConfig);
sender.putProjectConfig(project, senderClient, sendUrl);
}
@Override
public void stop() {
super.stop();
if (sender != null) {
try {
sender.close();
} catch (Exception e) {
this.error("Failed to close LoghubAppender.", e);
}
}
}
/**
* 获取异常信息
*
* @param throwable Throwable对象
* @return 异常信息
*/
private String getThrowableStr(Throwable throwable) {
if (throwable == null) {
return null;
}
StringBuilder sb = new StringBuilder();
boolean isFirst = true;
for (String s : Throwables.toStringList(throwable)) {
if (isFirst) {
isFirst = false;
} else {
sb.append(System.getProperty("line.separator"));
}
sb.append(s);
}
return sb.toString();
}
@PluginBuilderFactory
public static <B extends LogTrackerAppenderBuilder<B>> B newBuilder() {
return new LogTrackerAppenderBuilder<B>().asBuilder();
}
}
- 当收集到error log会触发 LogTrackerAppender
- 通过 LogSender 发送数据,数据会被加入到 LogAccumulator 中的某个 ProducerBatch 里
-
在调用 send 方法过程中,如果发现目标 ProducerBatch 包含的日志条数到达了 maxBatchCount 或该 ProducerBatch 剩余的空间无法容纳当前数据,则会首先将该 ProducerBatch 投递到 IOThreadPool 里,然后再新建一个 ProducerBatch 存放当前数据
-
Mover 会遍历 LogAccumulator 中的每个 ProducerBatch,把超过了缓存时间的 batch 加入 expiredBatches 里。同时会记录未过期 batch 的最近超时时间,记为 t
-
将从 LogAccumulator 中获取的 expiredBatches 投递到 IOThreadPool 里
-
获取 RetryQueue 中所有满足发送条件的 ProducerBatch,如果当前没有 batch 满足发送条件则最多等待时间 t
-
将从 RetryQueue 中获取的 expiredBatches 投递到 IOThreadPool 里。(Mover 完成步骤 7 后会再次进入步骤 4)
-
IOThreadPool 中的工作线程从阻塞队列里获取 ProducerBatch,然后发送给目标 LogTracker Server
-
如果数据发送失败,且满足下列条件,会计算当前 ProducerBatch 的下次计划发送时间,然后将其放入将该 ProducerBatch 写入重试队列。
-
该错误可以重试。
-
RetryQueue 没有关闭。
-
-
LogAccumulator
为了提高吞吐量,一个常见的做法是将若干个小包合并成大包批量发送, LogAccumulator 的主要作用便是合并待发送的数据。LogAccumulator 会根据project 属性将其缓存到内部 map 的不同位置。这个 map 的 key 为上述project属性(后序可能会扩展),value 为 ProducerBatch。为了保证线程安全同时支持高并发,这里选用 ConcurrentMap 作为 map 的实现。
- LogAccumulator 的另一个作用是控制缓存数据的总大小,这里选用 Semaphore 实现控制逻辑。Semaphore 是基于 AQS 实现的高性能同步工具,它会首先尝试通过自旋的方式获取共享资源,减少线程上下文切换的开销。
-
RetryQueue
RetryQueue 用于存放发送失败待重试的 ProducerBatch,每个 batch 有一个字段用于标识下次计划发送时间。为了高效地获取超时 batch,内部选用 DelayQueue 存放这些 batch。DelayQueue 是一种按时间排序的优先队列,最先超时的 batch 会被优先取出,同时它也是线程安全的。
-
Mover
Mover 是一个独立的线程,它会循环地将 LogAccumulator 和 RetryQueue 中的超时 batch 投递到 IOThreadPool 里。为了避免空转占用宝贵的 CPU 资源,当 Mover 发现 LogAccumulator 和 RetryQueue 里没有满足发送条件的 batch 时,会在 RetryQueue 的 expiredBatches 方法上等待用户配置的数据最长缓存时间 lingerMs。
-
IOThreadPool
IOThreadPool 中的工作线程用于真正执行数据发送任务,该线程池的大小可通过参数 ioThreadCount 指定,默认为可用处理器个数乘以 2。
-
SendProducerBatchTask
SendProducerBatchTask 封装了 batch 发送逻辑。为了避免阻塞 IO 线程,不论当前 batch 发送成功与否都会将其投递到队列中交由独立线程去执行回调。另外,如果某个发送失败的 batch 满足重试条件,不会在当前 IO 线程中立即重试(立即重试通常也会失败),而是根据指数退避策略将其投递到 RetryQueue 中。
-
BatchHandler
Producer 会启动一个 SuccessBatchHandler 和一个 FailureBatchHandler 分别用来处理发送成功或失败的 batch。Handler 在执行完 batch 的 callback、设置好 batch 的 future 后便会“释放”该 batch 占用的内存空间,供新的数据使用。分开处理的原因是为了隔离发送成功和发送失败的 batch,保持 producer 整体的流动性。