背景
aliyun-go-producer为运行在大数据、高并发场景下的 Java 应用量身打造的高性能写 LogHub 类库。相对于原始的 API 或 SDK,使用 producer 写数据能为您带来诸多优势,包括高性能、计算与 I/O 逻辑分离、资源可控制等。本文将为大家带来go producer的设计原理。
功能特点
- 线程安全 - producer 内所有的方法以及暴露的接口都是线程安全的。
- 异步发送 - 调用send方法后回立即返回,日志将会被传递到io线程中异步发送,不阻塞用户发送日志操作。
- 失败重试 - 用户可以通过设置初始化的参数Retries来指定日志发送失败的次数,超过重试次数将被投递到失败队列。
- 优雅关闭 - 用户调用关闭方法进行关闭时,producer 会将所有其缓存的数据进行发送,防止日志丢失,关闭分为有限关闭和安全关闭,详细的区别会在下文中列出。
- 本地调试 - 可通过配置支持将日志内容输出到本地或控制台,并支持轮转、日志数、轮转大小设置。
- 高性能 - go版本的producer 基于go 语言特性进行开发,go的goroutine在并发多任务处理能力上有着与生俱来的优势。所以producer 对每一个可发送的任务会开启单独的groutine去执行发送任务,相对比直接使用cpu线程处理,对系统性能消耗更小,效率更高。
- 使用简单 - 在整个使用过程中,producer给提供了3个方法,start,send和close,用户启动producer 以后只需要调用send方法即可发送日志,producer 提供不同的send 的方法,用来满足用户的发送需求。
- 结果可控制 - 用户可以自己实现producer 提供的CallBack 接口,里面包含日志发送成功和失败后调用的方法,可以自行在CallBack接口写日志发送结果处理逻辑。
功能优势
使用 producer 相对于直接通过 API 或 SDK 向 LogHub 写数据会有如下优势。
高性能
在海量数据、资源有限的前提下,写入端要达到目标吞吐量需要实现复杂的控制逻辑,包括多线程、缓存策略、批量发送等,另外还要充分考虑失败重试的场景。Producer 实现了上述功能,在为您带来性能优势的同时简化了程序开发步骤。
异步非阻塞
在可用内存充足的前提下,producer 会对发往 LogHub 的数据进行缓存,因此用户调用 send 方法时能够立即返回,不会阻塞,达到计算与 I/O 逻辑分离的目的。稍后,用户可以通过返回的 future 对象或传入的 callback 获得数据发送的结果。
资源可控制
可以通过参数控制 producer 用于缓存待发送数据的内存大小,同时还可以配置用于执行数据发送任务的线程数量。这样做一方面避免了 producer 无限制地消耗资源,另一方面可以让您根据实际情况平衡资源消耗和写入吞吐量。
小结
综上所述,producer 在给您带来诸多优势的同时只暴露了简单的接口,为您屏蔽了复杂的底层细节;另外,您也无须担心它会影响到上层业务的正常运行,大大降低了数据接入门槛。
快速入门
Producer 的实现比较复杂,但使用起来却非常简单。想了解 producer 的正确打开方式请参考文章
原理剖析
为了让您更好地理解 producer 的表现行为,本章将带您探究它的实现原理,包括数据写入逻辑、核心组件的实现方式以及如何优雅地关闭 producer 中的各个组件。Producer 的整体架构如下图所示。
1.用户调用producer.Send()方法发送数据,数据会被加到LogAccumulator中的某个ProducerBatch里,通常情况下send 方法会立刻返回,但如果该 producer 实例没有足够空间容纳当前数据,此方法会被阻塞直到下列任意一个条件被满足。
i.之前缓存的数据被 BatchHandler 处理完成后,占用的内存被“释放”,producer 有足够空间容纳当前数据。
ii.到达用户指定的最长阻塞时间,此时会抛出异常。
2.在调用 send 方法过程中,如果发现目标 ProducerBatch 包含的日志条数到达了 maxBatchCount 或该 ProducerBatch 剩余的空间无法容纳当前数据,则会首先将该 ProducerBatch 投递到TaskQueue 里,然后再新建一个 ProducerBatch 存放当前数据。为了不阻塞用户线程,TaskQueue 使用切片,因为单个 Producer 实例能缓存的日志总大小是有限的,该切片长度不会无限增长。
3.Mover 会遍历 LogAccumulator 中的每个 ProducerBatch,把超过了缓存时间的 batch 加入 expiredBatches 里。同时会记录未过期 batch 的最近超时时间,记为 t。
4.将从 LogAccumulator 中获取的 expiredBatches 投递到TaskQueue 里。并且获取 RetryQueue 中所有满足发送条件的 ProducerBatch,将获取到的ProducerBatch 投递到TaskQueue 如果当前没有 batch 满足发送条件则最多等待时间 t。
5.IoThreadPool 会不断遍历TaskQueue队列中所有待发送的任务Batch,并且对每一个任务开启一个单独groutine去执行发送任务,开启groutine的数量可以通过MaxIoWorkerCount参数去设置,当TaskQueue中任务的数量大于可以开启的groutine数量,IoThreadPool便会阻塞直到开出去的groutine完成任务关闭,但是这个过程不会影响TaskQueue新增任务,TaskQueue是无阻塞队列,所有不会阻塞用户调用producer.Send()方法操作。
6.IOThreadPool 中的工作线程从阻塞队列里或取 ProducerBatch,然后发送给目标 logStore。
7.如果数据发送成功,会执行该ProducerBatch 的成功回调函数。
8.如果数据发送失败,且满足下列任意一个条件,会执行将该 ProducerBatch的失败回调函数。
i. 该错误无法重试。
ii. RetryQueue 被关闭。
iii. 达到了指定的重试次数且失败队列中的 batch 数不超过待发送 batch 总数的二分之一。
9.否则,计算当前 ProducerBatch 的下次计划发送时间然后将其放入 RetryQueue 中。
核心组件
LogAccumulator
为了提高吞吐量,一个常见的做法是将若干个小包合并成大包批量发送,本小节介绍的 LogAccumulator 的主要作用便是合并待发送的数据。由于服务端要求具有相同 project、logstore、topic、source、shardHash 的数据才能组装成一个大包,LogAccumulator 会根据数据的这些属性将其缓存到内部 map 的不同位置。这个 map 的 key 为上述五元组,value 为 ProducerBatch。为了保证线程安全同时支持高并发,这里选用golang语言官方提供的SyncMapb包 作为 map 的实现。
LogAccumulator 的另一个作用是控制缓存数据的总大小,这里选用 Semaphore 实现控制逻辑。Semaphore 是基于 AQS 实现的高性能同步工具,它会首先尝试通过自旋的方式获取共享资源,减少线程上下文切换的开销。
RetryQueue
RetryQueue 用于存放发送失败待重试的 ProducerBatch,每个 batch 有一个字段用于标识下次计划发送时间。为了高效地获取超时 batch,内部选用golang语言官方提供的heap包里面的最小堆排序进行实现, 其中插入到RetryQueue的batch是会按照时间进行排序的优先队列,最先超时的 batch 会被优先取出,同时它也是线程安全的。
Mover
Mover 是一个独立的groutine,它会循环地将 LogAccumulator 和 RetryQueue 中的超时 batch 投递到 TaskQueue 中。为了避免空转占用宝贵的 CPU 资源,当 Mover 发现 LogAccumulator 和 RetryQueue 里没有满足发送条件的 batch 时,会在 RetryQueue 的 expiredBatches 方法上等待用户配置的数据最长缓存时间 lingerMs。
IoThreadPool
IoThreadPool包含一个任务队列(TaskQueue),其中TaskQueue是一个任务队列,其使用golang的切片实现,遵循先进先出的规则,该任务队列没有上线,但是因为producer使用的内存大小是有限的,所以该队列不会成为一个无限增长的任务队列。IoThreadPool会启动一个独立groutine 去循环TaskQueue中的任务列表,将获取到的任务调用IoWorker中的sendToServer接口进行对服务端数据的发送。
IoWorker
IoWorker其中提供了sendToServer接口,是真正进行数据发送的组件,每次调用sendToServer接口都会开启一个新的独立的groutine去执行发送任务,其中发送任务失败或成功后执行的回调函数,也会在当前开启的发送任务groutine中执行,不会在新开groutine去执行用户设置的成功或失败回调函数,所以如果用户在失败或成功回调函数中写入会长时间阻塞groutine的操作,这样会影响到producer的IoThreadPool可使用groutine的数目,进而影响到producer整体的吞吐量,所以不建议在成功或失败回调函数中写入会长时间阻塞groutine的操作。
优雅关闭
要实现优雅关闭,需要做到以下几点:
- Close 方法在期望时间内返回时,producer 中的所有线程都应停止,缓存的数据都应得到处理,用户注册的 callback 都应被执行,返回给用户的 future 都应被设置。
- 支持用户设定 close 方法的最长等待时间,超过这个时间不论线程是否停止,缓存的数据是否完全处理,该方法都应立即返回。
- Close 方法支持被调用多次,在多线程环境下也能按预期工作。
- 在 callback 里调用 close 方法是安全的,不会造成程序死锁。
为了达到上述目标,producer 的关闭逻辑设计如下:
- 关闭 LogAccumulator,这时继续往 LogAccumulator 中写数据会抛异常。
- 关闭 RetryQueue,这时继续往 RetryQueue 中投递 batch 会抛异常。
- 关闭 Mover 并等待其完全退出。Mover 检测到关闭信号后会把 LogAccumulator 和 RetryQueue 中剩余的 batch 全部取出并投递到 IOThreadPool 中,不论它们是否满足发送条件。为了防止数据丢失,Mover 会不断从 LogAccumulator 和 RetryQueue 中获取 batch 直到没有其他线程正在写入。
- 关闭 IOThreadPool 并等待已提交的任务全部执行完毕。这时由于 RetryQueue 已经关闭,发送失败的 batch 会被直接投递到失败队列中。
可以看到,这里按照数据流动方向依次关闭队列和线程来达到优雅关闭、安全退出的目的。