logTracker错误日志跟踪组件(二)

1.client的作用:

针对代码执行的过程出现错误日志,打印的logger.error()里的日志进行统一拦截,按照一定的规则封装成数据发送到服务端;

2.原理介绍

利用log4j2提供的扩张功能,主要是扩展log4j2的相关扩展点

@Plugin(name = "LogTracker", category = Core.CATEGORY_NAME, elementType = Appender.ELEMENT_TYPE, printObject = true)
public class LogTrackerAppender extends AbstractAppender {

    public static final int CCT_HOURS = 8;

    public final String NORM_DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss";

    protected final Boolean enabled;

    protected final String project;

    protected final String senderClient;

    protected final String sendUrl;

    protected final String hostIp;

    protected final Integer retries;

    protected Integer totalSizeInBytes;

    protected Integer maxBlockMs;

    protected Integer ioThreadCount;

    protected Integer batchSizeThresholdInBytes;

    protected Integer batchCountThreshold;

    protected Integer lingerMs;

    protected Integer baseRetryBackoffMs;

    protected Integer maxRetryBackoffMs;


    protected LogSender sender;


    private final ProducerConfig producerConfig = new ProducerConfig();


    protected LogTrackerAppender(String name, Filter filter, Layout<? extends Serializable> layout,
                                 String project,
                                 String hostIp,
                                 Integer retries,
                                 String sendUrl,
                                 String senderClient,
                                 Integer totalSizeInBytes,
                                 Integer maxBlockMs,
                                 Integer ioThreadCount,
                                 Integer batchSizeThresholdInBytes,
                                 Integer batchCountThreshold,
                                 Integer lingerMs,
                                 Integer baseRetryBackoffMs,
                                 Integer maxRetryBackoffMs,
                                 Boolean enabled) {
        super(name, filter, layout);
        this.project = project;
        this.hostIp = hostIp;
        this.retries = retries;
        this.sendUrl = sendUrl;
        this.senderClient = senderClient;
        this.totalSizeInBytes = totalSizeInBytes;
        this.maxBlockMs = maxBlockMs;
        this.ioThreadCount = ioThreadCount;
        this.batchSizeThresholdInBytes = batchSizeThresholdInBytes;
        this.batchCountThreshold = batchCountThreshold;
        this.lingerMs = lingerMs;
        this.baseRetryBackoffMs = baseRetryBackoffMs;
        this.maxRetryBackoffMs = maxRetryBackoffMs;
        this.enabled = enabled;
    }

    @Override
    public void append(LogEvent event) {
        if (enabled) {
            // 获取错误信息
            String throwableStr = event.getThrown() == null ? event.getMessage().getFormat() : getThrowableStr(event.getThrown());

            // 获取traceId
            String traceId = TraceContext.traceId();

            long timeMillis = event.getTimeMillis();
            LocalDateTime localDateTime = Instant.ofEpochMilli(timeMillis).atZone(ZoneOffset.ofHours(CCT_HOURS)).toLocalDateTime();
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern(NORM_DATETIME_PATTERN);
            String time = localDateTime.format(formatter);

            LogTrackerRecord trackerRecord = new LogTrackerRecord();
            trackerRecord.setLogLevel(event.getLevel().toString());
            trackerRecord.setProject(project);
            trackerRecord.setHostIp(hostIp);
            trackerRecord.setTraceId(traceId);

            trackerRecord.setClassName(event.getLoggerName());

            if (event.getSource() != null) {
                trackerRecord.setResourceName(event.getSource().toString());
            }

            trackerRecord.setMessage(throwableStr);
            trackerRecord.setThreadName(event.getThreadName());
            trackerRecord.setTime(time);

            String s = JSONObject.toJSONString(trackerRecord, SerializerFeature.PrettyFormat);
            LOGGER.error("succ  trackerRecord:{}", s);

            try {
                sender.send(project, Collections.singletonList(trackerRecord));
            } catch (Exception e) {
                this.error(
                    "Failed to send log, project=" + project
                        + ", logRecordList=" + Collections.singletonList(trackerRecord), e);
            }
        }
    }


    @Override
    public void start() {
        super.start();

        if (batchCountThreshold != null && batchCountThreshold > 0 && batchCountThreshold <= ProducerConfig.MAX_BATCH_COUNT) {
            producerConfig.setBatchCountThreshold(batchCountThreshold);
        }

        if (batchSizeThresholdInBytes != null && batchSizeThresholdInBytes > 0 && batchSizeThresholdInBytes <= ProducerConfig.MAX_BATCH_SIZE_IN_BYTES) {
            producerConfig.setBatchSizeThresholdInBytes(batchSizeThresholdInBytes);
        }

        if (ioThreadCount != null && ioThreadCount > 0) {
            producerConfig.setIoThreadCount(ioThreadCount);
        }

        if (baseRetryBackoffMs != null && baseRetryBackoffMs > 0) {
            producerConfig.setBaseRetryBackoffMs(baseRetryBackoffMs);
        }

        if (retries != null) {
            producerConfig.setRetries(retries);
        }

        if (lingerMs != null && lingerMs >= ProducerConfig.LINGER_MS_LOWER_LIMIT) {
            producerConfig.setLingerMs(lingerMs);
        }

        if (maxBlockMs != null) {
            producerConfig.setMaxBlockMs(maxBlockMs);
        }

        if (maxRetryBackoffMs != null && maxRetryBackoffMs > 0) {
            producerConfig.setMaxRetryBackoffMs(maxRetryBackoffMs);
        }

        if (totalSizeInBytes != null && totalSizeInBytes > 0) {
            producerConfig.setTotalSizeInBytes(totalSizeInBytes);
        }

        sender = new LogSender(producerConfig);

        sender.putProjectConfig(project, senderClient, sendUrl);
    }


    @Override
    public void stop() {
        super.stop();
        if (sender != null) {
            try {
                sender.close();
            } catch (Exception e) {
                this.error("Failed to close LoghubAppender.", e);
            }
        }
    }


    /**
     * 获取异常信息
     *
     * @param throwable Throwable对象
     * @return 异常信息
     */
    private String getThrowableStr(Throwable throwable) {
        if (throwable == null) {
            return null;
        }
        StringBuilder sb = new StringBuilder();
        boolean isFirst = true;
        for (String s : Throwables.toStringList(throwable)) {
            if (isFirst) {
                isFirst = false;
            } else {
                sb.append(System.getProperty("line.separator"));
            }
            sb.append(s);
        }
        return sb.toString();
    }


    @PluginBuilderFactory
    public static <B extends LogTrackerAppenderBuilder<B>> B newBuilder() {
        return new LogTrackerAppenderBuilder<B>().asBuilder();
    }

}

logTracker错误日志跟踪组件(二)

  1. 当收集到error log会触发 LogTrackerAppender
  2. 通过 LogSender 发送数据,数据会被加入到 LogAccumulator 中的某个 ProducerBatch 里
  3. 在调用 send 方法过程中,如果发现目标 ProducerBatch 包含的日志条数到达了 maxBatchCount 或该 ProducerBatch 剩余的空间无法容纳当前数据,则会首先将该 ProducerBatch 投递到 IOThreadPool 里,然后再新建一个 ProducerBatch 存放当前数据

  4. Mover 会遍历 LogAccumulator 中的每个 ProducerBatch,把超过了缓存时间的 batch 加入 expiredBatches 里。同时会记录未过期 batch 的最近超时时间,记为 t

  5. 将从 LogAccumulator 中获取的 expiredBatches 投递到 IOThreadPool 里

  6. 获取 RetryQueue 中所有满足发送条件的 ProducerBatch,如果当前没有 batch 满足发送条件则最多等待时间 t

  7. 将从 RetryQueue 中获取的 expiredBatches 投递到 IOThreadPool 里。(Mover 完成步骤 7 后会再次进入步骤 4)

  8. IOThreadPool 中的工作线程从阻塞队列里获取 ProducerBatch,然后发送给目标 LogTracker Server

  9. 如果数据发送失败,且满足下列条件,会计算当前 ProducerBatch 的下次计划发送时间,然后将其放入将该 ProducerBatch 写入重试队列。

    1. 该错误可以重试。

    2. RetryQueue 没有关闭。

  • LogAccumulator

为了提高吞吐量,一个常见的做法是将若干个小包合并成大包批量发送, LogAccumulator 的主要作用便是合并待发送的数据。LogAccumulator 会根据project 属性将其缓存到内部 map 的不同位置。这个 map 的 key 为上述project属性(后序可能会扩展),value 为 ProducerBatch。为了保证线程安全同时支持高并发,这里选用 ConcurrentMap 作为 map 的实现。

  • LogAccumulator 的另一个作用是控制缓存数据的总大小,这里选用 Semaphore 实现控制逻辑。Semaphore 是基于 AQS 实现的高性能同步工具,它会首先尝试通过自旋的方式获取共享资源,减少线程上下文切换的开销。
  • RetryQueue

RetryQueue 用于存放发送失败待重试的 ProducerBatch,每个 batch 有一个字段用于标识下次计划发送时间。为了高效地获取超时 batch,内部选用 DelayQueue 存放这些 batch。DelayQueue 是一种按时间排序的优先队列,最先超时的 batch 会被优先取出,同时它也是线程安全的。

  • Mover

Mover 是一个独立的线程,它会循环地将 LogAccumulator 和 RetryQueue 中的超时 batch 投递到 IOThreadPool 里。为了避免空转占用宝贵的 CPU 资源,当 Mover 发现 LogAccumulator 和 RetryQueue 里没有满足发送条件的 batch 时,会在 RetryQueue 的 expiredBatches 方法上等待用户配置的数据最长缓存时间 lingerMs。

  • IOThreadPool

IOThreadPool 中的工作线程用于真正执行数据发送任务,该线程池的大小可通过参数 ioThreadCount 指定,默认为可用处理器个数乘以 2。

  • SendProducerBatchTask

SendProducerBatchTask 封装了 batch 发送逻辑。为了避免阻塞 IO 线程,不论当前 batch 发送成功与否都会将其投递到队列中交由独立线程去执行回调。另外,如果某个发送失败的 batch 满足重试条件,不会在当前 IO 线程中立即重试(立即重试通常也会失败),而是根据指数退避策略将其投递到 RetryQueue 中。

  • BatchHandler

Producer 会启动一个 SuccessBatchHandler 和一个 FailureBatchHandler 分别用来处理发送成功或失败的 batch。Handler 在执行完 batch 的 callback、设置好 batch 的 future 后便会“释放”该 batch 占用的内存空间,供新的数据使用。分开处理的原因是为了隔离发送成功和发送失败的 batch,保持 producer 整体的流动性。

上一篇:.NET架构小技巧(2)——访问修饰符正确姿势


下一篇:观察者模式