Replication是MongoDB一套非常复杂的功能,功能包括数据同步,选举,心跳维护等。涉及到与其他MongoD进程通讯。RPC的封装相对也比较重要,作为这些功能实现的基础。
Replication相关的对象角色都被封装在ReplicationCoordinatorImpl对象中:
repl::ReplicationCoordinatorImpl* replCoord =
new repl::ReplicationCoordinatorImpl(
getGlobalReplSettings(),
new repl::ReplicationCoordinatorExternalStateImpl,
new repl::NetworkInterfaceImpl,
new repl::TopologyCoordinatorImpl(Seconds(repl::maxSyncSourceLagSecs)),
static_cast<int64_t>(curTimeMillis64()));
本篇先从网路部分开始介绍NetworkInterfaceImpl
。
ReplicationExecutor::NetworkInterface
接口,定义了startCommand
cancelCommand
runCallbackWithGlobalExclusiveLock
等方法,抽象Command的处理逻辑。同时还有start wait
等一些线程管理接口,重点关注在command和callback相关的实现。
首先来说明下连接池的管理:
NetworkInterfaceImpl::ConnectionPool::ConnectionInfo
集成DBClientConnection外,还有一个creationDate成员,记录连接创建的时间,后面会根据这个时间来消耗过期的Connection。
NetworkInterfaceImpl::ConnectionPool::ConnectionPtr
保存的并不是Connection指针,而是一个list::iterator
。构造该对象时,只需要指定HostAddress即可,而不是需要真正的Connection对象等。ConnectionPtr会根据HostAddress从Pool中获取(acquireConnection
)相应的迭代器。
NetworkInterfaceImpl::ConnectionPool
核心类,连接池管理,不同与一般的连接池只维护同一个地址的多个连接。此连接池维护的是不同的地址,对应的多个连接,所有的空闲连接都存放在_connections
成员中,使用中的连接存放在_inUseConnections
,两个成员都被_mutex
保护。调用者只要使用两个函数acquireConnection
和releaseConnection
,需要连接时调用acquireConnection
,使用结束后通过releaseConnection
归还。
acquireConnection
作用是获取Connection,如果Pool中没有,则创建一个Connection。首先从刚才说的_connections中根据参数传入的HostAddress查找到相应的ConnectionList。
获取到ConnectionList后,先会去尝试清理掉过期的Connection(参考cleanUpOlderThan_inlock实现)。然后取出ConnectionList中的第一个Connection放入到_inUseConnections
中。
如果没有找到相应HostAddress的连接,或者连接过期被释放,甚至连接已经不可用。则重新构建连接,并且发送鉴权信息,保证连接的合法性,且可用。
cleanUpOlderThan_inlock
这里的处理比较暴力,个人认为并不是非常合理。简单讲就是找到从创建开始30秒的链接去杀掉销毁。如果能修改成只销毁长时间没有被使用过的链接,效果会更好。明明是在长期使用的连接,也因为30秒的问题而被销毁。
Sock::isStillConnected
判断连接是否可用的,采用了socket pool的方式来判断,用非阻塞的方式查看Socket的POLLIN事件,如果Socket中存在数据,或者任何其他的错误事件被触发,则说明连接状态不可用,返回False。
Sock::connect
因为connect timeout是受syncookies影响,timeout时间会非常的久,所以要在创建另一个线程中进行connect操作,原线程wait指定的时间,超时则放弃。这里的实现比较重,需要开辟新的线程,比较好的做法是使用epoll非阻塞的方法,可以参考解决方案*。
releaseConnection
从_inUseConnections
再放回到_connections
中,在List中的位置是头部。
NetworkInterfaceImpl
现在可以回头来继续说明NetworkInterfaceImpl
。
NetworkInterfaceImpl
虽然通过ConnectionPool
管理了Socket,自身则来管理了一个线程池,线程数量最多51,最少也维护了1个线程。command任务都提交到_pending
成员中,工作线程从中获取到Command任务对象来执行,生产者消费者模式。工作线程的最大空闲时间也是30S,如果30S时间内,线程空闲数量小于_peding
数量,则自动销毁资源。
每个提交上来的任务,都有一个cbHandle
,作为Callback的句柄参数。onFinish
任务处理完成,或者被Cancel的话,都会调用。对于Cancel需要说明,只能取消处于pending状态的任务。
_runCommand
有了前面的ConnectionPool
,这里就简单了很多,只需要从中拿到连接对象,发送Command消息即可。如果异常情况出现,比如连接失败,则通过捕获异常来反馈。虽然ConnectionPool
没有对连接数量做出上限控制,但NetworkInterfaceImpl
控制了线程数量,所以连接数基本也是可控的。理论来说最多的连接数量等于地址数量*51,但实际情况远达不到。