深入解析go web框架macaron三-grafana的应用

Grafana 是开源的数据可视化面板,和Protometheus是绝配,常用来做系统监控,将采集的数据进行可视化,里面有很多可视化面板模板,开箱即用。该项目是前后端不分离项目,会采用部分模板渲染,后端采用go,前端采用typescript,如果是新手,可以多看看源码学学做项目的思想。

Grafana 官网地址:Grafana: The open observability platform | Grafana Labs

Grafana github地址: GitHub - grafana/grafana: The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.

Grafana启动

grafana\pkg\cmd\grafana-server\main.go+113

server := NewServer(*configFile, *homePath, *pidFile) //创建server
​
go listenToSystemSignals(server)//监听退出信号
​
err := server.Run() //启动服务

server.Run()

server结构

grafana\pkg\cmd\grafana-server\server.go

// Server is responsible for managing the lifecycle of services.
type Server struct {
    context            context.Context
    shutdownFn         context.CancelFunc
    childRoutines      *errgroup.Group  //还记得前面讲过的errgroup吗
    log                log.Logger
    cfg                *setting.Cfg
    shutdownReason     string
    shutdownInProgress bool
​
    configFile string
    homePath   string
    pidFile    string
​
    RouteRegister routing.RouteRegister `inject:""` //路由注册器,grafana进行了包装,写了一个自己的路由器
    HTTPServer    *api.HTTPServer       `inject:""`
}

来看看run结构,加载配置,初始化登录,最重要的是将registry.GetServices()里面的服务全部注册进来,然后注入到相应的结构体里面的字段。比如server初始化时没有初始化HTTPServer和RouteRegister这两个字段。那么它们从哪里来呢?

// Run initializes and starts services. This will block until all services have
// exited. To initiate shutdown, call the Shutdown method in another goroutine.
func (s *Server) Run() (err error) {
   //加载配置文件
   s.loadConfiguration()
   //写pid文件
   s.writePIDFile()
    //初始化日志文件
   login.Init()
   social.NewOAuthService()
​
   services := registry.GetServices()
​
   if err = s.buildServiceGraph(services); err != nil { //依赖注入
      return
   }
​
   // Initialize services.
   for _, service := range services {
      if registry.IsDisabled(service.Instance) {
         continue
      }
​
      s.log.Debug("Initializing " + service.Name)
​
      if err := service.Instance.Init(); err != nil {
         return errutil.Wrapf(err, "Service init failed")
      }
   }
​
   // Start background services.
   for _, svc := range services {
       service, ok := svc.Instance.(registry.BackgroundService) //实现了RUN(context)方法就是后台服务,如果只实现了Init(),那么在上面就直接执行了
      if !ok {
         continue
      }
​
      if registry.IsDisabled(svc.Instance) {
         continue
      }
​
      // Variable is needed for accessing loop variable in callback
      descriptor := svc
      s.childRoutines.Go(func() error {  //调用errgroup 执行run
         // Don't start new services when server is shutting down.
         if s.shutdownInProgress {
            return nil
         }
​
         err := service.Run(s.context) 
         // Mark that we are in shutdown mode
         // So no more services are started
         s.shutdownInProgress = true
         if err != nil {
            if err != context.Canceled {
               // Server has crashed.
               s.log.Error("Stopped "+descriptor.Name, "reason", err)
            } else {
               s.log.Debug("Stopped "+descriptor.Name, "reason", err)
            }
​
            return err
         }
​
         return nil
      })
   }
​
   defer func() {
      s.log.Debug("Waiting on services...") //等待执行
      if waitErr := s.childRoutines.Wait(); waitErr != nil && !xerrors.Is(waitErr, context.Canceled) {
         s.log.Error("A service failed", "err", waitErr)
         if err == nil {
            err = waitErr
         }
      }
   }()
​
   s.notifySystemd("READY=1")
​
   return
}

答案就在依赖注入,来看看是怎么依赖注入的

// buildServiceGraph builds a graph of services and their dependencies.
func (s *Server) buildServiceGraph(services []*registry.Descriptor) error {
    // Specify service dependencies.
    objs := []interface{}{
        bus.GetBus(),//寻找数据库model层的一个总线bus
        s.cfg, //配置文件
        routing.NewRouteRegister(middleware.RequestMetrics, middleware.RequestTracing),
        localcache.New(5*time.Minute, 10*time.Minute),//缓存
        s,//服务器实例
    }
​
    for _, service := range services { //将注册的实例进行添加,
        objs = append(objs, service.Instance)
    }
​
    var serviceGraph inject.Graph
​
    // Provide services and their dependencies to the graph.
    for _, obj := range objs { //将对象添加进依赖注入
        if err := serviceGraph.Provide(&inject.Object{Value: obj}); err != nil {
            return errutil.Wrapf(err, "Failed to provide object to the graph")
        }
    }
​
    // Resolve services and their dependencies.
    if err := serviceGraph.Populate(); err != nil {//进行依赖注入
        
        
        return errutil.Wrapf(err, "Failed to populate service dependency")
    }
​
    return nil
}

举个例子服务都是怎么注册的

grafana\pkg\api\http_server.go+40

//注册HTTPServer
func init() {
    registry.Register(&registry.Descriptor{
        Name:         "HTTPServer",
        Instance:     &HTTPServer{},
        InitPriority: registry.High,
    })
}

serviceGraph.Provide( )将对象全部放进去,然后调用Populate 进行依赖注入,后面我会将专门写篇文章介绍这个"github.com/facebookgo/inject"包,依赖注册神器

HTTPServer

HTTPServer就是http后台服务器的实例了,我将不用的都删了,其中macaron就是Macaron框架的示例,下面看看怎么被初始化的

type HTTPServer struct {
   log           log.Logger
   macaron       *macaron.Macaron
   httpSrv       *http.Server
   RouteRegister        routing.RouteRegister            `inject:""`
   Bus                  bus.Bus                          `inject:""`
   Cfg                  *setting.Cfg                     `inject:""`
}

在服务器启动时候就会调用init方法,被初始化

//初始化
func (hs *HTTPServer) Init() error {
    hs.log = log.New("http.server")
​
    hs.streamManager = live.NewStreamManager()
    hs.macaron = hs.newMacaron()
    hs.registerRoutes()
​
    return nil
}
//创建Macaron的实例
func (hs *HTTPServer) newMacaron() *macaron.Macaron {
    macaron.Env = setting.Env
    m := macaron.New()
​
    // automatically set HEAD for every GET
    m.SetAutoHead(true)
​
    return m
}

添加中间件

func (hs *HTTPServer) applyRoutes() {
   // start with middlewares & static routes
   hs.addMiddlewaresAndStaticRoutes()
   // then add view routes & api routes
   hs.RouteRegister.Register(hs.macaron)
   // then custom app proxy routes
   hs.initAppPluginRoutes(hs.macaron)
   // lastly not found route
   hs.macaron.NotFound(hs.NotFoundHandler)
}

首先看看添加中间件,去除一些静态路由和其他代码,留下了几个比较重要的中间件,读者感兴趣可以自己去看看源码。

func (hs *HTTPServer) addMiddlewaresAndStaticRoutes() {
	m := hs.macaron
	m.Use(middleware.Logger()) //日志中间件
	m.Use(middleware.Recovery())//错误恢复
	m.Use(middleware.AddDefaultResponseHeaders()) //添加默认响应头
	m.Use(hs.healthHandler)//健康检查
	m.Use(hs.metricsEndpoint)//监控指标采集
	m.Use(middleware.GetContextHandler(//对context 的二次封装
		hs.AuthTokenService,
		hs.RemoteCacheService,
		hs.RenderService,
	))
}

来看看如何对context的二次封装吧,该方法返回是个回调函数,macaron.Handler也是回调函数,所以这个是一个不带next的中间件,会按照中间件顺序执行,在执行过程中进行用户登录验证

grafana\pkg\middleware\middleware.go +40

func GetContextHandler(
   ats models.UserTokenService,
   remoteCache *remotecache.RemoteCache,
   renderService rendering.Service,
) macaron.Handler {
   return func(c *macaron.Context) {
      ctx := &models.ReqContext{ //创建自定义ReqContext,注意是继承了macaron.Context
         Context:        c,
         SignedInUser:   &models.SignedInUser{},//创建用户数据
         IsSignedIn:     false,//是否登录,可能是oauth,ldapp,账号密码,或者jwt,但只有一个登录成功就会为true,如果没有,将在后面的一个中间件将他拦截下来,响应相关认证错误
         AllowAnonymous: false,//是否匿名
         SkipCache:      false,
         Logger:         log.New("context"),//日志实例
      }

      orgId := int64(0)
      orgIdHeader := ctx.Req.Header.Get("X-Grafana-Org-Id")
      if orgIdHeader != "" {
         orgId, _ = strconv.ParseInt(orgIdHeader, 10, 64)
      }

      // the order in which these are tested are important
      // look for api key in Authorization header first
      // then init session and look for userId in session
      // then look for api key in session (special case for render calls via api)
      // then test if anonymous access is enabled
       //下面是一些列认证,只要一个为true,就会退出
      switch {
      case initContextWithRenderAuth(ctx, renderService):
      case initContextWithApiKey(ctx):
      case initContextWithBasicAuth(ctx, orgId):
      case initContextWithAuthProxy(remoteCache, ctx, orgId):
      case initContextWithToken(ats, ctx, orgId):
      case initContextWithAnonymousUser(ctx):
      }

      ctx.Logger = log.New("context", "userId", ctx.UserId, "orgId", ctx.OrgId, "uname", ctx.Login)
      ctx.Data["ctx"] = ctx

      c.Map(ctx)

      // update last seen every 5min
      if ctx.ShouldUpdateLastSeenAt() { //更新token日志
         ctx.Logger.Debug("Updating last user_seen_at", "user_id", ctx.UserId)
         if err := bus.Dispatch(&models.UpdateUserLastSeenAtCommand{UserId: ctx.UserId}); err != nil {
            ctx.Logger.Error("Failed to update last_seen_at", "error", err)
         }
      }
   }
}
  • 步骤1:创建自定义ReqContext

  • 步骤2:进行一系列认证

来随便看看一个认证

func initContextWithBasicAuth(ctx *models.ReqContext, orgId int64) bool {
	if !setting.BasicAuthEnabled {
		return false
	}

	header := ctx.Req.Header.Get("Authorization") //获取验证头,这里是BasicAuth,其实jwt是一样的
	if header == "" {
		return false
	}

	username, password, err := util.DecodeBasicAuthHeader(header)//获取用户名和密码
	if err != nil {
		ctx.JsonApiErr(401, "Invalid Basic Auth Header", err)
		return true
	}

	authQuery := models.LoginUserQuery{
		Username: username,
		Password: password,
	}
	if err := bus.Dispatch(&authQuery); err != nil {//查数据库验证密码
		ctx.Logger.Debug(
			"Failed to authorize the user",
			"username", username,
		)

		ctx.JsonApiErr(401, errStringInvalidUsernamePassword, err)
		return true
	}

	user := authQuery.User

	query := models.GetSignedInUserQuery{UserId: user.Id, OrgId: orgId}
	if err := bus.Dispatch(&query); err != nil { //查询用户信息,根据用户id和组织id
		ctx.Logger.Error(
			"Failed at user signed in",
			"id", user.Id,
			"org", orgId,
		)
		ctx.JsonApiErr(401, errStringInvalidUsernamePassword, err)
		return true
	}

	ctx.SignedInUser = query.Result //如果验证成功,将查询出来的user赋值在context上面去
	ctx.IsSignedIn = true //验证成功这步很重要,将登陆赋值为true
	return true
}
  • 步骤3:将ReqContext context Map 进里面去,这是不是意味着在后面,我们可以写func mid(ctx *models.ReqContext)的中间件。最后就是把就context也设进ctx.Data里面去

 ctx.Data["ctx"] = ctx
 c.Map(ctx)

登录验证中间件

var (
   ReqGrafanaAdmin = Auth(&AuthOptions{//控制权限
      ReqSignedIn:     true,
      ReqGrafanaAdmin: true,
   })
    ReqSignedIn   = Auth(&AuthOptions{ReqSignedIn: true})//控制登录
)

权限认证中间件,这个可以控制是否需要登录和权限认证

func Auth(options *AuthOptions) macaron.Handler {
	return func(c *models.ReqContext) { //该中间件获取前面map 进去的context
		if !c.IsSignedIn && options.ReqSignedIn && !c.AllowAnonymous {
			notAuthorized(c)
			return
		}

		if !c.IsGrafanaAdmin && options.ReqGrafanaAdmin {
			accessForbidden(c)
			return
		}
	}
}

如果需要认证而没有认证,那么就会响应,未认证的错误

func notAuthorized(c *models.ReqContext) {
	if c.IsApiRequest() { //如果是api 请求直接响应json 信息
		c.JsonApiErr(401, "Unauthorized", nil)
		return
	}

	redirectTo := c.Req.RequestURI
	if setting.AppSubUrl != "" && !strings.HasPrefix(redirectTo, setting.AppSubUrl) {
		redirectTo = setting.AppSubUrl + c.Req.RequestURI
	}
	WriteCookie(c.Resp, "redirect_to", url.QueryEscape(redirectTo), 0, newCookieOptions)

	c.Redirect(setting.AppSubUrl + "/login") //不是api则直接跳转
}

注册路由

grafana\pkg\api\api.go

func (hs *HTTPServer) registerRoutes() {
    //下面一堆都是中间件
	reqSignedIn := middleware.ReqSignedIn
	reqGrafanaAdmin := middleware.ReqGrafanaAdmin
	reqEditorRole := middleware.ReqEditorRole
	reqOrgAdmin := middleware.ReqOrgAdmin
	reqCanAccessTeams := middleware.AdminOrFeatureEnabled(hs.Cfg.EditorsCanAdmin)
	reqSnapshotPublicModeOrSignedIn := middleware.SnapshotPublicModeOrSignedIn()
	redirectFromLegacyDashboardURL := middleware.RedirectFromLegacyDashboardURL()
	redirectFromLegacyDashboardSoloURL := middleware.RedirectFromLegacyDashboardSoloURL()
	quota := middleware.Quota(hs.QuotaService)
	bind := binding.bind
   //下面是注册路由的过程
	r := hs.RouteRegister

	// not logged in views
	r.Get("/logout", hs.Logout)
	r.Post("/login", quota("session"), bind(dtos.LoginCommand{}), Wrap(hs.LoginPost))
	
	}

以登录为例,首先获取hs.RouteRegister,然后调用RouteRegister的相关方法,就是http必备的方法

相关方法

// RouteRegister allows you to add routes and macaron.Handlers
// that the web server should serve.
type RouteRegister interface {
	// Get adds a list of handlers to a given route with a GET HTTP verb
	Get(string, ...macaron.Handler)

	// Post adds a list of handlers to a given route with a POST HTTP verb
	Post(string, ...macaron.Handler)

	// Delete adds a list of handlers to a given route with a DELETE HTTP verb
	Delete(string, ...macaron.Handler)

	// Put adds a list of handlers to a given route with a PUT HTTP verb
	Put(string, ...macaron.Handler)

	// Patch adds a list of handlers to a given route with a PATCH HTTP verb
	Patch(string, ...macaron.Handler)

	// Any adds a list of handlers to a given route with any HTTP verb
	Any(string, ...macaron.Handler)

	// Group allows you to pass a function that can add multiple routes
	// with a shared prefix route.
	Group(string, func(RouteRegister), ...macaron.Handler)

	// Insert adds more routes to an existing Group.
	Insert(string, func(RouteRegister), ...macaron.Handler)

	// Register iterates over all routes added to the RouteRegister
	// and add them to the `Router` pass as an parameter.
	Register(Router)
}

bind是一个数据编解码的中间件,不仅仅只是json,hs.LoginPost是处理业务逻辑的函数,注意需要wrap 一下,下面是wrap函数详情

func Wrap(action interface{}) macaron.Handler {
   // action 为实际业务处理函数
   return func(c *models.ReqContext) { //注意这里返回是回调函数,参数为前面map 进去的context
      var res Response
      val, err := c.Invoke(action) //还记得第二篇讲的Invoke吗,将值赋值给函数的参数,然后调用函数
      if err == nil && val != nil && len(val) > 0 {
         res = val[0].Interface().(Response) //注意函数的返回值必须是Response这个结构体
      } else {
         res = ServerError(err)
      }

      res.WriteTo(c)//然后调用(Response..WriteTo方法写入响应结果
   }
}

func (r *NormalResponse) WriteTo(ctx *models.ReqContext) {
   if r.err != nil {
      ctx.Logger.Error(r.errMessage, "error", r.err, "remote_addr", ctx.RemoteAddr())

   }

   header := ctx.Resp.Header()
   for k, v := range r.header {
      header[k] = v
   }
   ctx.Resp.WriteHeader(r.status)
   if _, err := ctx.Resp.Write(r.body); err != nil {
      ctx.Logger.Error("Error writing to response", "err", err)
   }
}

来看看LoginPost 这个action,第一个参数是*models.ReqContext,第二个参数是dtos.LoginCommand,就是bind里面的对象,所以可以猜测bind将前端传进来的数据进行了json 解码,映射到了dtos.LoginCommand这个对象里面。

func (hs *HTTPServer) LoginPost(c *models.ReqContext, cmd dtos.LoginCommand) Response {
   if setting.DisableLoginForm {
      return Error(401, "Login is disabled", nil)
   }

   authQuery := &models.LoginUserQuery{
      ReqContext: c,
      Username:   cmd.User,
      Password:   cmd.Password,
      IpAddress:  c.Req.RemoteAddr,
   }

   if err := bus.Dispatch(authQuery); err != nil {
      e401 := Error(401, "Invalid username or password", err)
      if err == login.ErrInvalidCredentials || err == login.ErrTooManyLoginAttempts {
         return e401
      }

      // Do not expose disabled status,
      // just show incorrect user credentials error (see #17947)
      if err == login.ErrUserDisabled {
         hs.log.Warn("User is disabled", "user", cmd.User)
         return e401
      }

      return Error(500, "Error while trying to authenticate user", err)
   }

   user := authQuery.User

   err := hs.loginUserWithUser(user, c)
   if err != nil {
      return Error(500, "Error while signing in user", err)
   }

   result := map[string]interface{}{
      "message": "Logged in",
   }

   if redirectTo, _ := url.QueryUnescape(c.GetCookie("redirect_to")); len(redirectTo) > 0 {
      if err := hs.validateRedirectTo(redirectTo); err == nil {
         result["redirectUrl"] = redirectTo
      } else {
         log.Info("Ignored invalid redirect_to cookie value: %v", redirectTo)
      }
      middleware.DeleteCookie(c.Resp, "redirect_to", hs.cookieOptionsFromCfg)
   }

   metrics.MApiLoginPost.Inc()
   return JSON(200, result)
}

举个简单的例子

前端发送json: "{"userName:"xxoo","passWord":"xxoo"},通过bind进行映射进了里面的结构体,那么我们就可以直接把handler写成LoginPost(c *models.ReqContext, cmd dtos.LoginCommand)形式,或者LoginPost( cmd dtos.LoginCommand)的handler,非常灵活

RouteRegister

RouteRegister有个group 功能,使用方法如下,相当于把几个前缀进行拼接了,只用一个前缀进行控制

// authed api
r.Group("/api", func(apiRoute routing.RouteRegister) {
   // user (signed in)
   apiRoute.Group("/user", func(userRoute routing.RouteRegister) {
      userRoute.Get("/", Wrap(GetSignedInUser)) // get api/user/
      userRoute.Put("/", bind(models.UpdateUserCommand{}), Wrap(UpdateSignedInUser))
      Wrap(hs.RevokeUserAuthToken))
   })

其实在routeRegister结构体实例里面还有个prefix 方法,如果写上"/api/v1",那么在注册是不用写上前缀了,但是前端访问必须使用这个前缀

type routeRegister struct {
	prefix          string
	subfixHandlers  []macaron.Handler
	namedMiddleware []RegisterNamedMiddleware
	routes          []route
	groups          []*routeRegister
}

在server最后调用applyRoutes时候,会调用Register将 macaron实例传进来,router=macaron,最后调用macaron的Handle方法,路由注册可以参考前面的第一篇文章

func (rr *routeRegister) Register(router Router) {
   for _, r := range rr.routes {
      // GET requests have to be added to macaron routing using Get()
      // Otherwise HEAD requests will not be allowed.
      // https://github.com/go-macaron/macaron/blob/a325110f8b392bce3e5cdeb8c44bf98078ada3be/router.go#L198
      if r.method == http.MethodGet {//注册get方法,因为get方法需要注册head,所以需要特殊处理
         router.Get(r.pattern, r.handlers...)
      } else {
         router.Handle(r.method, r.pattern, r.handlers)//其他方法统一调hanlde进行处理
      }
   }

   for _, g := range rr.groups {
      g.Register(router)
   }
}

bus

最后分析一下bus是干什么的,以修改密码举例

pkg\services\sqlstore\user.go

authQuery := &models.LoginUserQuery{
		ReqContext: c,
		Username:   cmd.User,
		Password:   cmd.Password,
		IpAddress:  c.Req.RemoteAddr,
	}

	if err := bus.Dispatch(authQuery); err != nil {
	}

在业务处理handler里面有很多 这样的函数,其实就是查询数据库而已,将查询与存储层进行解耦,来看看存储层是怎么执行的

func (ss *SqlStore) addUserQueryAndCommandHandlers() {
   bus.AddHandler("sql", ChangeUserPassword) //添加到bus里面去
}

或者这样添加

grafana\pkg\services\sqlstore\user_auth.go

func init() {
	bus.AddHandler("sql", ChangeUserPassword)
}

ChangeUserPassword实现

func ChangeUserPassword(cmd *models.ChangeUserPasswordCommand) error {
	return inTransaction(func(sess *DBSession) error {

		user := models.User{
			Password: cmd.NewPassword,
			Updated:  time.Now(),
		}

		_, err := sess.ID(cmd.UserId).Update(&user)
		return err
	})
}

ChangeUserPassword的参数就是 bus.Dispatch传进来的参数,可见Dispatch的作用是根据参数类型找到相应的执行函数,执行函数如果参数为Dispatch传入的结构体,最终就能路由到这

bus.AddHandler

grafana\pkg\bus\bus.go+178

func (b *InProcBus) AddHandler(handler HandlerFunc) {
	handlerType := reflect.TypeOf(handler)
	queryTypeName := handlerType.In(0).Elem().Name()
	b.handlers[queryTypeName] = handler
}

就是获取handler第0个参数,然后将相应的类型做key,handler作为value,下面是一个带context的handler注册

func (b *InProcBus) AddHandlerCtx(handler HandlerFunc) {
   handlerType := reflect.TypeOf(handler)
   queryTypeName := handlerType.In(1).Elem().Name()
   b.handlersWithCtx[queryTypeName] = handler
}

就是第一个参数为context,所有将第二个参数in(1)添加进map里面

bus.Dispatch

grafana\pkg\bus\bus.go+106

// Dispatch function dispatch a message to the bus.
func (b *InProcBus) Dispatch(msg Msg) error {
	var msgName = reflect.TypeOf(msg).Elem().Name() //获取消息的类型名

	var handler = b.handlersWithCtx[msgName]
	withCtx := true

	if handler == nil {
		withCtx = false
		handler = b.handlers[msgName]
	}

	if handler == nil {
		return ErrHandlerNotFound
	}

	var params = []reflect.Value{}
	if withCtx {
		params = append(params, reflect.ValueOf(context.Background()))
	}
	params = append(params, reflect.ValueOf(msg))

	ret := reflect.ValueOf(handler).Call(params)
	err := ret[0].Interface()
	if err == nil {
		return nil
	}
	return err.(error)
}
  • 获取相应类型的handler,然后调用reflect.ValueOf(handler).Call(),反射调用方法,参数为 Dispatch的参数

上一篇:#树形dp#洛谷 3687 [ZJOI2017]仙人掌


下一篇:python自动化发QQ消息