Go 8 配置文件解析、goroutine、channel

Go 8

time

时间格式化

2006-01-02 15:04:05.000

时间类型

  • time.Time:time.Now()
  • 时间戳:time.Now().Unix(),time.Now().UnixNano()
// 时间对象转时间戳
fmt.Println(now.Unix())
fmt.Println(now.UnixNano())

// 时间戳转时间对象
ret := time.Unix(1620097458, 0)
fmt.Println(ret)

// 格式化时间:对象转字符串
fmt.Println(now.Format("2006-01-02 15:04:05"))
fmt.Println(now.Format("2006-01-02 03:04:05 PM"))

时间间隔类型

  • time.Duration:时间间隔类型
    • time.Second
    • time.Minute
    • time.Hour

时间操作

时间对象+/-一个事件间隔对象

time.Now().Sub()

// 按照指定时区解析时间
t1, _ := time.ParseInLocation("2006-01-02 15:04:05", "2021-05-05 11:40:22", time.Local)
fmt.Println(t1)
loc, _ := time.LoadLocation("Asia/Shanghai")
t3, _ := time.ParseInLocation("2006-01-02 15:04:05", "2021-05-05 11:40:22", loc)
fmt.Println(t3)

fmt.Println(t1.Sub(now))
fmt.Println(t2.Sub(now))
fmt.Println(t3.Sub(now))

定时器

// 定时器
	 timer := time.Tick(time.Second)
	 for t := range timer {
	 	fmt.Println(t) // 1s中执行一次
	}

日志库

思路、练习

time

文件操作

runtime.Caller()——> pc,filename,funcname,lineno

反射

接口类型变量底层是分为两部分的:动态类型和动态值。

反射的应用:json解析、ORM工具

反射的两个方法:

  • reflect.TypeOf()
  • reflect.ValueOf()

配置文件ini解析

package main

import (
	"errors"
	"fmt"
	"io/ioutil"
	"reflect"
	"strconv"
	"strings"
)

// ini配置文件解析器

// MysqlConfig
type MysqlConfig struct {
	Address  string `ini:"address"`
	Port     int    `ini:"port"`
	Username string `ini:"username"`
	Password string `ini:"password"`
}

type RedisConfig struct {
	Host     string `ini:"host"`
	Port     int    `ini:"port"`
	Password string `ini:"password"`
	Database int    `ini:"database"`
}

type Config struct {
	MysqlConfig `ini:"mysql"`
	RedisConfig `ini:"redis"`
}

func loadini(filename string, data interface{}) (err error) {
	// 0.  参数的校验
	// 0.1 传进来的data必须是指针类型(因为需要在函数中对其赋值)
	t := reflect.TypeOf(data)
	// fmt.Println(t.Name(), t.Kind())
	if t.Kind() != reflect.Ptr {
		err = errors.New("data param must be a pointer") // 格式化输出之后返回一个error类
		return
	}
	// 0.2 传进来的data参数必须结构体类型执行(因为配置文件中各种键值对需要赋值给结构体字段)
	if t.Elem().Kind() != reflect.Struct {
		err = errors.New("data param must be a struct pointer") // 格式化输出之后返回一个error类
		return
	}
	// 1.  读文件得到字节类型的数据
	b, err := ioutil.ReadFile(filename)
	if err != nil {
		return
	}
	// string(b) // 将字节类型的文件内容转换成字符串
	lineslice := strings.Split(string(b), "\r\n")
	// fmt.Printf("%#v", lineslice)
	// 2.  一行一行地读数据
	var structname string
	for index, line := range lineslice {
		// 去掉首位空格
		line = strings.TrimSpace(line)
		// 如果是空行就跳过
		if len(line) == 0 {
			continue
		}
		// 2.1 如果是注释则忽略
		if strings.HasPrefix(line, ";") || strings.HasPrefix(line, "#") {
			continue
		}
		// 2.2 不是注释如果是[开头就表示是节(section)
		if strings.HasPrefix(line, "[") {
			if line[0] != '[' || line[len(line)-1] != ']' {
				err = fmt.Errorf("line:%d suntax errpr", index+1)
				return
			}
			// 把这行的首位的[]去掉,取到中间的内容把首位空格去掉拿到内容
			sectionname := strings.TrimSpace(line[1 : len(line)-1])
			if len(sectionname) == 0 {
				err = fmt.Errorf("line:%d suntax errpr", index+1)
				return
			}
			// 根据字符串sectionname去data里面根据反射找到对应的结构体
			for i := 0; i < t.Elem().NumField(); i++ {
				field := t.Elem().Field(i)
				if sectionname == field.Tag.Get("ini") {
					// 说明找到了对应的嵌套结构体,把字段名记下来
					structname = field.Name
					// fmt.Println(structname, sectionname)
				}
			}
		} else {
			// 2.3 如果不是[开头则是=分隔的键值对
			// 1. 以等号分隔这一行,等号左边是key,等号右边是value
			if !strings.Contains(line, "=") {
				err = fmt.Errorf("line:%d syntax error", index+1)
				return
			}
			index := strings.Index(line, "=")
			key := strings.TrimSpace(line[:index])
			value := strings.TrimSpace(line[index+1:])
			if len(value) == 0 {
				continue
			}
			// ls := strings.Split(line, "=")
			// fmt.Println(key, value)
			// fmt.Println(ls)
			// 2. 根据structname 去 data 里面吧对应的嵌套结构体给取出来
			v := reflect.ValueOf(data)
			structobj := v.Elem().FieldByName(structname)
			if structobj.Kind() != reflect.Struct {
				err = fmt.Errorf("data中的%s字段应该是一个结构体", structname)
				return
			}
			var fieldname string
			var fieldtype reflect.StructField
			// 3. 遍历嵌套结构体的每一个字段,判断tag是不是等于key
			for i := 0; i < structobj.NumField(); i++ {
				field := structobj.Type().Field(i)
				fieldtype = field
				if field.Tag.Get("ini") == key {
					// 找到对应的字段
					fieldname = field.Name
					// 4. 如果key = tag , 给这个字段赋值
					// 4.1 根据fieldname 去取出这个字段
					fileobj := structobj.FieldByName(fieldname)
					// 4.2 对其赋值
					fmt.Println(fieldname, fieldtype.Type.Kind())
					switch fieldtype.Type.Kind() {
					case reflect.String:
						fmt.Println(value)
						fileobj.SetString(value)
					case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
						var valueint int64
						valueint, err = strconv.ParseInt(value, 10, 64)
						if err != nil {
							err = fmt.Errorf("line:%d syntax error", index+1)
							return
						}
						fileobj.SetInt(valueint)
					}
				}
			}

		}
	}
	return
}

func main() {
	var cfg Config
	err := loadini("./conf.ini", &cfg)
	if err != nil {
		fmt.Printf("failed, err:%v\n", err)
		return
	}
	fmt.Println(cfg)
}

今日内容

strconv标准库(类型转换)

package main

import (
	"fmt"
	"strconv"
)

// strconv

func main() {
	// 从字符串中解析出对应的数字
	str := "10000"
	ret, err := strconv.ParseInt(str, 10, 64)
	if err != nil {
		fmt.Printf("failed, err:%v\n", err)
		return
	}
	fmt.Printf("%#v, %T\n", ret, ret)

	// 字符串转化为整型
	r1, _ := strconv.Atoi("11111")
	fmt.Printf("%T,%#v\n", r1, r1)

	// 整型转字符串
	r2 := strconv.Itoa(r1)
	fmt.Printf("%T,%#v\n", r2, r2)

	// 字符串转布尔值
	boolstr := "true"
	b1, _ := strconv.ParseBool(boolstr)
	fmt.Printf("%T,%#v\n", b1, b1)

	// 把数字转换成字符串类型
	i := int32(97)
	ret1 := string(i) // "a"
	fmt.Printf("%#v\n", ret1)

	ret2 := fmt.Sprintf("%d", i)
	fmt.Printf("%#v\n", ret2) // "97"
}

并发和并行

并发:同一时间段内执行多个任务(你在用微信和两个女朋友聊天)。

并行:同一时刻执行多个任务(你和你朋友都在用微信和女朋友聊天)。

Go语言的并发通过goroutine实现。goroutine类似于线程,属于用户态的线程,我们可以根据需要创建成千上万个goroutine并发工作。goroutine是由Go语言的运行时(runtime)调度完成,而线程是由操作系统调度完成。

Go语言还提供channel在多个goroutine间进行通信。goroutinechannel是 Go 语言秉承的 CSP(Communicating Sequential Process)并发模式的重要实现基础。

goroutine

在java/c++中我们要实现并发编程的时候,我们通常需要自己维护一个线程池,并且需要自己去包装一个又一个的任务,同时需要自己去调度线程执行任务并维护上下文切换,这一切通常会耗费程序员大量的心智。那么能不能有一种机制,程序员只需要定义很多个任务,让系统去帮助我们把这些任务分配到CPU上实现并发执行呢?

Go语言中的goroutine就是这样一种机制,goroutine的概念类似于线程,但 goroutine是由Go的运行时(runtime)调度和管理的。Go程序会智能地将 goroutine 中的任务合理地分配给每个CPU。Go语言之所以被称为现代化的编程语言,就是因为它在语言层面已经内置了调度和上下文切换的机制。

在Go语言编程中你不需要去自己写进程、线程、协程,你的技能包里只有一个技能–goroutine,当你需要让某个任务并发执行的时候,你只需要把这个任务包装成一个函数,开启一个goroutine去执行这个函数就可以了,就是这么简单粗暴。

goroutine什么时候结束?

goroutine对应的函数结束了,goroutine就结束了。

main函数执行完了,由main函数创建的那些goroutine都结束了。

package main

import (
	"fmt"
	"time"
)

// goroutine

func hello(i int) {
	fmt.Println("hello", i)
}

// 程序启动之后会创建一个主goroutine去执行
func main() {
	for i := 0; i < 1000; i++ {
		// go hello(i) // 开启一个单独的goroutine去执行hello函数(任务)
		go func(i int) {
			fmt.Println(i) // 用的是函数参数的i,不是外面的i
		}(i)
	}
	fmt.Println("main")
	// main函数结束了 由main函数启动的goroutine也都结束了
	time.Sleep(time.Second)
}

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// waitGroup

func f() {
	rand.Seed(time.Now().UnixNano())
	for i := 0; i < 5; i++ {
		r1 := rand.Int()    // int64
		r2 := rand.Intn(10) // 0<= x <10
		fmt.Println(r1, r2)
	}
}

func f1(i int) {
	defer wg.Done()
	time.Sleep(time.Millisecond * time.Duration(rand.Intn(300)))
	fmt.Println(i)
}

var wg sync.WaitGroup

func main() {
	// f()
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go f1(i)
	}
	// 如何知道这10个goroutine都结束了
	wg.Wait() // 等待wg的计数器减为0
}

package main

import (
	"fmt"
	"runtime"
	"sync"
)

// GOMAXPROCS
var wg sync.WaitGroup

func a() {
	defer wg.Done()
	for i := 0; i < 1000; i++ {
		fmt.Printf("A:%d\n", i)
	}
}

func b() {
	defer wg.Done()
	for i := 0; i < 1000; i++ {
		fmt.Printf("B:%d\n", i)
	}
}

func main() {
	runtime.GOMAXPROCS(2)         // 默认CPU的逻辑核心数,跑满整个CPU
	fmt.Println(runtime.NumCPU()) // cpu个数
	wg.Add(2)
	go a()
	go b()
	wg.Wait()
}

可增长的栈

OS线程(操作系统线程)一般都有固定的栈内存(通常为2MB),一个goroutine的栈在其生命周期开始时只有很小的栈(典型情况下2KB),goroutine的栈不是固定的,他可以按需增大和缩小,goroutine的栈大小限制可以达到1GB,虽然极少会用到这么大。所以在Go语言中一次创建十万左右的goroutine也是可以的。

goroutine调度

GMP

M:N把m个goroutine分配给n个操作系统线程去执行

goroutine初始栈的大小是2k。

GPM是Go语言运行时(runtime)层面的实现,是go语言自己实现的一套调度系统。区别于操作系统调度OS线程。

  • G很好理解,就是个goroutine的,里面除了存放本goroutine信息外 还有与所在P的绑定等信息。
  • P管理着一组goroutine队列,P里面会存储当前goroutine运行的上下文环境(函数指针,堆栈地址及地址边界),P会对自己管理的goroutine队列做一些调度(比如把占用CPU时间较长的goroutine暂停、运行后续的goroutine等等)当自己的队列消费完了就去全局队列里取,如果全局队列里也消费完了会去其他P的队列里抢任务。
  • M(machine)是Go运行时(runtime)对操作系统内核线程的虚拟, M与内核线程一般是一一映射的关系, 一个groutine最终是要放到M上执行的;

P与M一般也是一一对应的。他们关系是: P管理着一组G挂载在M上运行。当一个G长久阻塞在一个M上时,runtime会新建一个M,阻塞G所在的P会把其他的G 挂载在新建的M上。当旧的G阻塞完成或者认为其已经死掉时 回收旧的M。

P的个数是通过runtime.GOMAXPROCS设定(最大256),Go1.5版本之后默认为物理线程数。 在并发量大的时候会增加一些P和M,但不会太多,切换太频繁的话得不偿失。

单从线程调度讲,Go语言相比起其他语言的优势在于OS线程是由OS内核来调度的,goroutine则是由Go运行时(runtime)自己的调度器调度的,这个调度器使用一个称为m:n调度的技术(复用/调度m个goroutine到n个OS线程)。 其一大特点是goroutine的调度是在用户态下完成的, 不涉及内核态与用户态之间的频繁切换,包括内存的分配与释放,都是在用户态维护着一块大的内存池, 不直接调用系统的malloc函数(除非内存池需要改变),成本比调度OS线程低很多。 另一方面充分利用了多核的硬件资源,近似的把若干goroutine均分在物理线程上, 再加上本身goroutine的超轻量,以上种种保证了go调度方面的性能。

math/rand

func f() {
	rand.Seed(time.Now().UnixNano())
	for i := 0; i < 5; i++ {
		r1 := rand.Int()    // int64
		r2 := rand.Intn(10) // 0<= x <10
		fmt.Println(r1, r2)
	}
}

channel

package mainimport "fmt"var a []intvar b chan int // 需要执行通道中元素的类型,chan是引用类型,类似于slicefunc main() {	fmt.Println(b)         // nil	b = make(chan int)     // 通道初始化。不带缓冲区	b = make(chan int, 16) // 带缓冲区的通道初始化	fmt.Println(b)}

通道必须使用make函数初始化才能使用!!!

通道的操作

<-

  1. 发送:ch1 <- 1
  2. 接收:<- ch1x := <- ch1
  3. 关闭:close()

有缓冲区和无缓冲区通道

package main

import (
	"fmt"
	"sync"
)

var a []int
var b chan int // 需要执行通道中元素的类型,chan是引用类型,类似于slice
var wg sync.WaitGroup

func nobufch() {
	fmt.Println(b)     // nil
	b = make(chan int) // 通道初始化。不带缓冲区
	wg.Add(1)
	go func() {
		defer wg.Done()
		x := <-b
		fmt.Println("后台goroutine从通道b中取到了", x)
	}()
	b <- 10 // 卡在这
	fmt.Println("数值10发送到通道b中了....")
	wg.Wait()
}

func bufch() {
	fmt.Println(b)         // nil
	b = make(chan int, 16) // 通道初始化。带缓冲区
	b <- 10
	fmt.Println("数值10发送到通道b中了....")
	x := <-b
	fmt.Println("从通道b中获取值", x)
	close(b)
}

func main() {
	bufch()
}

通道中循环取值

注意:range取值时要关闭通道不然会卡死

package main

import (
	"fmt"
	"sync"
)

// 通道循环取值

// 1.启动一个goroutine,生成100个数发送到ch1中
// 2.启动一个goroutine,从ch1中取值,计算其平方放到ch2中
// 3.在main中,从ch2取值打印出来
var wg sync.WaitGroup
var once sync.Once

func f1(ch1 chan int) {
	defer wg.Done()
	for i := 0; i < 100; i++ {
		ch1 <- i
	}
	close(ch1)
}

func f2(ch1, ch2 chan int) {
	defer wg.Done()
	// for {
	// 	x, ok := <-ch1
	// 	if !ok {
	// 		break
	// 	}
	// 	ch2 <- x * x
	// }
	for x := range ch1 {
		ch2 <- x * x
	}
	close(ch2)
	// once.Do(func() { close(ch2) }) // 确保某个操作只执行一次
}

func main() {
	a := make(chan int, 50)
	b := make(chan int, 100)
	wg.Add(2)
	go f1(a)
	go f2(a, b)
	// go f2(a, b)
	wg.Wait()
	for ret := range b {
		fmt.Println(ret)
	}

}

关闭通道

有两种方式在接收值的时候判断该通道是否被关闭,不过我们通常使用的是for range的方式。使用for range遍历通道,当通道被关闭的时候就会退出for range

package main

import "fmt"

// 关闭通道

func main() {
	ch1 := make(chan int, 2)
	ch1 <- 10
	ch1 <- 20
	close(ch1)
	for x := range ch1 {
		fmt.Println(x)
	}

	<-ch1
	<-ch1
	x, ok := <-ch1
	fmt.Println(x, ok)
	x, ok = <-ch1
	fmt.Println(x, ok)
	x, ok = <-ch1
	fmt.Println(x, ok)
}

单向通道:只用于函数的参数或者返回值(time.Tick)

通道总结

channel常见的异常总结,如下图:

Go 8 配置文件解析、goroutine、channel

关闭已经关闭的channel也会引发panic

wooker pool (goroutine池)

package main

import (
	"fmt"
	"time"
)

// worker pool

func worker(id int, jobs <-chan int, results chan<- int) {
	for j := range jobs {
		fmt.Printf("worker:%d start job:%d\n", id, j)
		time.Sleep(time.Second)
		fmt.Printf("worker:%d end job:%d\n", id, j)
		results <- j * 2
	}
}

func main() {
	jobs := make(chan int, 100)
	results := make(chan int, 100)

	// 开启3个goroutine
	for w := 0; w < 3; w++ {
		go worker(w, jobs, results)
	}

	// 5个任务
	for j := 0; j < 5; j++ {
		jobs <- j
	}
	close(jobs)
	// 输出结果
	for a := 1; a < 5; a++ {
		<-results
	}
}

练习题

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// 使用goroutine和channel实现一个计算int64随机数各位数和的程序。
// 	1.开启一个goroutine循环生成int64类型的随机数,发送到jobChan
// 	2.开启24个goroutine从jobChan中取出随机数计算各位数的和,将结果发送到resultChan
// 	3.主goroutine从resultChan取出结果并打印到终端输出

//
type job struct {
	value int64
}

//
type result struct {
	job    *job
	result int64
}

var jobchan = make(chan *job, 100)
var resultchan = make(chan *result, 100)
var wg sync.WaitGroup

func producer(ch1 chan<- *job) {
	defer wg.Done()
	// 循环生成int64类型的随机数,发送到jobchan
	for {
		x := rand.Int63()
		newjob := &job{value: x}
		ch1 <- newjob
		time.Sleep(time.Millisecond * 500)
	}

}

func consumer(ch1 <-chan *job, ch2 chan<- *result) {
	wg.Done()
	// 从jobChan中取出随机数计算各位数的和,将结果发送到resultChan
	for {
		job := <-ch1
		sum := int64(0)
		n := job.value
		for n > 0 {
			sum += n % 10
			n = n / 10
		}
		newres := &result{
			job:    job,
			result: sum,
		}
		ch2 <- newres
	}
}

func main() {
	wg.Add(1)
	go producer(jobchan)
	// 开启24个goroutine
	wg.Add(24)
	for i := 0; i < 24; i++ {
		go consumer(jobchan, resultchan)
	}
	// 主goroutine从resultChan取出结果并打印到终端输出
	for res := range resultchan {
		fmt.Printf("value:%d, sum:%d\n", res.job.value, res.result)
	}
	wg.Wait()
}

select多路复用

package main

import "fmt"

// select 多路复用

func main() {
	ch := make(chan int, 2)
	for i := 0; i < 10; i++ {
		select {
		case x := <-ch:
			fmt.Println(x)
		case ch <- i:
		}
	}
}

上一篇:Go-Go


下一篇:387集Go语言核心编程培训视频教材整理 | goroutine和channel(四)