cAdvisor是一款强大的 Docker Container 监控工具,方便容器用户,对运行中的容器进行资源使用和性能分析。用于收集、聚合、处理和导出运行中容器的信息。cAdvisor提供了对Docker容器的原生支持,并且应该支持任何其他容器类型。
Kubelet内置了对cAdvisor的支持,用户可以直接通过Kubelet组件获取给节点上容器相关监控指标。
本系列文章cAdvisor代码,以v0.37.5代码为例。
cAdvisor主函数分析
以下是main函数代码,会对代码进行简单注解,并对代码进行一定程度上的精简,其代码路径为:/cadvisor/cmd/cadvisor.go
根据以下代码可以总结cAdvisor主要完成了以下几个工作:
-
提供API给外部使用,包括一般API接口和prometheus接口
-
可实现第三方数据存储,支持 bigquery、es、influxdb、kafka、redis、statsd、stdout
-
收集数据包括 container、process、machine、Go runtime
func main() {
klog.InitFlags(nil)
defer klog.Flush()
flag.Parse()
if *versionFlag {
fmt.Printf("cAdvisor version %s (%s)\n", version.Info["version"], version.Info["revision"])
os.Exit(0)
}
// 拿到所有需要收集的metrics类型,即从全量的metrics类型中,排除掉flag.disable_metrics,剩余的metrics集
// 返回的值大概为container.MetricSet{
// CpuUsageMetrics: struct{}{}, //cpu
// ProcessSchedulerMetrics: struct{}{}, //sched
// PerCpuUsageMetrics: struct{}{}, //precpu
// ....}
includedMetrics := toIncludedMetrics(ignoreMetrics.MetricSet)
// 利用cpu个数或是flag.max_procs,设置最大可执行的cpu个数
setMaxProcs()
//1. 初始化本地内存
//2. 初始化存储介质,可初始化多个,支持:bigquery,es,influxdb,kafka,redis,statsd,stdout【用flag.storage_driver】
//3. 定时将数据存入存储介质中【flag.storage_duration】 ??
memoryStorage, err := NewMemoryStorage()
if err != nil {
klog.Fatalf("Failed to initialize storage driver: %s", err)
}
// 系统fs对象
sysFs := sysfs.NewRealSysFs()
// 利用证书,创建http 的 client
collectorHttpClient := createCollectorHttpClient(*collectorCert, *collectorKey)
// 创建resourceManager
resourceManager, err := manager.New(memoryStorage, sysFs, housekeepingConfig, includedMetrics, &collectorHttpClient, strings.Split(*rawCgroupPrefixWhiteList, ","), *perfEvents)
if err != nil {
klog.Fatalf("Failed to create a manager: %s", err)
}
mux := http.NewServeMux()
if *enableProfiling {
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
}
// Register all HTTP handlers.
err = cadvisorhttp.RegisterHandlers(mux, resourceManager, *httpAuthFile, *httpAuthRealm, *httpDigestFile, *httpDigestRealm, *urlBasePrefix)
if err != nil {
klog.Fatalf("Failed to register HTTP handlers: %v", err)
}
containerLabelFunc := metrics.DefaultContainerLabels
if !*storeContainerLabels {
whitelistedLabels := strings.Split(*whitelistedContainerLabels, ",")
containerLabelFunc = metrics.BaseContainerLabels(whitelistedLabels)
}
// Register Prometheus collector to gather information about containers, Go runtime, processes, and machine
cadvisorhttp.RegisterPrometheusHandler(mux, resourceManager, *prometheusEndpoint, containerLabelFunc, includedMetrics)
// Start the manager.
if err := resourceManager.Start(); err != nil {
klog.Fatalf("Failed to start manager: %v", err)
}
// Install signal handler.
installSignalHandler(resourceManager)
klog.V(1).Infof("Starting cAdvisor version: %s-%s on port %d", version.Info["version"], version.Info["revision"], *argPort)
rootMux := http.NewServeMux()
rootMux.Handle(*urlBasePrefix+"/", http.StripPrefix(*urlBasePrefix, mux))
addr := fmt.Sprintf("%s:%d", *argIp, *argPort)
klog.Fatal(http.ListenAndServe(addr, rootMux))
}
其中resourceManager类型是manager,粗略浏览下manager结构的字段以及相关功能
type manager struct {
// 当前受到监控的容器存在一个map中 containerData结构中包括了对容器的各种具体操作方式和相关信息
containers map[namespacedContainerName]*containerData
// 对map中数据存取时采用的Lock
containersLock sync.RWMutex
// 缓存在内存中的数据 主要是容器的相关信息
memoryCache *memory.InMemoryCache
// host上的实际文件系统的相关信息
fsInfo fs.FsInfo
// 系统fs对象,里面有一些查询系统文件的方法
sysFs sysfs.SysFs
machineMu sync.RWMutex // protects machineInfo
// machine的相关信息 cpu memory network system信息等等
machineInfo info.MachineInfo
// 用于存放退出信号的channel manager关闭的时候会给其中的channel发送退出信号
quitChannels []chan error
//cadvisor本身所运行的那个容器(如果cadvisor运行在容器中)
cadvisorContainer string
// 是否在hostnamespace中?
inHostNamespace bool
// 对event相关操作进行的封装
eventHandler events.EventManager
// manager的启动时间
startupTime time.Time
// 在内存中保留数据的时间 也就是下次开始搜集容器相关信息并且更新内存信息的时间
maxHousekeepingInterval time.Duration
// 是否允许动态设置dynamic housekeeping
allowDynamicHousekeeping bool
includedMetrics container.MetricSet
containerWatchers []watcher.ContainerWatcher
eventsChannel chan watcher.ContainerEvent
collectorHTTPClient *http.Client
nvidiaManager stats.Manager
perfManager stats.Manager
resctrlManager stats.Manager
// List of raw container cgroup path prefix whitelist.
rawContainerCgroupPathPrefixWhiteList []string
}
cAdvisor数据采集分析
cAdvisor的数据采集分为两个部分machineInfo和containerInfo。以下就详细介绍这两部分数据采集的过程。对数据采集需要用到resourceManager,这是对数据采集的抽象,其结构体内容的具体介绍见。其数据采集开始代码是 /cmd/cadvisor.go -> main.go
中的代码:
resourceManager, err := manager.New(memoryStorage, sysFs, housekeepingConfig, includedMetrics, &collectorHttpClient, strings.Split(*rawCgroupPrefixWhiteList, ","), *perfEvents)
if err != nil {
klog.Fatalf("Failed to create a manager: %s", err)
}
// Start the manager.
if err := resourceManager.Start(); err != nil {
klog.Fatalf("Failed to start manager: %v", err)
}
数据采集结构图
machineInfo
machineInfo的数据采集具体代码,主要是 /machine/info.go -> Info()
,在new manager
的时候会去调用一次这个方法,主要是读取系统文件(具体文件见上面的“整体结构图”),将数据放入到m.MachineInfo中,后续在Start方法中,起一个协程,定时调用该方法,更新本地cache。相关代码地址如下:
/machine/info.go
/machine/machine.go
/machine/operatingsystem_unix.go
/machine/operatingsystem_windows.go
containerInfo
整体的containerInfo的数据采集,由 /manager/manager.go -> Start() 开始,起整体流程图如下:
整体的流程的可以概括为两各部分:
- 利用inotify去watch cgroupPath,监控该目录下的变更,拿到该目录下的增删改查事件,也就知道container的变更,从而动态更新cache中的数据
- 定时check,cache中的m.containers和主动去拿获取目前存在的container,对整体做一个diff,从而更新cache中的数据
创建Container
其代码路径为 /manager/manager.go -> CreateContainer , 具体代码如下,详细解析可以看代码中的注释(代码有做一些删减)。
// Create a container.
func (m *manager) createContainer(containerName string, watchSource watcher.ContainerWatchSource) error {
m.containersLock.Lock()
defer m.containersLock.Unlock()
return m.createContainerLocked(containerName, watchSource)
}
func (m *manager) createContainerLocked(containerName string, watchSource watcher.ContainerWatchSource) error {
namespacedName := namespacedContainerName{
Name: containerName,
}
// 查看该container是否以及存在,如果已存在,则直接return
if _, ok := m.containers[namespacedName]; ok {
return nil
}
// for (factories) 判断是否能创建handler,如果可以则创建handler。
// 该handler实现了 ContainerHandler的interface,里面有GetSpec()、GetStats()、ListContainers等方法
handler, accept, err := container.NewContainerHandler(containerName, watchSource, m.inHostNamespace)
if err != nil {
return err
}
if !accept {
// ignoring this container.
klog.V(4).Infof("ignoring container %q", containerName)
return nil
}
logUsage := *logCadvisorUsage && containerName == m.cadvisorContainer
// 创建 containerData struct{}结构体的对象
cont, err := newContainerData(containerName, m.memoryCache, handler, logUsage, collectorManager, m.maxHousekeepingInterval, m.allowDynamicHousekeeping, clock.RealClock{})
if err != nil {
return err
}
......
// 将该container及其所有的aliases,放入到m.containers中
m.containers[namespacedName] = cont
for _, alias := range cont.info.Aliases {
m.containers[namespacedContainerName{
Namespace: cont.info.Namespace,
Name: alias,
}] = cont
}
klog.V(3).Infof("Added container: %q (aliases: %v, namespace: %q)", containerName, cont.info.Aliases, cont.info.Namespace)
......
// 构建event,找到到合适的m.eventHandler.watchers的*[]watchers,放入到*[]watchers的EventChannel.channel中
newEvent := &info.Event{
ContainerName: contRef.Name,
Timestamp: contSpec.CreationTime,
EventType: info.EventContainerCreation,
}
err = m.eventHandler.AddEvent(newEvent)
if err != nil {
return err
}
// Start the container's housekeeping.
// 开启一个housekeeping的协程,定时调用updateStats(),即更新cont的数据
return cont.Start()
}
其中 m.eventHandler.AddEvent(newEvent) ,其逻辑是找到到合适的 m.eventHandler.watchers 的[]watchers,再将newEvent分别放入到[]watchers中,其中根据条件匹配到合适的[]watchers逻辑如下:
-
watcher.request.SndTime< newEvent.timestamp< watcher.request.EndTime
-
newEvent.EventType在watcher.request.EventType中有
-
newEvent.ContainerName的前缀是watcher.request.ContainerName
检测子容器
其代码路径为 /manager/manager.go -> detectSubcontainers() , 主要是拿到containerName=“/”下的所有container 和 m.containers做diff,获取新增的容器added 和 已删除的容器removed
- Added:对于added的容器调用m.CreateContainer()(具体可参考:4.1 创建container)
- Removed:对于removed的容器调用m.destroyContainer(),将该容器及其aliases在cache中的记录全部删除掉
其diff具体逻辑如下图:
Watch
其代码路径为 /manager/manager.go -> watchForNewContainers(quit chan error)
用的是 k8s.io/utils/inotify 中的watch功能,即watch一个目录,从而拿到该目录下的所有变更。所以这里利用的是inotify来watch cgroupPath,从而watch到container的变更
-
调用m.containerWatchers中watch的start(),watch cgroupPaths中的变化,获取该目录变更event,并将得到的event,放入条件匹配的watch的EventChannel.channel中
-
调用 detectSubContainers(“/”) (具体可参考:4.2 检测子容器)
-
go func{}处理以上的到的event,对于add事件调用 m.CreateContainer() ,对于delete事件调用 m.destroyContainer() ,收到quit信号,则退出协程
全局更新
其代码路径为 /manager/manager.go -> globalHousekeeping(quit chan error) ,主要是定时调用m.detectSubcontainers("/"),具体逻辑可参考检测子容器
。间隔时间:globalHousekeepingInterval
func (m *manager) globalHousekeeping(quit chan error) {
// longHousekeeping := min(100ms,*globalHousekeepingInterval / 2)
longHousekeeping := 100 * time.Millisecond
if *globalHousekeepingInterval/2 < longHousekeeping {
longHousekeeping = *globalHousekeepingInterval / 2
}
// 定时,间隔时间 *globalHousekeepingInterval
ticker := time.NewTicker(*globalHousekeepingInterval)
for {
select {
case t := <-ticker.C:
start := time.Now()
// Check for new containers.
err := m.detectSubcontainers("/")
if err != nil {
klog.Errorf("Failed to detect containers: %s", err)
}
// housekeeping 耗时超过longHousekeeping,则打印一条日志
duration := time.Since(start)
if duration >= longHousekeeping {
klog.V(3).Infof("Global Housekeeping(%d) took %s", t.Unix(), duration)
}
case <-quit:
// Quit if asked to do so.
quit <- nil
klog.Infof("Exiting global housekeeping thread")
return
}
}
}
cAdvisor数据存储分析
cAdvisor不仅会在本地存储,用于prometheus拉取,而且还支持将数据存入第三方存储介质,用于数据的持久化,其逻辑相对简单,但是却很重要。
存储核心代码
cadvisor/cmd/storagedriver.go
// 主要用于返回,各第三方init时放入map中的client,以及和storage相关的flag
storage/*
// 主要是一些创建storage的client,AddStats,Close方法
cadvisor/cmd/storage/bigquery/*
cadvisor/cmd/storage/elasticsearch/*
cadvisor/cmd/storage/influxdb/*
cadvisor/cmd/storage/kafka/*
cadvisor/cmd/storage/redis/*
cadvisor/cmd/storage/statsd/*
cadvisor/cmd/storage/stdout/*
// 本地cache相关操作
utils/timed_store.go
代码入口
在真正执行add之前,需要初始化存储对象。数据存储最重要的结构体是InMamoryCache,其具体结构如下,具体逻辑看注释:
type InMemoryCache struct {
// 读写锁
lock sync.RWMutex
// container本地cache
containerCacheMap map[string]*containerCache
// 最大存活时间,这个在下面的存储过程中会用到
maxAge time.Duration
// 不同第三方存储实现的interface
backend []storage.StorageDriver
}
初始调用在main函数中的
func main() {
....
memoryStorage, err := NewMemoryStorage()
if err != nil {
klog.Fatalf("Failed to initialize storage driver: %s", err)
}
....
}
其中NewMemoryStorage() 函数在cmd/storagedriver.go中,具体代码如下:
// NewMemoryStorage creates a memory storage with an optional backend storage option.
func NewMemoryStorage() (*memory.InMemoryCache, error) {
backendStorages := []storage.StorageDriver{}
// storageDriver: flag.storage_driver启动时输入,多个存储介质用逗号分割(默认为空)
for _, driver := range strings.Split(*storageDriver, ",") {
if driver == "" {
continue
}
// 返回的是各第三方存储的StorageDriver,以elasticsearch为例,就是
// /cmd/internal/storage/elasticsearch/elasticsearch.go -> func new() (storage.StorageDriver, error)
storage, err := storage.New(driver)
if err != nil {
return nil, err
}
backendStorages = append(backendStorages, storage)
klog.V(1).Infof("Using backend storage type %q", driver)
}
klog.V(1).Infof("Caching stats in memory for %v", *storageDuration)
// *InMemoryCache,其中maxAge就是flag.storage_duration启动输入的值(默认2m)
return memory.New(*storageDuration, backendStorages), nil
}
数据存储过程
真正数据的存储过程分为两个部分:本地存储和第三方介质存储
-
本地存储:
-
- InMemoryCache.containerCacheMap是一个map,其具体结构为map[string]*ContainerCache,其key是container的name,value是ContainerCache,先判断该map中是否有containerName的数据,如果没有则新增
- 对containerName相对应的ContainerCache插入数据,插入数据的步骤分三步:
-
-
- 将数据根据timestamp插入到相应位置
- 将TimeStore.buffer中timestamp < 刚插入数据的timestamp - age 的数据remove掉
-
-
-
- 查看buffer中数据个数 > maxItems,将timestamp小的数据remove掉
-
-
第三方介质存储:for bankend中的方法,调用各介质的AddStats方法,将数据存入
具体调用过程可参考以下图:
若有收获,就点个赞吧