启动加载配置
直接找到nsqd中main包下的start方法,即下图中代码
func (p *program) Start() error { //返回options结构体 包含了一些默认信息 //例如id,自身监听的tcp端口和http端口等,还包括了一系列阈值 //这儿注意id使用hostname 进行MD5后生成的三列码 opts := nsqd.NewOptions() //设置一系列参数解析规则 flagSet := nsqdFlagSet(opts) //解析命令行参数 例如--lookupd-tcp-address=127.0.0.1:4160 或者--version flagSet.Parse(os.Args[1:]) //重置随机数 rand.Seed(time.Now().UTC().UnixNano()) //如果用户是查询版本 即附带的是--version 则直接打印版本即可 if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) { fmt.Println(version.String("nsqd")) os.Exit(0) } var cfg config //如果有指定配置文件 configFile := flagSet.Lookup("config").Value.String() if configFile != "" { //进行配置文件解析 _, err := toml.DecodeFile(configFile, &cfg) if err != nil { log.Fatalf("ERROR: failed to load config file %s - %s", configFile, err.Error()) } } //验证下参数有效性 cfg.Validate() //解析下配置 options.Resolve(opts, flagSet, cfg) //根据配置创建一个NSQD 这是一个核心结构体 nsqd := nsqd.New(opts) //看下是否有必要的核心元数据 err := nsqd.LoadMetadata() if err != nil { log.Fatalf("ERROR: %s", err.Error()) } //如果系统已经有了某个topic或者channel则重启一下那些已有的即可 err = nsqd.PersistMetadata() if err != nil { log.Fatalf("ERROR: failed to persist metadata - %s", err.Error()) } //核心启动流程 nsqd.Main() p.nsqd = nsqd return nil }
可见本方法主要是加载并解析配置,配置的来源主要有两种,命令行键入以及指定配置文件。
第一行中的nsqd.NewOptions()方法会返回一个设置了一些默认配置的结构体,例如监听的tcp端口以及http端口等,还有本实例的id,这儿注意实例id是根据服务器的hostname进行md5散列得到的,所以如果要进行nsq集群的话需要注意hostname不能重复。tcp默认端口为4150,http监听端口为4151,https则为4152
第二行 nsqdFlagSet(opts)以及后面的操作 主要设置一系列参数解析规则。以及根据这些规则去解析参数,其中会用到go的flag包,用来解析启动参数,关于flag包我们可以做一个简单的demo应用介绍。
1 go flag包基本使用
flag主要是go用来进行命令行参数解析的一个包,使用起来也比较简单易懂,下面就是一个简单的demo
func main() { flagSet := flag.NewFlagSet("person", flag.ExitOnError) // basic options flagSet.Bool("alive", false, "print version string") flagSet.String("msg", "", "set log verbosity: debug, info, warn, error, or fatal") flagSet.Parse(os.Args[1:]) if flagSet.Lookup("alive").Value.(flag.Getter).Get().(bool) { fmt.Printf("i‘m still alive \n") fmt.Printf("获取到msg:%s \n",flagSet.Lookup("msg").Value.(flag.Getter).String()) }else { fmt.Printf("i‘m dead \n") os.Exit(0) } flagSet.PrintDefaults() }
使用起来较为简单 就是新建一个FlagSet,然后添加自己需要进行解析的参数,并且设置格式以及简介,就可以丢命令行参数进行解析并提供查找功能,最后使用PrintDefaults功能还可以打印出提示信息等
2 解析文件
解析配置中这段方法我们很容易发现是查看命令行是否有指定config 即配置文件的地址,如果有配置文件则从配置文件读取配置
configFile := flagSet.Lookup("config").Value.String() if configFile != "" { _, err := toml.DecodeFile(configFile, &cfg) if err != nil { log.Fatalf("ERROR: failed to load config file %s - %s", configFile, err.Error()) } }
go语言原生提供了文件解析较为方便的api,我们一般直接使用就可以了,例如我们想要解析一个json文件,并将其赋值到自定义结构体
func main() { type Person struct{ Username string `json:"username"` Password string `json:"password"` } ps := []Person{} bys,err :=ioutil.ReadFile("C:\\Users\\kk\\Desktop\\default\\person.json") if err != nil { log.Fatal(err) } fmt.Println(string(bys)) errs :=json.Unmarshal(bys,&ps) if errs != nil { log.Fatal(errs) } fmt.Println(ps) }
json内容为
3反射赋值,
通过命令行解析到的flagSet以及配置文件解析到的configMap 都会通过反射赋值到我们一开始创建的options中。主要为这句代码
options.Resolve(opts, flagSet, cfg)
里面主要通过反射获取到结构体即opt的所有filed然后通过反射进行赋值。go中有关反射的使用我们可以通过如下demo来了解使用
func main() { type Person struct { Username string `json:"username"` Password string `json:"password"` } p := Person{} fil :=reflect.ValueOf(&p).Elem() ts :=fil.Type() for i := 0;i < ts.NumField();i++ { filed := ts.Field(i) if strings.HasPrefix(filed.Name,"User") { f :=fil.FieldByName(filed.Name) f.Set(reflect.ValueOf("赵六")) } } fmt.Printf("通过反射设置的username的值为:%s \n",p.Username) }
到此配置的解析工作就差不多完成了,最终根据这些配置信息,创建了系统的核心结构体 NSQD,一些关键信息例如监听器,topic信息,都存在这个结构体里边。这会儿可以不用知道里面的全部变量的用处,可以先挑些核心的理解
type NSQD struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms clientIDSequence int64 sync.RWMutex opts atomic.Value dl *dirlock.DirLock isLoading int32 errValue atomic.Value startTime time.Time //topic信息 topicMap map[string]*Topic clientLock sync.RWMutex clients map[int64]Client lookupPeers atomic.Value //tcp监听器 tcpListener net.Listener //http监听器 httpListener net.Listener //https监听器 httpsListener net.Listener //tls信息 tlsConfig *tls.Config poolSize int serf *serf.Serf serfEventChan chan serf.Event gossipChan chan interface{} gossipKey []byte rdb *registrationdb.RegistrationDB notifyChan chan interface{} optsNotificationChan chan struct{} exitChan chan int waitGroup util.WaitGroupWrapper ci *clusterinfo.ClusterInfo }