使用Go语言开发流媒体视频网站

简介

流媒体如今已经成为工业上的一个重要技术了,比如:直播网站、视频监控传输、APP直播等,如何实现一个高并发的视频网站,这就涉及到语言技术的选型以及流媒体技术的使用,本节将主要介绍如何使用Golang来实现一个流媒体视频网站。

背景

为什么选择Go以及Go的一些优势

为什么会选择Go来开发视频网站呢?这其实主要体现在Go语言的优势。那么Go有哪些优势呢?

  • 开发效率高,不管用其他语言,都需要很多其他的配置或插件,就连全家桶配套齐全的Java语言都会需要一个Servlet引擎,如:tomcat、jetty等。但Go在这方面,提供了得天独厚的能力。大部分功能和内容已经集成在了pkg。包括开发完整的开发工具链(tools、test、benchmark、builtin.etc),包括Go命令(go test、go install、go build)。这些都是完整的,直接下载Go后即可使用。
  • 另一方面,部署简单,go属于编译性语言,而且是能够编译多个平台可执行文件的语言。Compile once,run everywhere,直接编译后生成二进制文件,直接运行。
  • 良好的native http库、集成模板引擎,无需添加第三方框架。

GoLang的简介以及实现一个webserver工具链

Go语言是一种编译性语言,而且它的目标是兼具Python等动态语言的开发速度和集成C/C++等编译语言的性能与安全性。

Go中有一些常见的工具链,比如:

  • go build,编译go文件,可以跨平台编译:env GOOS=linux GOARCH=amd64 go build,在CI/CD中,这是一个非常有用的命令。
  • go install,这也是编译,但与build的区别是编译后将输出文件打包成库放在pkg下。
  • go get,用于获取go的第三方包,常见的是:go get -u git地址,表示从git上获取某个资源并安装到本地。
  • go fmt,统一代码风格、排版。
  • go test,运行当前目录下的tests,“go test -v” 会打印所有的信息。
  • go的test文件一般以XXX_test.go命名

要点:

  • 使用TestMain作为初始化test,并且使用Run()来调用其它tests可以完成一些需要初始化操作的testing,如:数据库、文件加载等。
func TestMain(m *testing.M) {
    fmt.Println("Test begin")
    m.Run()
}
  • 如果没在其中加Run(),除了TestMain的其它的tests都不被执行。
func TestPrint(t *testing.T) {
    fmt.Println("Test print")
}

func TestMain(m *testing.M) {
    fmt.Println("Test begin")
    //m.Run()
}

按照上面说的,如果没有执行Run()方法,则TestPrint函数不会被执行。

Golang的channel并发模式

在 Go 中,既然有了协程,那么这些协程之间如何通信呢?Go 提供了一个 channel(通道) 来解决。

声明一个 channel

在 Go 语言中,声明一个 channel 非常简单,使用内置的 make 函数即可,如下:

ch:=make(chan string)

其中 chan 是一个关键字,表示是 channel 类型。后面的 string 表示 channel 里的数据是 string 类型。通过 channel 的声明也可以看到,chan 是一个集合类型。

定义好 chan 后就可以使用了,一个 chan 的操作只有两种:发送和接收:

  • 发送:向 chan 发送值,把值放在 chan 中,操作符为 chan <-
  • 接收:获取 chan 中的值,操作符为 <- chan

示例:

package main

import "fmt"

func main() {

    ch := make(chan string)

    go func() {

        fmt.Println("码疯窝在香嗝喱辣")

        ch <- "发送数据者:码疯窝在香嗝喱辣"

    }()

    fmt.Println("I am main goroutine")

    v := <- ch

    fmt.Println("接收到的chan中的值为:",v)

}

我们先来执行看看打印结果:

I am main goroutine

码疯窝在香嗝喱辣

接收到的chan中的值为:送数据者:码疯窝在香嗝喱辣

从运行结果可以看出:达到了使用 time.Sleep 函数的效果。

相信应该明白为什么程序不会在新的 goroutine 完成之前退出了,因为通过 make 创建的 chan 中没有值,而 main goroutine 又想从 chan 中获取值,获取不到就一直等待,等到另一个 goroutine 向 chan 发送值为止。

无缓冲 channel

上面的示例中,使用 make 创建的 chan 就是一个无缓冲 channel,它的容量是 0,不能存储任何数据。所以无缓冲 channel 只起到传输数据的作用,数据并不会在 channel 中做任何停留。这也意味着,无缓冲 channel 的发送和接收操作是同时进行的,它也被称为同步 channel。

有缓冲 channel

有缓冲 channel 类似一个可阻塞的队列,内部的元素先进先出。通过 make 函数的第二个参数可以指定 channel 容量的大小,进而创建一个有缓冲 channel,如:

cacheCh := make(chan int,5)

定义了一个容量为 5 的元素为 int 类型的 chan。

一个有缓冲 channel 具备以下特点:

  • 有缓冲 channel 的内部有一个缓冲队列
  • 发送操作是向队列的尾部插入元素,如果队列已满,则阻塞等待,直到另一个 goroutine 执行,接收操作释放队列的空间
  • 接收操作是从队列的头部获取元素并把它从队列中删除,如果队列为空,则阻塞等待,直到另一个 goroutine 执行,发送操作插入新的元素
cache := make(chan int,5)

cache <- 2

cache <- 3

fmt.Println("容量:",cap(cache),",元素个数:",len(cache))

无缓冲 channel 其实就是一个容量大小为 0 的 channel。比如 make(chan int,0)

关闭 channel

通过内置函数 close 即可关闭 channel。如果一个 channel 被关闭了,就不能向里面发送数据了,如果发送的话,会引起 painc 异常。但是还可以接收 channel 里的数据,如果 channel 里没有数据的话,接收的数据是元素类型的零值。

单向 channel

所谓单向,即可要不发送,要么只能接收。所以单向 channel 的声明也很简单,只需要在声明的时候带上 <- 操作符即可,如下:

send := make(chan <- int)
receive := make(<- chan int)

用Golang完成一个流媒体网站

业务模块

API接口设计

  • 分层
  • Restful风格设计
  • CRUD区分资源操作
  • 返回码规范

首先,我们写个启动类:

package main 

import (
    "net/http"
    "github.com/julienschmidt/httprouter"
)

type middleWareHandler struct {
    r *httprouter.Router
}

func NewMiddleWareHandler(r *httprouter.Router) http.Handler {
    m := middleWareHandler{}
    m.r = r
    return m
}

func (m middleWareHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    //check session
    validateUserSession(r)

    m.r.ServeHTTP(w, r)
}

func RegisterHandlers() *httprouter.Router {
    router := httprouter.New()

    router.POST("/user", CreateUser)

    router.POST("/user/:user_name", Login)

    return router
}

func main() {
    r := RegisterHandlers()
    mh := NewMiddleWareHandler(r)
    http.ListenAndServe(":1000", mh)
}

在这里我们实现了注册、登录以及一些初始化监听端口。接下来,我们需要看看对于后端视频处理时,主要关心的是session:

package session

import (
    "time"
    "sync"
    "github.com/avenssi/video_server/api/defs"
    "github.com/avenssi/video_server/api/dbops"
    "github.com/avenssi/video_server/api/utils"
)

var sessionMap *sync.Map 

func init() {
    sessionMap = &sync.Map{}
}

func nowInMilli() int64{
    return time.Now().UnixNano()/1000000
}

func deleteExpiredSession(sid string) {
    sessionMap.Delete(sid)
    dbops.DeleteSession(sid)
}

func LoadSessionsFromDB() {
    r, err := dbops.RetrieveAllSessions()
    if err != nil {
        return
    }

    r.Range(func(k, v interface{}) bool{
        ss := v.(*defs.SimpleSession)
        sessionMap.Store(k, ss)
        return true
    })
}

func GenerateNewSessionId(un string) string {
    id, _ := utils.NewUUID()
    ct := nowInMilli()
    ttl := ct + 30 * 60 * 1000// Severside session valid time: 30 min

    ss := &defs.SimpleSession{Username: un, TTL: ttl}
    sessionMap.Store(id, ss)
    dbops.InsertSession(id, ttl, un)

    return id
}

func IsSessionExpired(sid string) (string, bool) {
    ss, ok := sessionMap.Load(sid)
    if ok {
        ct := nowInMilli()
        if ss.(*defs.SimpleSession).TTL < ct {
            deleteExpiredSession(sid)
            return "", true
        }

        return ss.(*defs.SimpleSession).Username, false
    }

    return "", true
}

从上面的代码中,可以看到,Go主要引用了相关的视频插件库:avenssi/video_server等来处理缓存session。这也是为什么选择go开发后端的一个原因。

同时,我们还定义了一个错误码信息:

package defs

type Err struct {
    Error string `json:"error"`
    ErrorCode string `json:"error_code"`  
}

type ErrResponse struct {
    HttpSC int
    Error Err
}

var (
    ErrorRequestBodyParseFailed = ErrResponse{HttpSC: 400, Error: Err{Error: "Request body is not correct", ErrorCode: "001"}}
    ErrorNotAuthUser = ErrResponse{HttpSC: 401, Error: Err{Error: "User authentication failed.", ErrorCode: "002"}}
    ErrorDBError = ErrResponse{HttpSC: 500, Error: Err{Error: "DB ops failed", ErrorCode: "003"}}
    ErrorInternalFaults = ErrResponse{HttpSC: 500, Error: Err{Error: "Internal service error", ErrorCode: "004"}}
)

以上对于业务层中处理主要逻辑就是这些了,下面主要讲scheduler和stream server。

scheduler

scheduler主要是来调度任务,那么主要是哪些任务呢?主要是那些普通api无法立即给结果的任务。比如:我们视频网站需要一些视频审核、数据恢复的需求。这时候,我们需要做一些short delay,用户看不到,但后台还是存在的。这就需要scheduler异步处理。还比如有些周期性的任务。

在Scheduler中,还存在Timer,定时器主要用来作定时处理task的。

所以,我们的架构图:

在本小节中,我们采用runner的生产、消费者模式实现。具体代码如下:

package taskrunner

import (
)

type Runner struct {
    Controller controlChan
    Error controlChan
    Data dataChan
    dataSize int
    longLived bool
    Dispatcher fn 
    Executor fn
}

func NewRunner(size int, longlived bool, d fn, e fn) *Runner {
    return &Runner {
        Controller: make(chan string, 1),
        Error: make(chan string, 1),
        Data: make(chan interface{}, size),
        longLived: longlived,
        dataSize: size,
        Dispatcher: d,
        Executor: e,
    }
}

func (r *Runner) startDispatch() {
    defer func() {
        if !r.longLived {
            close(r.Controller)
            close(r.Data)
            close(r.Error)
        }
    }()

    for {
        select {
        case c :=<- r.Controller:
            if c == READY_TO_DISPATCH {
                err := r.Dispatcher(r.Data)
                if err != nil {
                    r.Error <- CLOSE
                } else {
                    r.Controller <- READY_TO_EXECUTE
                }
            }

            if c == READY_TO_EXECUTE {
                err := r.Executor(r.Data)
                if err != nil {
                    r.Error <- CLOSE
                } else {
                    r.Controller <- READY_TO_DISPATCH
                }
            }
        case e :=<- r.Error:
            if e == CLOSE {
                return
            }
        default:

        }
    }
}

func (r *Runner) StartAll() {
    r.Controller <- READY_TO_DISPATCH
    r.startDispatch()
}

Runner是可以复用的,而接下来介绍的Task是定制Runner的。比如:我们延迟删除视频。

我们先拿到数据,看看:

package dbops

import (
    "log"
    _ "github.com/go-sql-driver/mysql"
)

func ReadVideoDeletionRecord(count int) ([]string, error) {
    stmtOut, err := dbConn.Prepare("SELECT video_id FROM video_del_rec LIMIT ?")

    var ids []string

    if err != nil {
        return ids, err
    }

    rows, err := stmtOut.Query(count)
    if err != nil {
        log.Printf("Query VideoDeletionRecord error: %v", err)
        return ids, err
    }

    for rows.Next() {
        var id string
        if err := rows.Scan(&id); err != nil {
            return ids, err
        }

        ids = append(ids, id)
    }

    defer stmtOut.Close()
    return ids, nil
}

func DelVideoDeletionRecord(vid string) error {
    stmtDel, err := dbConn.Prepare("DELETE FROM video_del_rec WHERE video_id=?")
    if err != nil {
        return err
    }

    _, err = stmtDel.Exec(vid)
    if err != nil {
        log.Printf("Deleting VideoDeletionRecord error: %v", err)
        return err
    }

    defer stmtDel.Close()
    return nil
}

拿到数据后,需要处理,这时需要task:

package taskrunner

import (
    "os"
    "errors"
    "log"
    "sync"
    "github.com/avenssi/video_server/scheduler/dbops"
)

func deleteVideo(vid string) error {
    err := os.Remove(VIDEO_PATH + vid)

    if err != nil && !os.IsNotExist(err) {
        log.Printf("Deleting video error: %v", err)
        return err
    }

    return nil
}

func VideoClearDispatcher(dc dataChan) error {
    res, err := dbops.ReadVideoDeletionRecord(3)
    if err != nil {
        log.Printf("Video clear dispatcher error: %v", err)
        return err
    }

    if len(res) == 0 {
        return errors.New("All tasks finished")
    }

    for _, id := range res {
        dc <- id
    }

    return nil
}

func VideoClearExecutor(dc dataChan) error {
    errMap := &sync.Map{}
    var err error

    forloop:
        for {
            select {
            case vid :=<- dc:
                go func(id interface{}) {
                    if err := deleteVideo(id.(string)); err != nil {
                        errMap.Store(id, err)
                        return
                    }
                    if err := dbops.DelVideoDeletionRecord(id.(string)); err != nil {
                        errMap.Store(id, err)
                        return 
                    }
                }(vid)
            default:
                break forloop
            }
        }

    errMap.Range(func(k, v interface{}) bool {
        err = v.(error)
        if err != nil {
            return false
        }
        return true
    })

    return err
}

以上就是关于异步、定时处理视频流信息过程。

stream server

  • Streaming
  • Upload files

Streaming主要区别于普通的链接,它需要保持长链接,与短链接是不一样的,当发送一个request过来,会不断与客户端输出数据流,而且会很长。所以在多路长链接同时保持的时候,出现一个问题,如果不断的发起链接、打开网页,最终会把我们的服务给crash掉,所以,我们需要进行流控:limit,这里的流控可能只在connect时候进行限制。

package main 

import (
    "log"
)

type ConnLimiter struct {
    concurrentConn int
    bucket chan int
}

func NewConnLimiter(cc int) *ConnLimiter {
    return &ConnLimiter {
        concurrentConn: cc,
        bucket: make(chan int, cc),
    }
}

func (cl *ConnLimiter) GetConn() bool {
    if len(cl.bucket) >= cl.concurrentConn {
        log.Printf("Reached the rate limitation.")
        return false
    }

    cl.bucket <- 1
    return true
}

func (cl *ConnLimiter) ReleaseConn() {
    c :=<- cl.bucket
    log.Printf("New connction coming: %d", c)
}

加了流控后,我们需要在http middleware中嵌入流控,同样,我们在启动时,都需要注册router以及http server,所以代码如下:

package main 

import (
    "net/http"
    "github.com/julienschmidt/httprouter"
)

type middleWareHandler struct {
    r *httprouter.Router
    l *ConnLimiter
}

func NewMiddleWareHandler(r *httprouter.Router, cc int) http.Handler {
    m := middleWareHandler{}
    m.r = r
    m.l = NewConnLimiter(cc)
    return m
}

func RegisterHandlers() *httprouter.Router {
    router := httprouter.New()

    router.GET("/videos/:vid-id", streamHandler)

    router.POST("/upload/:vid-id", uploadHandler)

    router.GET("/testpage", testPageHandler)

    return router
}

func (m middleWareHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    if !m.l.GetConn() {
        sendErrorResponse(w, http.StatusTooManyRequests, "Too many requests")
        return
    }

    m.r.ServeHTTP(w, r)
    defer m.l.ReleaseConn()
}

func main() {
    r := RegisterHandlers()
    mh := NewMiddleWareHandler(r, 2)
    http.ListenAndServe(":2000", mh)
}

最后,我们来看看streamHandler如何来处理:

func streamHandler(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
    vid := p.ByName("vid-id")
    vl := VIDEO_DIR + vid

    video, err := os.Open(vl)
    if err != nil {
        log.Printf("Error when try to open file: %v", err)
        sendErrorResponse(w, http.StatusInternalServerError, "Internal Error")
        return
    }

    w.Header().Set("Content-Type", "video/mp4")
    http.ServeContent(w, r, "", time.Now(), video)

    defer video.Close()
}

我们这里采用比较通用的做法:在拿到流唯一信息后,直接处理。

Upload files时,我们需要做静态检查,然后把数据从中读取:

func uploadHandler(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
    r.Body = http.MaxBytesReader(w, r.Body, MAX_UPLOAD_SIZE)
    if err := r.ParseMultipartForm(MAX_UPLOAD_SIZE); err != nil {
        sendErrorResponse(w, http.StatusBadRequest, "File is too big")
        return 
    }

    file, _, err := r.FormFile("file")
    if err != nil {
        log.Printf("Error when try to get file: %v", err)
        sendErrorResponse(w, http.StatusInternalServerError, "Internal Error")
        return 
    }

    data, err := ioutil.ReadAll(file)
    if err != nil {
        log.Printf("Read file error: %v", err)
        sendErrorResponse(w, http.StatusInternalServerError, "Internal Error")
    }

    fn := p.ByName("vid-id")
    err = ioutil.WriteFile(VIDEO_DIR + fn, data, 0666)
    if err != nil {
        log.Printf("Write file error: %v", err)
        sendErrorResponse(w, http.StatusInternalServerError, "Internal Error")
        return
    }

    w.WriteHeader(http.StatusCreated)
    io.WriteString(w, "Uploaded successfully")
}

网站部署

先想给之前的代码进行编译打包:

FROM ubuntu:16.04 as build

ENV TZ=Asia/Shanghai
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone

RUN apt-get update && apt-get install -y --no-install-recommends \
        g++ \
        ca-certificates \
        wget && \
    rm -rf /var/lib/apt/lists/*

ENV GOLANG_VERSION 1.15.1
RUN wget -nv -O - https://studygolang.com/dl/golang/go1.15.1.linux-amd64.tar.gz \
     | tar -C /usr/local -xz


ENV GOPROXY=https://goproxy.cn,direct
ENV GO111MODULE=on
ENV GOPATH /go
ENV PATH $GOPATH/bin:/usr/local/go/bin:$PATH

WORKDIR /go/src
COPY . .
WORKDIR /go/src/video-service
RUN  sed -i "/runmode/crunmode=pro" /go/src/video-service/conf/app.conf
RUN export CGO_LDFLAGS_ALLOW='-Wl,--unresolved-symbols=ignore-in-object-files' && \
    go install -ldflags="-s -w" -v /go/src/video-service

FROM ubuntu:16.04
WORKDIR /video-service

RUN mkdir -p log
COPY --from=build /go/bin/video-service /video-service
CMD ["./video-service"]

接下来,补充部署脚本:

---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  labels:
    app: video-service
  name: video-service
  namespace: system-server
spec:
  replicas: 1
  selector:
    matchLabels:
      app: video-service
  template:
    metadata:
      labels:
        app: video-service
    spec:
      containers:
        - image: {{ cluster_cfg['cluster']['docker-registry']['prefix'] }}video-service
          imagePullPolicy: Always
          name: video-service
          ports:
            - containerPort: 1000
          #livenessProbe:
            #httpGet:
              #path: /api/v1/healthz
              #port: 1000
              #scheme: HTTP
            #initialDelaySeconds: 15
            #periodSeconds: 10
            #timeoutSeconds: 3
            #failureThreshold: 5
          volumeMounts:
            - name: video-service-config
              mountPath: /video-service/conf
      volumes:
        - name: video-service-config
          configMap:
            name: video-service-config
      nodeSelector:
        video-service: "true"
      restartPolicy: Always

执行编译命令:

sh build/build.sh
kubectl create -f deploy.yml

这里使用K8s部署到机器上。
部署后的服务访问地址:

10.11.3.4:1000
上一篇:剑指Offer——打印从1到最大的n位数(JS实现)


下一篇:漫谈LiteOS之开发板-LiteOS移植(基于GD32450i-EVAL)