After rolling Prometheus out to production we hit a monitoring-coverage problem. Prometheus learns its scrape targets from the targets entries in the server-side configuration, and those targets are in turn produced by the discovery module. Prometheus ships with a lot of discovery (SD) mechanisms out of the box; taking version 2.19 as an example, it supports the following SD packages:
"github.com/prometheus/prometheus/discovery/azure" "github.com/prometheus/prometheus/discovery/consul" "github.com/prometheus/prometheus/discovery/dns" "github.com/prometheus/prometheus/discovery/ec2" "github.com/prometheus/prometheus/discovery/file" "github.com/prometheus/prometheus/discovery/gce" "github.com/prometheus/prometheus/discovery/kubernetes" "github.com/prometheus/prometheus/discovery/marathon" "github.com/prometheus/prometheus/discovery/openstack" "github.com/prometheus/prometheus/discovery/triton" "github.com/prometheus/prometheus/discovery/zookeeper"
Even with all of these, not every service-discovery scenario is covered. For instance, our production environment has 20,000+ servers; how do we keep all of them under monitoring in near real time? It can certainly be done with the existing discovery mechanisms. With file SD, you can write a program that periodically pulls the production server IP list from the CMDB, converts it into target groups, and writes them out to a file. Alternatively, you can modify the node exporter so that it registers itself with Consul on startup, and let discovery read the targets via consul SD.
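As an illustration of the file-based approach, a minimal file_sd generator could look like the sketch below. The CMDB endpoint, its JSON response shape, and the output path are all assumptions about our own environment, not anything Prometheus defines:

```go
// filesd_gen.go: a minimal sketch of a file_sd generator, assuming a
// hypothetical CMDB HTTP endpoint that returns a JSON array of host IPs.
package main

import (
	"encoding/json"
	"io/ioutil"
	"log"
	"net/http"
	"os"
	"time"
)

// targetGroup mirrors the JSON shape that Prometheus file_sd expects:
// [{"targets": ["10.0.0.1:9100"], "labels": {"source": "cmdb"}}]
type targetGroup struct {
	Targets []string          `json:"targets"`
	Labels  map[string]string `json:"labels,omitempty"`
}

func main() {
	for {
		// Hypothetical CMDB API; replace with your own inventory source.
		resp, err := http.Get("http://cmdb.example.com/api/hosts")
		if err != nil {
			log.Println("fetch cmdb:", err)
			time.Sleep(time.Minute)
			continue
		}
		var ips []string
		if err := json.NewDecoder(resp.Body).Decode(&ips); err != nil {
			log.Println("decode cmdb response:", err)
		}
		resp.Body.Close()

		tg := targetGroup{Labels: map[string]string{"source": "cmdb"}}
		for _, ip := range ips {
			tg.Targets = append(tg.Targets, ip+":9100") // node_exporter port
		}
		out, _ := json.MarshalIndent([]targetGroup{tg}, "", "  ")

		// Write to a temp file first and rename, so Prometheus never
		// reads a half-written file.
		tmp := "/etc/prometheus/targets/nodes.json.tmp"
		if err := ioutil.WriteFile(tmp, out, 0644); err == nil {
			os.Rename(tmp, "/etc/prometheus/targets/nodes.json")
		}
		time.Sleep(time.Minute)
	}
}
```

The corresponding scrape job would then point file_sd_configs at /etc/prometheus/targets/*.json; Prometheus watches the file and picks up changes without a reload.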
Of course, if the production requirements are genuinely complex, things get harder. Besides OS-level monitoring of the hosts we also need service-level monitoring, and since different kinds of services expose different metrics, we end up with many jobs, each with its own targets. How do we obtain targets flexibly in that situation? Our idea was to modify Prometheus's discovery code so that it talks to our CMDB directly: Prometheus would then automatically update each job's targets based on the monitoring groups already configured in the CMDB.
With that goal in mind, we first need to understand how discovery works, starting as usual from main.
main starts a group of goroutines (the run.Group below) and launches Prometheus's various components inside it:
```go
// Build the discoveryManagerScrape object.
discoveryManagerScrape = discovery.NewManager(ctxScrape, log.With(logger, "component", "discovery manager scrape"), discovery.Name("scrape"))
......
......
scrapeManager.ApplyConfig,
func(cfg *config.Config) error {
	c := make(map[string]sd_config.ServiceDiscoveryConfig)
	for _, v := range cfg.ScrapeConfigs {
		c[v.JobName] = v.ServiceDiscoveryConfig
	}
	return discoveryManagerScrape.ApplyConfig(c)
},
......
......
var g run.Group
......
......
{
	// Scrape discovery manager.
	g.Add(
		func() error {
			err := discoveryManagerScrape.Run()
			level.Info(logger).Log("msg", "Scrape discovery manager stopped")
			return err
		},
		func(err error) {
			level.Info(logger).Log("msg", "Stopping scrape discovery manager...")
			cancelScrape()
		},
	)
}
```
Skip the details for now and focus on the flow: the discovery service first creates discoveryManagerScrape via discovery.NewManager, then initializes it from the configuration file, and finally runs discoveryManagerScrape.Run inside the goroutine group to perform service discovery.
Let's see what Run actually does:
```go
func (m *Manager) Run() error {
	go m.sender()
	for range m.ctx.Done() {
		m.cancelDiscoverers()
		return m.ctx.Err()
	}
	return nil
}
......
......
func (m *Manager) sender() {
	ticker := time.NewTicker(m.updatert)
	defer ticker.Stop()

	for {
		select {
		case <-m.ctx.Done():
			return
		case <-ticker.C: // Some discoverers send updates too often so we throttle these with the ticker.
			select {
			case <-m.triggerSend:
				sentUpdates.WithLabelValues(m.name).Inc()
				select {
				case m.syncCh <- m.allGroups():
				default:
					delayedUpdates.WithLabelValues(m.name).Inc()
					level.Debug(m.logger).Log("msg", "Discovery receiver's channel was full so will retry the next cycle")
					select {
					case m.triggerSend <- struct{}{}:
					default:
					}
				}
			default:
			}
		}
	}
}
......
......
func (m *Manager) allGroups() map[string][]*targetgroup.Group {
	m.mtx.RLock()
	defer m.mtx.RUnlock()

	tSets := map[string][]*targetgroup.Group{}
	for pkey, tsets := range m.targets {
		var n int
		for _, tg := range tsets {
			// Even if the target group 'tg' is empty we still need to send it to the 'Scrape manager'
			// to signal that it needs to stop all scrape loops for this target set.
			tSets[pkey.setName] = append(tSets[pkey.setName], tg)
			n += len(tg.Targets)
		}
		discoveredTargets.WithLabelValues(m.name, pkey.setName).Set(float64(n))
	}
	return tSets
}
```
The goroutine in main runs Manager.Run(), and Run in turn starts another goroutine running sender. sender uses allGroups to push the current targets into the syncCh channel, whose job is to hand the target information over to the scraping component, scrapeManager. In main you can see the scrape manager being started with SyncCh() as an argument:
```go
// Scrape manager.
g.Add(
	func() error {
		// When the scrape manager receives a new targets list
		// it needs to read a valid config for each job.
		// It depends on the config being in sync with the discovery manager so
		// we wait until the config is fully loaded.
		<-reloadReady.C

		err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
		level.Info(logger).Log("msg", "Scrape manager stopped")
		return err
	},
```
In other words, all Run does after startup is spawn a goroutine that continuously flushes the Targets held in the Manager struct into the SyncCh() channel; the scrape manager receives the target information from that channel and then scrapes the corresponding targets.
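To make that hand-off concrete, here is a minimal sketch of what a consumer of SyncCh() boils down to. This is not the real scrapeManager code, just an illustration that uses the same channel type:

```go
package main

import (
	"fmt"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/discovery/targetgroup"
)

// consumeTargets is a simplified stand-in for what scrapeManager.Run() does:
// it blocks on the discovery manager's sync channel and reacts to every full
// snapshot of target groups it receives.
func consumeTargets(syncCh <-chan map[string][]*targetgroup.Group) {
	for tsets := range syncCh {
		// The map key is the job name from the scrape config; the value is
		// every target group currently known for that job.
		for jobName, groups := range tsets {
			n := 0
			for _, g := range groups {
				n += len(g.Targets)
			}
			fmt.Printf("job %q: %d groups, %d targets\n", jobName, len(groups), n)
		}
	}
}

func main() {
	// Feed one hand-crafted snapshot through the channel to show the shape
	// of the data; in Prometheus the sender() goroutine does this for real.
	ch := make(chan map[string][]*targetgroup.Group, 1)
	ch <- map[string][]*targetgroup.Group{
		"node": {{Source: "example", Targets: []model.LabelSet{{model.AddressLabel: "10.0.0.1:9100"}}}},
	}
	close(ch)
	consumeTargets(ch)
}
```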
Now that we know what Run does, the next question is where this continuously updated target information comes from; that is the real job of discovery. Let's go back to the Manager struct:
```go
type Manager struct {
	logger         log.Logger
	name           string
	mtx            sync.RWMutex
	ctx            context.Context
	discoverCancel []context.CancelFunc

	// Some Discoverers(eg. k8s) send only the updates for a given target group
	// so we use map[tg.Source]*targetgroup.Group to know which group to update.
	targets map[poolKey]map[string]*targetgroup.Group
	// providers keeps track of SD providers.
	providers []*provider
	// The sync channel sends the updates as a map where the key is the job value from the scrape config.
	syncCh chan map[string][]*targetgroup.Group

	// How long to wait before sending updates to the channel. The variable
	// should only be modified in unit tests.
	updatert time.Duration

	// The triggerSend channel signals to the manager that new updates have been received from providers.
	triggerSend chan struct{}
}
```
A few fields matter here. syncCh is the channel described above; targets holds the concrete content of the discovered objects, such as IPs; and providers are the concrete service-discovery implementations, for example the kubernetes, dns, and file SDs mentioned earlier.
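For reference, the targetgroup.Group type that keeps showing up here lives in github.com/prometheus/prometheus/discovery/targetgroup and looks roughly like this in 2.19:

```go
// Group is a set of targets with a common label set.
type Group struct {
	// Targets is a list of targets identified by a label set. Each target is
	// uniquely identifiable in the group by its address label.
	Targets []model.LabelSet
	// Labels is a set of labels that is common across all targets in the group.
	Labels model.LabelSet
	// Source is an identifier that describes a group of targets.
	Source string
}
```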
Back to main: it first creates the manager object with NewManager:
```go
// NewManager is the Discovery Manager constructor.
func NewManager(ctx context.Context, logger log.Logger, options ...func(*Manager)) *Manager {
	if logger == nil {
		logger = log.NewNopLogger()
	}
	mgr := &Manager{
		logger:         logger,
		syncCh:         make(chan map[string][]*targetgroup.Group),
		targets:        make(map[poolKey]map[string]*targetgroup.Group),
		discoverCancel: []context.CancelFunc{},
		ctx:            ctx,
		updatert:       5 * time.Second,
		triggerSend:    make(chan struct{}, 1),
	}
	for _, option := range options {
		option(mgr)
	}
	return mgr
}
```
main then initializes this Manager, and the initialization essentially registers the Manager's providers according to the configuration file:
```go
func (m *Manager) ApplyConfig(cfg map[string]sd_config.ServiceDiscoveryConfig) error {
	m.mtx.Lock()
	defer m.mtx.Unlock()

	for pk := range m.targets {
		if _, ok := cfg[pk.setName]; !ok {
			discoveredTargets.DeleteLabelValues(m.name, pk.setName)
		}
	}
	m.cancelDiscoverers()
	m.targets = make(map[poolKey]map[string]*targetgroup.Group)
	m.providers = nil
	m.discoverCancel = nil

	failedCount := 0
	for name, scfg := range cfg {
		failedCount += m.registerProviders(scfg, name)
		discoveredTargets.WithLabelValues(m.name, name).Set(0)
	}
	failedConfigs.WithLabelValues(m.name).Set(float64(failedCount))

	for _, prov := range m.providers {
		m.startProvider(m.ctx, prov)
	}

	return nil
}
```
During registration it first clears all existing target group information, then calls registerProviders and startProvider, which register and start the providers for each job respectively.
Let's look at registerProviders first:
```go
// registerProviders returns a number of failed SD config.
func (m *Manager) registerProviders(cfg sd_config.ServiceDiscoveryConfig, setName string) int {
	var (
		failedCount int
		added       bool
	)
	add := func(cfg interface{}, newDiscoverer func() (Discoverer, error)) {
		t := reflect.TypeOf(cfg).String()
		for _, p := range m.providers {
			if reflect.DeepEqual(cfg, p.config) {
				p.subs = append(p.subs, setName)
				added = true
				return
			}
		}

		d, err := newDiscoverer()
		if err != nil {
			level.Error(m.logger).Log("msg", "Cannot create service discovery", "err", err, "type", t)
			failedCount++
			return
		}

		provider := provider{
			name:   fmt.Sprintf("%s/%d", t, len(m.providers)),
			d:      d,
			config: cfg,
			subs:   []string{setName},
		}
		m.providers = append(m.providers, &provider)
		added = true
	}

	for _, c := range cfg.DNSSDConfigs {
		add(c, func() (Discoverer, error) {
			return dns.NewDiscovery(*c, log.With(m.logger, "discovery", "dns")), nil
		})
	}
	......
	......
	for _, c := range cfg.TritonSDConfigs {
		add(c, func() (Discoverer, error) {
			return triton.New(log.With(m.logger, "discovery", "triton"), c)
		})
	}
	if len(cfg.StaticConfigs) > 0 {
		add(setName, func() (Discoverer, error) {
			return &StaticProvider{TargetGroups: cfg.StaticConfigs}, nil
		})
	}
	if !added {
		// Add an empty target group to force the refresh of the corresponding
		// scrape pool and to notify the receiver that this target set has no
		// current targets.
		// It can happen because the combined set of SD configurations is empty
		// or because we fail to instantiate all the SD configurations.
		add(setName, func() (Discoverer, error) {
			return &StaticProvider{TargetGroups: []*targetgroup.Group{{}}}, nil
		})
	}
	return failedCount
}
```
Looking at code like return dns.NewDiscovery(*c, log.With(m.logger, "discovery", "dns")), nil, it's a safe guess that this is where the different providers get instantiated. Now look at the Discoverer interface:
```go
type Discoverer interface {
	Run(ctx context.Context, up chan<- []*targetgroup.Group)
}
```
And the provider struct:
```go
type provider struct {
	name   string
	d      Discoverer
	subs   []string
	config interface{}
}
```
Seeing this Run method, everything clicks. As mentioned earlier, Prometheus has all sorts of providers: file, kubernetes, dns, the public-cloud ones, and so on. Each of them obtains target information in its own way, but Prometheus does not care how. It abstracts a Discoverer interface and lets every provider implement it, so each service-discovery mechanism can be plugged into the discovery Manager like a plugin.
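To see how small that contract really is, here is a minimal sketch of a hypothetical CMDB-backed Discoverer. Everything except the Run signature (the endpoint, the polling loop, the stubbed fetchTargets) is an assumption about our own environment, not Prometheus code:

```go
package cmdb

import (
	"context"
	"time"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/discovery/targetgroup"
)

// Discovery is a hypothetical CMDB-backed Discoverer.
type Discovery struct {
	endpoint string        // CMDB API address (assumption)
	interval time.Duration // poll period (assumption)
}

func NewDiscovery(endpoint string, interval time.Duration) *Discovery {
	return &Discovery{endpoint: endpoint, interval: interval}
}

// Run implements the Discoverer interface: poll the CMDB and push full
// target-group snapshots into `up` until the context is cancelled.
func (d *Discovery) Run(ctx context.Context, up chan<- []*targetgroup.Group) {
	ticker := time.NewTicker(d.interval)
	defer ticker.Stop()
	for {
		tgs := []*targetgroup.Group{
			{
				Source:  "cmdb",
				Targets: d.fetchTargets(), // pull hosts from the CMDB (stubbed below)
				Labels:  model.LabelSet{"source": "cmdb"},
			},
		}
		select {
		case up <- tgs:
		case <-ctx.Done():
			return
		}
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return
		}
	}
}

// fetchTargets is a stub; a real implementation would call the CMDB API.
func (d *Discovery) fetchTargets() []model.LabelSet {
	return []model.LabelSet{
		{model.AddressLabel: "10.0.0.1:9100"},
		{model.AddressLabel: "10.0.0.2:9100"},
	}
}
```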
With that understood, the last piece is startProvider, which starts two goroutines: go p.d.Run(ctx, updates) and go m.updater(ctx, p, updates).
The first goroutine runs the provider itself, i.e., each provider's own implementation of the Discoverer interface.
The second goroutine is the consumer that applies the provider's targetgroup.Group updates to the Manager (see the sketch after the code below).
```go
func (m *Manager) startProvider(ctx context.Context, p *provider) {
	level.Debug(m.logger).Log("msg", "Starting provider", "provider", p.name, "subs", fmt.Sprintf("%v", p.subs))
	ctx, cancel := context.WithCancel(ctx)
	updates := make(chan []*targetgroup.Group)

	m.discoverCancel = append(m.discoverCancel, cancel)

	go p.d.Run(ctx, updates)
	go m.updater(ctx, p, updates)
}
```
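For completeness, the updater consumer looks roughly like this in 2.19 (metrics and debug logging trimmed): it stores each batch of groups under every job that subscribes to the provider, then nudges sender() via triggerSend so the next tick flushes the new state to syncCh.

```go
func (m *Manager) updater(ctx context.Context, p *provider, updates chan []*targetgroup.Group) {
	for {
		select {
		case <-ctx.Done():
			return
		case tgs, ok := <-updates:
			if !ok {
				// The discoverer closed its channel; nothing more to consume.
				return
			}
			// Store the groups under every job (sub) this provider serves...
			for _, s := range p.subs {
				m.updateGroup(poolKey{setName: s, provider: p.name}, tgs)
			}
			// ...and signal sender() that there is something new to send.
			select {
			case m.triggerSend <- struct{}{}:
			default:
			}
		}
	}
}
```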
Finally, here is a diagram that roughly describes the discovery flow, using the file provider as an example.
Once this flow is clear, the next step is to develop our own provider; I'll write that up once the code is done.
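As a preview of what that involves: besides implementing Discoverer, the new provider has to be hooked into registerProviders. The fragment below is only a sketch of that hook; CMDBSDConfigs and cmdb.NewDiscovery are hypothetical additions of ours, not anything that exists upstream.

```go
// Inside (*Manager).registerProviders, next to the existing add(...) calls.
// CMDBSDConfigs would be a new field we add to sd_config.ServiceDiscoveryConfig.
for _, c := range cfg.CMDBSDConfigs {
	add(c, func() (Discoverer, error) {
		// cmdb.NewDiscovery is the hypothetical constructor sketched earlier.
		return cmdb.NewDiscovery(c.Endpoint, c.RefreshInterval), nil
	})
}
```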