Grafana 是开源的数据可视化面板,和Protometheus是绝配,常用来做系统监控,将采集的数据进行可视化,里面有很多可视化面板模板,开箱即用。该项目是前后端不分离项目,会采用部分模板渲染,后端采用go,前端采用typescript,如果是新手,可以多看看源码学学做项目的思想。
Grafana 官网地址:Grafana: The open observability platform | Grafana Labs
Grafana启动
grafana\pkg\cmd\grafana-server\main.go+113
server := NewServer(*configFile, *homePath, *pidFile) //创建server
go listenToSystemSignals(server)//监听退出信号
err := server.Run() //启动服务
server.Run()
server结构
grafana\pkg\cmd\grafana-server\server.go
// Server is responsible for managing the lifecycle of services.
type Server struct {
context context.Context
shutdownFn context.CancelFunc
childRoutines *errgroup.Group //还记得前面讲过的errgroup吗
log log.Logger
cfg *setting.Cfg
shutdownReason string
shutdownInProgress bool
configFile string
homePath string
pidFile string
RouteRegister routing.RouteRegister `inject:""` //路由注册器,grafana进行了包装,写了一个自己的路由器
HTTPServer *api.HTTPServer `inject:""`
}
来看看run结构,加载配置,初始化登录,最重要的是将registry.GetServices()里面的服务全部注册进来,然后注入到相应的结构体里面的字段。比如server初始化时没有初始化HTTPServer和RouteRegister这两个字段。那么它们从哪里来呢?
// Run initializes and starts services. This will block until all services have
// exited. To initiate shutdown, call the Shutdown method in another goroutine.
func (s *Server) Run() (err error) {
//加载配置文件
s.loadConfiguration()
//写pid文件
s.writePIDFile()
//初始化日志文件
login.Init()
social.NewOAuthService()
services := registry.GetServices()
if err = s.buildServiceGraph(services); err != nil { //依赖注入
return
}
// Initialize services.
for _, service := range services {
if registry.IsDisabled(service.Instance) {
continue
}
s.log.Debug("Initializing " + service.Name)
if err := service.Instance.Init(); err != nil {
return errutil.Wrapf(err, "Service init failed")
}
}
// Start background services.
for _, svc := range services {
service, ok := svc.Instance.(registry.BackgroundService) //实现了RUN(context)方法就是后台服务,如果只实现了Init(),那么在上面就直接执行了
if !ok {
continue
}
if registry.IsDisabled(svc.Instance) {
continue
}
// Variable is needed for accessing loop variable in callback
descriptor := svc
s.childRoutines.Go(func() error { //调用errgroup 执行run
// Don't start new services when server is shutting down.
if s.shutdownInProgress {
return nil
}
err := service.Run(s.context)
// Mark that we are in shutdown mode
// So no more services are started
s.shutdownInProgress = true
if err != nil {
if err != context.Canceled {
// Server has crashed.
s.log.Error("Stopped "+descriptor.Name, "reason", err)
} else {
s.log.Debug("Stopped "+descriptor.Name, "reason", err)
}
return err
}
return nil
})
}
defer func() {
s.log.Debug("Waiting on services...") //等待执行
if waitErr := s.childRoutines.Wait(); waitErr != nil && !xerrors.Is(waitErr, context.Canceled) {
s.log.Error("A service failed", "err", waitErr)
if err == nil {
err = waitErr
}
}
}()
s.notifySystemd("READY=1")
return
}
答案就在依赖注入,来看看是怎么依赖注入的
// buildServiceGraph builds a graph of services and their dependencies.
func (s *Server) buildServiceGraph(services []*registry.Descriptor) error {
// Specify service dependencies.
objs := []interface{}{
bus.GetBus(),//寻找数据库model层的一个总线bus
s.cfg, //配置文件
routing.NewRouteRegister(middleware.RequestMetrics, middleware.RequestTracing),
localcache.New(5*time.Minute, 10*time.Minute),//缓存
s,//服务器实例
}
for _, service := range services { //将注册的实例进行添加,
objs = append(objs, service.Instance)
}
var serviceGraph inject.Graph
// Provide services and their dependencies to the graph.
for _, obj := range objs { //将对象添加进依赖注入
if err := serviceGraph.Provide(&inject.Object{Value: obj}); err != nil {
return errutil.Wrapf(err, "Failed to provide object to the graph")
}
}
// Resolve services and their dependencies.
if err := serviceGraph.Populate(); err != nil {//进行依赖注入
return errutil.Wrapf(err, "Failed to populate service dependency")
}
return nil
}
举个例子服务都是怎么注册的
grafana\pkg\api\http_server.go+40
//注册HTTPServer
func init() {
registry.Register(®istry.Descriptor{
Name: "HTTPServer",
Instance: &HTTPServer{},
InitPriority: registry.High,
})
}
serviceGraph.Provide( )将对象全部放进去,然后调用Populate 进行依赖注入,后面我会将专门写篇文章介绍这个"github.com/facebookgo/inject"包,依赖注册神器
HTTPServer
HTTPServer就是http后台服务器的实例了,我将不用的都删了,其中macaron就是Macaron框架的示例,下面看看怎么被初始化的
type HTTPServer struct {
log log.Logger
macaron *macaron.Macaron
httpSrv *http.Server
RouteRegister routing.RouteRegister `inject:""`
Bus bus.Bus `inject:""`
Cfg *setting.Cfg `inject:""`
}
在服务器启动时候就会调用init方法,被初始化
//初始化
func (hs *HTTPServer) Init() error {
hs.log = log.New("http.server")
hs.streamManager = live.NewStreamManager()
hs.macaron = hs.newMacaron()
hs.registerRoutes()
return nil
}
//创建Macaron的实例
func (hs *HTTPServer) newMacaron() *macaron.Macaron {
macaron.Env = setting.Env
m := macaron.New()
// automatically set HEAD for every GET
m.SetAutoHead(true)
return m
}
添加中间件
func (hs *HTTPServer) applyRoutes() {
// start with middlewares & static routes
hs.addMiddlewaresAndStaticRoutes()
// then add view routes & api routes
hs.RouteRegister.Register(hs.macaron)
// then custom app proxy routes
hs.initAppPluginRoutes(hs.macaron)
// lastly not found route
hs.macaron.NotFound(hs.NotFoundHandler)
}
首先看看添加中间件,去除一些静态路由和其他代码,留下了几个比较重要的中间件,读者感兴趣可以自己去看看源码。
func (hs *HTTPServer) addMiddlewaresAndStaticRoutes() {
m := hs.macaron
m.Use(middleware.Logger()) //日志中间件
m.Use(middleware.Recovery())//错误恢复
m.Use(middleware.AddDefaultResponseHeaders()) //添加默认响应头
m.Use(hs.healthHandler)//健康检查
m.Use(hs.metricsEndpoint)//监控指标采集
m.Use(middleware.GetContextHandler(//对context 的二次封装
hs.AuthTokenService,
hs.RemoteCacheService,
hs.RenderService,
))
}
来看看如何对context的二次封装吧,该方法返回是个回调函数,macaron.Handler也是回调函数,所以这个是一个不带next的中间件,会按照中间件顺序执行,在执行过程中进行用户登录验证
grafana\pkg\middleware\middleware.go +40
func GetContextHandler(
ats models.UserTokenService,
remoteCache *remotecache.RemoteCache,
renderService rendering.Service,
) macaron.Handler {
return func(c *macaron.Context) {
ctx := &models.ReqContext{ //创建自定义ReqContext,注意是继承了macaron.Context
Context: c,
SignedInUser: &models.SignedInUser{},//创建用户数据
IsSignedIn: false,//是否登录,可能是oauth,ldapp,账号密码,或者jwt,但只有一个登录成功就会为true,如果没有,将在后面的一个中间件将他拦截下来,响应相关认证错误
AllowAnonymous: false,//是否匿名
SkipCache: false,
Logger: log.New("context"),//日志实例
}
orgId := int64(0)
orgIdHeader := ctx.Req.Header.Get("X-Grafana-Org-Id")
if orgIdHeader != "" {
orgId, _ = strconv.ParseInt(orgIdHeader, 10, 64)
}
// the order in which these are tested are important
// look for api key in Authorization header first
// then init session and look for userId in session
// then look for api key in session (special case for render calls via api)
// then test if anonymous access is enabled
//下面是一些列认证,只要一个为true,就会退出
switch {
case initContextWithRenderAuth(ctx, renderService):
case initContextWithApiKey(ctx):
case initContextWithBasicAuth(ctx, orgId):
case initContextWithAuthProxy(remoteCache, ctx, orgId):
case initContextWithToken(ats, ctx, orgId):
case initContextWithAnonymousUser(ctx):
}
ctx.Logger = log.New("context", "userId", ctx.UserId, "orgId", ctx.OrgId, "uname", ctx.Login)
ctx.Data["ctx"] = ctx
c.Map(ctx)
// update last seen every 5min
if ctx.ShouldUpdateLastSeenAt() { //更新token日志
ctx.Logger.Debug("Updating last user_seen_at", "user_id", ctx.UserId)
if err := bus.Dispatch(&models.UpdateUserLastSeenAtCommand{UserId: ctx.UserId}); err != nil {
ctx.Logger.Error("Failed to update last_seen_at", "error", err)
}
}
}
}
-
步骤1:创建自定义ReqContext
-
步骤2:进行一系列认证
来随便看看一个认证
func initContextWithBasicAuth(ctx *models.ReqContext, orgId int64) bool {
if !setting.BasicAuthEnabled {
return false
}
header := ctx.Req.Header.Get("Authorization") //获取验证头,这里是BasicAuth,其实jwt是一样的
if header == "" {
return false
}
username, password, err := util.DecodeBasicAuthHeader(header)//获取用户名和密码
if err != nil {
ctx.JsonApiErr(401, "Invalid Basic Auth Header", err)
return true
}
authQuery := models.LoginUserQuery{
Username: username,
Password: password,
}
if err := bus.Dispatch(&authQuery); err != nil {//查数据库验证密码
ctx.Logger.Debug(
"Failed to authorize the user",
"username", username,
)
ctx.JsonApiErr(401, errStringInvalidUsernamePassword, err)
return true
}
user := authQuery.User
query := models.GetSignedInUserQuery{UserId: user.Id, OrgId: orgId}
if err := bus.Dispatch(&query); err != nil { //查询用户信息,根据用户id和组织id
ctx.Logger.Error(
"Failed at user signed in",
"id", user.Id,
"org", orgId,
)
ctx.JsonApiErr(401, errStringInvalidUsernamePassword, err)
return true
}
ctx.SignedInUser = query.Result //如果验证成功,将查询出来的user赋值在context上面去
ctx.IsSignedIn = true //验证成功这步很重要,将登陆赋值为true
return true
}
-
步骤3:将ReqContext context Map 进里面去,这是不是意味着在后面,我们可以写func mid(ctx *models.ReqContext)的中间件。最后就是把就context也设进ctx.Data里面去
ctx.Data["ctx"] = ctx
c.Map(ctx)
登录验证中间件
var (
ReqGrafanaAdmin = Auth(&AuthOptions{//控制权限
ReqSignedIn: true,
ReqGrafanaAdmin: true,
})
ReqSignedIn = Auth(&AuthOptions{ReqSignedIn: true})//控制登录
)
权限认证中间件,这个可以控制是否需要登录和权限认证
func Auth(options *AuthOptions) macaron.Handler {
return func(c *models.ReqContext) { //该中间件获取前面map 进去的context
if !c.IsSignedIn && options.ReqSignedIn && !c.AllowAnonymous {
notAuthorized(c)
return
}
if !c.IsGrafanaAdmin && options.ReqGrafanaAdmin {
accessForbidden(c)
return
}
}
}
如果需要认证而没有认证,那么就会响应,未认证的错误
func notAuthorized(c *models.ReqContext) {
if c.IsApiRequest() { //如果是api 请求直接响应json 信息
c.JsonApiErr(401, "Unauthorized", nil)
return
}
redirectTo := c.Req.RequestURI
if setting.AppSubUrl != "" && !strings.HasPrefix(redirectTo, setting.AppSubUrl) {
redirectTo = setting.AppSubUrl + c.Req.RequestURI
}
WriteCookie(c.Resp, "redirect_to", url.QueryEscape(redirectTo), 0, newCookieOptions)
c.Redirect(setting.AppSubUrl + "/login") //不是api则直接跳转
}
注册路由
grafana\pkg\api\api.go
func (hs *HTTPServer) registerRoutes() {
//下面一堆都是中间件
reqSignedIn := middleware.ReqSignedIn
reqGrafanaAdmin := middleware.ReqGrafanaAdmin
reqEditorRole := middleware.ReqEditorRole
reqOrgAdmin := middleware.ReqOrgAdmin
reqCanAccessTeams := middleware.AdminOrFeatureEnabled(hs.Cfg.EditorsCanAdmin)
reqSnapshotPublicModeOrSignedIn := middleware.SnapshotPublicModeOrSignedIn()
redirectFromLegacyDashboardURL := middleware.RedirectFromLegacyDashboardURL()
redirectFromLegacyDashboardSoloURL := middleware.RedirectFromLegacyDashboardSoloURL()
quota := middleware.Quota(hs.QuotaService)
bind := binding.bind
//下面是注册路由的过程
r := hs.RouteRegister
// not logged in views
r.Get("/logout", hs.Logout)
r.Post("/login", quota("session"), bind(dtos.LoginCommand{}), Wrap(hs.LoginPost))
}
以登录为例,首先获取hs.RouteRegister,然后调用RouteRegister的相关方法,就是http必备的方法
相关方法
// RouteRegister allows you to add routes and macaron.Handlers
// that the web server should serve.
type RouteRegister interface {
// Get adds a list of handlers to a given route with a GET HTTP verb
Get(string, ...macaron.Handler)
// Post adds a list of handlers to a given route with a POST HTTP verb
Post(string, ...macaron.Handler)
// Delete adds a list of handlers to a given route with a DELETE HTTP verb
Delete(string, ...macaron.Handler)
// Put adds a list of handlers to a given route with a PUT HTTP verb
Put(string, ...macaron.Handler)
// Patch adds a list of handlers to a given route with a PATCH HTTP verb
Patch(string, ...macaron.Handler)
// Any adds a list of handlers to a given route with any HTTP verb
Any(string, ...macaron.Handler)
// Group allows you to pass a function that can add multiple routes
// with a shared prefix route.
Group(string, func(RouteRegister), ...macaron.Handler)
// Insert adds more routes to an existing Group.
Insert(string, func(RouteRegister), ...macaron.Handler)
// Register iterates over all routes added to the RouteRegister
// and add them to the `Router` pass as an parameter.
Register(Router)
}
bind是一个数据编解码的中间件,不仅仅只是json,hs.LoginPost是处理业务逻辑的函数,注意需要wrap 一下,下面是wrap函数详情
func Wrap(action interface{}) macaron.Handler {
// action 为实际业务处理函数
return func(c *models.ReqContext) { //注意这里返回是回调函数,参数为前面map 进去的context
var res Response
val, err := c.Invoke(action) //还记得第二篇讲的Invoke吗,将值赋值给函数的参数,然后调用函数
if err == nil && val != nil && len(val) > 0 {
res = val[0].Interface().(Response) //注意函数的返回值必须是Response这个结构体
} else {
res = ServerError(err)
}
res.WriteTo(c)//然后调用(Response..WriteTo方法写入响应结果
}
}
func (r *NormalResponse) WriteTo(ctx *models.ReqContext) {
if r.err != nil {
ctx.Logger.Error(r.errMessage, "error", r.err, "remote_addr", ctx.RemoteAddr())
}
header := ctx.Resp.Header()
for k, v := range r.header {
header[k] = v
}
ctx.Resp.WriteHeader(r.status)
if _, err := ctx.Resp.Write(r.body); err != nil {
ctx.Logger.Error("Error writing to response", "err", err)
}
}
来看看LoginPost 这个action,第一个参数是*models.ReqContext,第二个参数是dtos.LoginCommand,就是bind里面的对象,所以可以猜测bind将前端传进来的数据进行了json 解码,映射到了dtos.LoginCommand这个对象里面。
func (hs *HTTPServer) LoginPost(c *models.ReqContext, cmd dtos.LoginCommand) Response {
if setting.DisableLoginForm {
return Error(401, "Login is disabled", nil)
}
authQuery := &models.LoginUserQuery{
ReqContext: c,
Username: cmd.User,
Password: cmd.Password,
IpAddress: c.Req.RemoteAddr,
}
if err := bus.Dispatch(authQuery); err != nil {
e401 := Error(401, "Invalid username or password", err)
if err == login.ErrInvalidCredentials || err == login.ErrTooManyLoginAttempts {
return e401
}
// Do not expose disabled status,
// just show incorrect user credentials error (see #17947)
if err == login.ErrUserDisabled {
hs.log.Warn("User is disabled", "user", cmd.User)
return e401
}
return Error(500, "Error while trying to authenticate user", err)
}
user := authQuery.User
err := hs.loginUserWithUser(user, c)
if err != nil {
return Error(500, "Error while signing in user", err)
}
result := map[string]interface{}{
"message": "Logged in",
}
if redirectTo, _ := url.QueryUnescape(c.GetCookie("redirect_to")); len(redirectTo) > 0 {
if err := hs.validateRedirectTo(redirectTo); err == nil {
result["redirectUrl"] = redirectTo
} else {
log.Info("Ignored invalid redirect_to cookie value: %v", redirectTo)
}
middleware.DeleteCookie(c.Resp, "redirect_to", hs.cookieOptionsFromCfg)
}
metrics.MApiLoginPost.Inc()
return JSON(200, result)
}
举个简单的例子
前端发送json: "{"userName:"xxoo","passWord":"xxoo"},通过bind进行映射进了里面的结构体,那么我们就可以直接把handler写成LoginPost(c *models.ReqContext, cmd dtos.LoginCommand)形式,或者LoginPost( cmd dtos.LoginCommand)的handler,非常灵活
RouteRegister
RouteRegister有个group 功能,使用方法如下,相当于把几个前缀进行拼接了,只用一个前缀进行控制
// authed api
r.Group("/api", func(apiRoute routing.RouteRegister) {
// user (signed in)
apiRoute.Group("/user", func(userRoute routing.RouteRegister) {
userRoute.Get("/", Wrap(GetSignedInUser)) // get api/user/
userRoute.Put("/", bind(models.UpdateUserCommand{}), Wrap(UpdateSignedInUser))
Wrap(hs.RevokeUserAuthToken))
})
其实在routeRegister结构体实例里面还有个prefix 方法,如果写上"/api/v1",那么在注册是不用写上前缀了,但是前端访问必须使用这个前缀
type routeRegister struct {
prefix string
subfixHandlers []macaron.Handler
namedMiddleware []RegisterNamedMiddleware
routes []route
groups []*routeRegister
}
在server最后调用applyRoutes时候,会调用Register将 macaron实例传进来,router=macaron,最后调用macaron的Handle方法,路由注册可以参考前面的第一篇文章
func (rr *routeRegister) Register(router Router) {
for _, r := range rr.routes {
// GET requests have to be added to macaron routing using Get()
// Otherwise HEAD requests will not be allowed.
// https://github.com/go-macaron/macaron/blob/a325110f8b392bce3e5cdeb8c44bf98078ada3be/router.go#L198
if r.method == http.MethodGet {//注册get方法,因为get方法需要注册head,所以需要特殊处理
router.Get(r.pattern, r.handlers...)
} else {
router.Handle(r.method, r.pattern, r.handlers)//其他方法统一调hanlde进行处理
}
}
for _, g := range rr.groups {
g.Register(router)
}
}
bus
最后分析一下bus是干什么的,以修改密码举例
pkg\services\sqlstore\user.go
authQuery := &models.LoginUserQuery{
ReqContext: c,
Username: cmd.User,
Password: cmd.Password,
IpAddress: c.Req.RemoteAddr,
}
if err := bus.Dispatch(authQuery); err != nil {
}
在业务处理handler里面有很多 这样的函数,其实就是查询数据库而已,将查询与存储层进行解耦,来看看存储层是怎么执行的
func (ss *SqlStore) addUserQueryAndCommandHandlers() {
bus.AddHandler("sql", ChangeUserPassword) //添加到bus里面去
}
或者这样添加
grafana\pkg\services\sqlstore\user_auth.go
func init() {
bus.AddHandler("sql", ChangeUserPassword)
}
ChangeUserPassword实现
func ChangeUserPassword(cmd *models.ChangeUserPasswordCommand) error {
return inTransaction(func(sess *DBSession) error {
user := models.User{
Password: cmd.NewPassword,
Updated: time.Now(),
}
_, err := sess.ID(cmd.UserId).Update(&user)
return err
})
}
ChangeUserPassword的参数就是 bus.Dispatch传进来的参数,可见Dispatch的作用是根据参数类型找到相应的执行函数,执行函数如果参数为Dispatch传入的结构体,最终就能路由到这
bus.AddHandler
grafana\pkg\bus\bus.go+178
func (b *InProcBus) AddHandler(handler HandlerFunc) {
handlerType := reflect.TypeOf(handler)
queryTypeName := handlerType.In(0).Elem().Name()
b.handlers[queryTypeName] = handler
}
就是获取handler第0个参数,然后将相应的类型做key,handler作为value,下面是一个带context的handler注册
func (b *InProcBus) AddHandlerCtx(handler HandlerFunc) {
handlerType := reflect.TypeOf(handler)
queryTypeName := handlerType.In(1).Elem().Name()
b.handlersWithCtx[queryTypeName] = handler
}
就是第一个参数为context,所有将第二个参数in(1)添加进map里面
bus.Dispatch
grafana\pkg\bus\bus.go+106
// Dispatch function dispatch a message to the bus.
func (b *InProcBus) Dispatch(msg Msg) error {
var msgName = reflect.TypeOf(msg).Elem().Name() //获取消息的类型名
var handler = b.handlersWithCtx[msgName]
withCtx := true
if handler == nil {
withCtx = false
handler = b.handlers[msgName]
}
if handler == nil {
return ErrHandlerNotFound
}
var params = []reflect.Value{}
if withCtx {
params = append(params, reflect.ValueOf(context.Background()))
}
params = append(params, reflect.ValueOf(msg))
ret := reflect.ValueOf(handler).Call(params)
err := ret[0].Interface()
if err == nil {
return nil
}
return err.(error)
}
-
获取相应类型的handler,然后调用reflect.ValueOf(handler).Call(),反射调用方法,参数为 Dispatch的参数