Aliyun LOG Java Producer 快速入门

背景

Aliyun LOG Java Producer 是为运行在大数据、高并发场景下的 Java 应用量身打造的高性能写 LogHub 类库。相对于原始的 API 或 SDK,使用 producer 写数据能为您带来诸多优势,包括高性能、计算与 I/O 逻辑分离、资源可控制等。想要更深入地了解 producer 的功能和原理可参考文章日志上云利器 - Aliyun LOG Java Producer,本文将聚焦于 producer 的使用上。

使用步骤

使用 producer 可归结为如下图所示的三个步骤,下面分别为您介绍。
Aliyun LOG Java Producer 快速入门

创建 producer

Producer 的创建过程主要涉及到以下对象。

ProjectConfig

ProjectConfig 包含目标 project 的服务入口信息以及表征调用者身份的访问凭证。

服务入口

最终访问地址会由 project 和 endpoint 拼出,关于如何确定 project 对应的 endpoint 可参考文章服务入口

访问凭证

Producer 支持用户配置 AK 或 STS token。如果使用 STS token,需要定期创建新的 ProjectConfig 然后将其 put 到 ProjectConfigs 里。

ProjectConfigs

如果您需要向不同的 project 中写入数据,可以创建多个 ProjectConfig 对象然后将它们 put 到 ProjectConfigs 里。ProjectConfigs 内部通过 map 维护不同 project 的配置,该 map 的 key 为 project,value 为该 project 对应的 Client。

ProducerConfig

ProducerConfig 用于配置发送策略,您可以根据不同的业务场景为参数指定不同的值,各参数含义如下表所示。

参数 类型 描述
totalSizeInBytes 整型 单个 producer 实例能缓存的日志大小上限,默认为 100MB。
maxBlockMs 整型 如果 producer 可用空间不足,调用者在 send 方法上的最大阻塞时间,默认为 60 秒。
如果超过这个时间后所需空间仍无法得到满足,send 方法会抛出 TimeoutException
如果将该值设为0,当所需空间无法得到满足时,send 方法会立即抛出 TimeoutException。
如果您希望 send 方法一直阻塞直到所需空间得到满足,可将该值设为负数。
ioThreadCount 整型 执行日志发送任务的线程池大小,默认为可用处理器个数。
batchSizeThresholdInBytes 整型 当一个 ProducerBatch 中缓存的日志大小大于等于 batchSizeThresholdInBytes 时,该 batch 将被发送,默认为 512 KB,最大可设置成 5MB。
batchCountThreshold 整型 当一个 ProducerBatch 中缓存的日志条数大于等于 batchCountThreshold 时,该 batch 将被发送,默认为 4096,最大可设置成 40960。
lingerMs 整型 一个 ProducerBatch 从创建到可发送的逗留时间,默认为 2 秒,最小可设置成 100 毫秒。
retries 整型 如果某个 ProducerBatch 首次发送失败,能够对其重试的次数,默认为 10 次。
如果 retries 小于等于 0,该 ProducerBatch 首次发送失败后将直接进入失败队列。
maxReservedAttempts 整型 每个 ProducerBatch 每次被尝试发送都对应着一个 Attempt,此参数用来控制返回给用户的 attempt 个数,默认只保留最近的 11 次 attempt 信息。
该参数越大能让您追溯更多的信息,但同时也会消耗更多的内存。
baseRetryBackoffMs 整型 首次重试的退避时间,默认为 100 毫秒。
Producer 采样指数退避算法,第 N 次重试的计划等待时间为 baseRetryBackoffMs * 2^(N-1)。
maxRetryBackoffMs 整型 重试的最大退避时间,默认为 50 秒。
adjustShardHash 布尔 如果调用 send 方法时指定了 shardHash,该参数用于控制是否需要对其进行调整,默认为 true。
buckets 整型 当且仅当 adjustShardHash 为 true 时,该参数才生效。此时,producer 会自动将 shardHash 重新分组,分组数量为 buckets。
如果两条数据的 shardHash 不同,它们是无法合并到一起发送的,会降低 producer 吞吐量。将 shardHash 重新分组后,能让数据有更多地机会被批量发送。
该参数的取值范围是 [1, 256],且必须是 2 的整数次幂,默认为 64。

LogProducer

LogProducer 是接口 Producer 的实现类,它接收唯一的参数 producerConfig。当您准备好 producerConfig 后,可以按照下列方式创建 producer 实例。

Producer producer = new LogProducer(producerConfig);

创建 producer 的同时会创建一系列线程,是一个相对昂贵的操作,因此建议一个应用共用一个 producer 实例。一个 producer 实例包含的线程如下表所示,其中 N 为该 producer 实例在当前进程中的编号,从 0 开始。

线程名格式 数量 描述
aliyun-log-producer-<N>-mover 1 负责将满足发送条件的 batch 投递到发送线程池里。
aliyun-log-producer-<N>-io-thread- ioThreadCount IOThreadPool 中真正用于执行数据发送任务的线程。
aliyun-log-producer-<N>-success-batch-handler 1 用于处理发送成功的 batch。
aliyun-log-producer-<N>-failure-batch-handler 1 用于处理发送失败的 batch。

另外,LogProducer 提供的所有方法都是线程安全的,可以在多线程环境下安全执行。

发送数据

Producer 实例创建好后,下一步就是使用其提供的方法发送数据。

参数介绍

Producer 接口提供多种发送方法,方法的各个参数含义如下。

参数 描述 能否为空
project 待发送数据的目标 project。 不能
logStore 待发送数据的目标 logStore。 不能
logItem 待发送数据。 不能
topic 待发送数据的 topic。
如果留空或没有指定,该字段将被赋予""。
source 待发送数据的 source。
如果留空或没有指定,该字段将被赋予 producer 所在宿主机的 IP。
shardHash 待发送数据的 shardHash,用于将数据写入 logStore 中的特定 shard。
如果留空或没有指定,数据将被随机写入目标 logStore 的某个 shard 中。
callback 用于告知调用者数据发送的结果。

另外,只有 project、logStore、topic、source、shardHash 这 5 个属性都相同的数据才有机会和并在一起批量发送。为了让数据合并功能充分发挥作用,同时也为了节省内存,建议您控制这 5 个字段的取值范围。如果某个字段如 topic 的取值确实非常多,建议您将其加入 logItem 而不是直接使用 topic。

获取发送结果

由于 producer 提供的所有发送方法都是异步的,需要通过返回的 future 或者传入的 callback 获取发送结果。

Future

Send 方法会返回一个 ListenableFuture,它除了可以像普通 future 那样通过调用 get 方法阻塞获得发送结果外,还允许你注册回调方法(回调方法会在完成 future 设置后被调用)。以下代码片段展示了 ListenableFuture 的使用方法,用户需要为该 future 注册一个 FutureCallback 并将其投递到应用提供的线程池 EXECUTOR_SERVICE 中执行,完整样例可参考 SampleProducerWithFuture.java

ListenableFuture<Result> f = producer.send("project", "logStore", logItem);
Futures.addCallback(f,
                    new FutureCallback<Result>() {
                        @Override
                        public void onSuccess(@Nullable Result result) {
                        }

                        @Override
                        public void onFailure(Throwable t) {
                        }
                    },
                    EXECUTOR_SERVICE);

Callback

除了使用 future 外,您还可以通过在调用 send 方法时注册 callback 获取数据发送结果,代码片段如下。(完整样例可参考 SampleProducerWithCallback.java

producer.send(
        "project",
        "logStore",
        logItem,
        new Callback() {
            @Override
            public void onCompletion(Result result) {
            }
        });

Callback 由 producer 内部线程负责执行,并且只有在执行完毕后数据“占用”的空间才会释放。为了不阻塞 producer 造成整体吞吐量的下降,要避免在 callback 里执行耗时的操作。另外,在 callback 中调用 send 方法进行重试也是不建议的,您可以上调参数 retries 或者在 ListenableFuture 的callback 中进行重试。

Future vs Callback

那么想要获取数据的发送结果到底是选择 future 还是 callback 呢?如果获取结果后,应用的处理逻辑比较简单且不会造成 producer 阻塞,建议直接使用 callback。否则,建议使用 ListenableFuture,在单独的线程(池)中执行后续业务。

关闭 producer

当您已经没有数据需要发送或者当前进程准备退出时,需要关闭 producer,目的是让 producer 中缓存的数据全部得到处理。目前,producer 提供安全关闭和有限关闭两种模式。

安全关闭

在大多数情况下,建议您使用安全关闭。安全关闭对应的方法是close(),它会等到 producer 中缓存的数据全部被处理、线程全部停止、注册的 callback 全部执行,返回 future 全部被设置后才会返回。

虽然要等到数据全部处理完成,但 producer 被关闭后,缓存的 batch 会被立刻处理且不会被重试。因此,如果 callback 不被阻塞,close 方法往往能在很短的时间内返回。

有限关闭

如果您的 callback 在执行过程中有可能阻塞,但您又希望 close 方法能在短时间内返回,可以使用有限关闭。有限关闭对应的方法是close(long timeoutMs),如果超过指定的 timeoutMs 后 producer 仍未完全关闭,它会抛出 IllegalStateException,这意味着缓存的数据可能还没来得及处理就被丢弃,用户注册的 callback 也可能不会被执行。

应用示例

为了进一步减少学习成本,我们为您准备了 Aliyun LOG Java Producer 应用示例。示例中包含了 producer 从创建到关闭的全部流程。

技术支持

Aliyun LOG Java Producer 快速入门

上一篇:Android利用Logcat监听应用程序本身被卸载


下一篇:C#中TreeView组件使用方法初步