与Topic相关的代码主要位于nsqd/nsqd.go
, nsqd/topic.go
中。
Topic的获取
Topic通过GetTopic
函数获取
GetTopic
函数用于获取topic对象,首先先尝试从topicMap
表中获取,如果指定的topic存在,则直接返回topic对象。
当topic不存在时需要新建一个topic,加入到topicMap
中,
如果启用了nsqlookupd则需要从lookupd
中获取该topic的所有channel,在去除#ephemeral
结尾的临时channel后加入到topic中。
其中锁的使用值得学习:在调用完nsqd的变量后转而进行topic操作,这时候程序转而使用topic的小粒度的锁,释放了nsqd全局的大粒度锁,
在保证线程安全的同时减少了效率上的损失。
在创建新的topic后需要向channelUpdateChan
发送消息来更新topic中的channel,而channelUpdateChan
是一个阻塞的go channel,
所以此处使用了select
,并同时监听了exitChan
。如果此时exitChan
收到信号则可以正常退出select
。
如果没有case <-t.exitChan
这句话,
则可能接收channelUpdateChan
的go channel已经退出,但是发送端却还在阻塞中。当然,可以通过退出主go channel来结束程序,
但这样做可能造成部分析构的代码没有得到执行,而且在部分场景下,
只是程序的一个go channel结束运行(在nsqd的这个例子中是topic被删除)而非整个程序退出。
为了避免这个问题,nsqd许多向go channal发送消息的地方都使用了这种机制。
以下是这种机制的一个示例,
可以通过The Go Playground来运行:
运行程序,得到以下运行结果:
Task1和Task2是两个生产者,它们都向workerChan发送消息,其中Task2立即发送,Task1有一定延时,workerChan是一个阻塞的go channel。
同时,有一个go channel发送结束信号(关闭exitChan)。随后开启消费者,接收workerChan的消息,
Task1和Task2的区别是Task2在select中多了一个对exitChan的监听。
从结果可以看出,当exitChan被关闭时,Task2结束对workerChan的阻塞,取消了像worker发送信号,同时结束了自身。
而没有监听exitChan的Task1依然在阻塞,直到被读取后才退出。
示例说明了可以通过对exitChan的使用来结束对阻塞go channel的等待。需要说明的是,在真实场景中,
消费者在发出结束的意图后可能并不会去处理尚未被处理的消息,所以像示例中的Task1是无法正常结束的。
Topic的创建
当GetTopic
未在已存在的topic中找到指定topic时,就会使用NewTopic
函数新建一个Topic
对象。 Topic
和NewTopic
都位于nsqd/topic.go
中。
NewTopic
函数首先创建了一个Topic
结构,然后判断该topic是不是一个临时topic。topic中有个名为backend
的变量,
其类型为Backend
接口。对于临时topic,
消息只储存在内存中,因此backend
变量使用newDummyBackendQueue
函数初始化。该函数生成一个无任何功能的dummyBackendQueue
结构;
对于永久的topic,backend
使用newDiskQueue
函数返回diskQueue
类型赋值,并开启新的goroutine来进行数据的持久化。 dummyBackendQueue
和diskQueue
都实现了Backend
接口,因此,在之后可以使用backend
统一处理。
随后,NewTopic
函数开启一个新的goroutine来执行messagePump
函数,该函数负责消息循环,将进入topic中的消息投递到channel中。
最后,NewTopic
函数执行t.ctx.nsqd.Notify(t)
,该函数在topic和channel创建、停止的时候调用, Notify
函数通过执行PersistMetadata
函数,将topic和channel的信息写到文件中。
在Notify
函数的实现时,首先考虑了数据持久化的时机,如果当前nsqd尚在初始化,则不需要立即持久化数据,因为nsqd在初始化后会进行一次统一的持久化工作,
Notify
在进行数据持久化的时候采用了异步的方式。使得topic和channel能以同步的方式来调用Nofity而不阻塞。在异步运行的过程中,
通过waitGroup
和监听exitChan
的使用保证了结束程序时goroutine能正常退出。
在执行持久化之前,case n.notifyChan <- v:
语句向notifyChan
传递消息,触发lookupLoop
函数(nsqd/lookup.go
中)接收notifyChan
消息的部分,
从而实现向loopupd
注册/取消注册响应的topic或channel。
消息进入topic
客户端通过nsqd的HTTP API或TCP API向特定topic发送消息,nsqd的HTTP或TCP模块通过调用对应topic的PutMessage
或PutMessages
函数,
将消息投递到topic中。PutMessage
或PutMessages
函数都通过topic的私有函数put
进行消息的投递,两个函数的区别仅在PutMessage
只调用一次put
, PutMessages
遍历所有要投递的消息,对每条消息使用put
函数进行投递。
带缓冲的Go channel的特性
put
函数使用了一个带缓冲的go channel的特性:如果case里的go channel阻塞了,那么就会跳过该case语句,执行default分支。即,如果当前memoryMsgChan
还有足够缓冲空间,
则消息被投入memoryMsgChan
,如果当前memoryMsgChan
的缓冲区已满,则将执行default分支,从而将消息保存到backend
中。
对于临时topic,由于backend
不进行任何操作,这就意味着消息在内存的缓存满了之后会被直接丢弃,对于永久的channel,则backend
会将该消息持久化到磁盘的文件中。
put
函数使用了Golang的channel特性,大大简化了实现这个逻辑的代码量,以下通过一个简单的示例看看Golang的带缓冲的channel的这一特性,
示例可以通过The Golang Playground运行:
运行结果如下:
示例程序中有3个go channel,workerChan
和worker2Chan
用于处理消息,exitChan
用于程序的退出。
当消费者go channel启动后,启动一个生产者go channel向workerChan
连续发送3个消息, time.After
模拟了消费者在处理workerChan
的消息时出现的延迟,而workerChan
的缓冲区只有3,
因此当消费者向workerChan
发送第4个消息的时候会被阻塞,从运行结果看,没有消息被投向worker2Chan
,
程序在遇到阻塞时进入了default分支,打印出Channel queue full
。特定场景下合理使用这一特性能够大幅减少程序的复杂度。
put
函数对消息的持久化
以上部分来自函数的default分支,用于将消息持久化到磁盘文件中,过程如下:
- 通过
bufferPoolGet
函数从buffer池中获取一个buffer,bufferPoolGet
及以下bufferPoolPut
函数是对sync.Pool
的简单包装。
两个函数位于nsqd/buffer_pool.go
中。 - 调用
writeMessageToBackend
函数将消息写入磁盘。 - 通过
bufferPoolPut
函数将buffer归还buffer池。 - 调用
SetHealth
函数将writeMessageToBackend
的返回值写入errValue
变量。
该变量衍生出IsHealthy
,GetError
和GetHealth
3个函数,主要用于测试以及从HTTP API获取nsqd的运行情况(是否发生错误)
其中writeMessageToBackend
函数重新初始化缓存,将Message
类型的消息序列化到缓存中,最后将缓存写入backend
Topic消息循环
messagePump
函数负责Topic的消息循环,该函数在创建新的topic时通过waitGroup
在新的goroutine中运行。
messagePump
函数初始化时先获取当前存在的channel数组,设置memoryMsgChan
和backendChan
,随后进入消息循环,
在循环中主要处理四种消息:
- 接收来自
memoryMsgChan
和backendChan
两个go channel进入的消息,并向当前的channal数组中的channel进行投递 - 处理当前topic下channel的更新
- 处理当前topic的暂停和恢复
- 监听当前topic的删除
消息投递
这两个case语句处理进入topic的消息,关于两个go channel的区别会在后续的博客中分析。
从memoryMsgChanbackendChan
读取到的消息是*Message
类型,而从backendChan
读取到的消息是byte
数组的。
因此取出backendChan
的消息后海需要调用decodeMessage
函数对byte
数组进行解码,返回*Message
类型的消息。
二者都保存在msg
变量中。
随后是将消息投到每个channel中,首先先对消息进行复制操作,这里有个优化,对于第一次循环,
直接使用原消息进行发送以减少复制对象的开销,此后的循环将对消息进行复制。对于即时的消息,
直接调用channel的PutMessage
函数进行投递,对于延迟的消息,
调用channel的StartDeferredTimeout
函数进行投递。对于这两个函数的投递细节,后续博文中会详细分析。
Topic下Channel的更新
Channel的更新比较简单,从channelMap
中取出每个channel,构成channel的数组以便后续进行消息的投递。
并且根据当前是否有channel以及该topic是否处于暂停状态来决定memoryMsgChan
和backendChan
是否为空。
Topic的暂停和恢复
这个case既处理topic的暂停也处理topic的恢复,pause
变量决定其究竟是哪一种操作。
Topic的暂停和恢复其实和topic的更新很像,根据是否暂停以及是否有channel来决定是否分配memoryMsgChan
和backendChan
。
messagePump
函数的退出
messagePump
通过监听exitChan
来获知topic是否被删除,当topic的删除时,跳转到函数的最后,输出日志后退出消息循环。
Topic的关闭和删除
Topic关闭和删除的实现都是调用exit
函数,只是传递的参数不同,删除时调用exit(true)
,关闭时调用exit(false)
。 exit
函数进入时通过atomic.CompareAndSwapInt32
函数判断当前是否正在退出,如果不是,则设置退出标记,对于已经在退出的topic,不再重复执行退出函数。
接着对于关闭操作,使用Notify
函数通知lookupd以便其他nsqd获知该消息。
随后,exit
函数调用close(t.exitChan)
和t.waitGroup.Wait()
通知其他正在运行goroutine当前topic已经停止,并等待waitGroup
中的goroutine结束运行。
最后,对于删除和关闭两种操作,执行不同的逻辑来完成最后的清理工作:
对于删除操作,需要清空
channelMap
并删除所有channel,然后删除内存和磁盘中所有未投递的消息。最后关闭backend
管理的的磁盘文件。对于关闭操作,不清空
channelMap
,只是关闭所有的channel,使用flush
函数将所有memoryMsgChan
中未投递的消息用writeMessageToBackend
保存到磁盘中。最后关闭backend
管理的的磁盘文件。
flush
函数也使用到了default分支来检测是否已经处理完全部消息。
由于此时已经没有生产者向memoryMsgChan
提供消息,因此如果出现阻塞就表示消息已经处理完毕。
在删除topic时用到的Empty
函数跟flush
处理逻辑类似,只不过Empty
只释放memoryMsgChan
消息,而不保存它们。
其他函数
Depth
函数
Depth
函数
Depth
函数用于获取当前topic尚未投递的消息数,是memoryMsgChan
缓冲区的长度加上backend
里消息的个数。
Pause
和UnPause
函数
对于很多相似的处理逻辑,nsqd在对外使用不同的函数,但在内部实现上通常把它们合并为一个函数来处理,只是传递的参数不同而已,
比如前面提到的Close
和Delete
。Pause
和UnPause
同样也使用这种方式,通过传递不同的参数调用doPause
函数来执行不同操作。 doPause
设置paused
标志并向pauseChan
发送消息,随后由messagePump
在消息循环中暂停topic。
AggregateChannelE2eProcessingLatency
函数
此函数用于性能统计,在nsqd/statd.go
中调用,客户端可以通过HTTP的/stats API看到统计结果。具体细节将在后续博文分析。
与channel相关的函数
GetChannel
, getOrCreateChannel
,GetExistingChannel
, DeleteExistingChannel
这些函数是与channel相关的函数,将在后续的博文中分析。