goreplay~流量变速

变速配置从何解析

gor.go 主方法 初始化插件,启动emiter监听

func main() {
    if os.Getenv("GOMAXPROCS") == "" {
        runtime.GOMAXPROCS(runtime.NumCPU() * 2)
    }

    args := os.Args[1:]
    var plugins *InOutPlugins
    if len(args) > 0 && args[0] == "file-server" {
        if len(args) != 2 {
            log.Fatal("You should specify port and IP (optional) for the file server. Example: `gor file-server :80`")
        }
        dir, _ := os.Getwd()

        Debug(0, "Started example file server for current directory on address ", args[1])

        log.Fatal(http.ListenAndServe(args[1], loggingMiddleware(args[1], http.FileServer(http.Dir(dir)))))
    } else {
        flag.Parse()
        checkSettings()
        plugins = NewPlugins()
    }

    log.Printf("[PPID %d and PID %d] Version:%s\n", os.Getppid(), os.Getpid(), VERSION)

    if len(plugins.Inputs) == 0 || len(plugins.Outputs) == 0 {
        log.Fatal("Required at least 1 input and 1 output")
    }

    if *memprofile != "" {
        profileMEM(*memprofile)
    }

    if *cpuprofile != "" {
        profileCPU(*cpuprofile)
    }

    if Settings.Pprof != "" {
        go func() {
            log.Println(http.ListenAndServe(Settings.Pprof, nil))
        }()
    }

    closeCh := make(chan int)
    emitter := NewEmitter()
    go emitter.Start(plugins, Settings.Middleware)
    if Settings.ExitAfter > 0 {
        log.Printf("Running gor for a duration of %s\n", Settings.ExitAfter)

        time.AfterFunc(Settings.ExitAfter, func() {
            log.Printf("gor run timeout %s\n", Settings.ExitAfter)
            close(closeCh)
        })
    }
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt, syscall.SIGTERM)
    exit := 0
    select {
    case <-c:
        exit = 1
    case <-closeCh:
        exit = 0
    }
    emitter.Close()
    os.Exit(exit)
}

plugins.go中,通过limiter类执行各种变速

func (plugins *InOutPlugins) registerPlugin(constructor interface{}, options ...interface{}) {
    var path, limit string
    vc := reflect.ValueOf(constructor)

    // Pre-processing options to make it work with reflect
    vo := []reflect.Value{}
    for _, oi := range options {
        vo = append(vo, reflect.ValueOf(oi))
    }

    if len(vo) > 0 {
        // Removing limit options from path
        path, limit = extractLimitOptions(vo[0].String())

        // Writing value back without limiter "|" options
        vo[0] = reflect.ValueOf(path)
    }

    // Calling our constructor with list of given options
    plugin := vc.Call(vo)[0].Interface()

    if limit != "" {
        plugin = NewLimiter(plugin, limit)
    }

    // Some of the output can be Readers as well because return responses
    if r, ok := plugin.(PluginReader); ok {
        plugins.Inputs = append(plugins.Inputs, r)
    }

    if w, ok := plugin.(PluginWriter); ok {
        plugins.Outputs = append(plugins.Outputs, w)
    }
    plugins.All = append(plugins.All, plugin)

}

类似 'test.gor|100%' 解析

func extractLimitOptions(options string) (string, string) {
    split := strings.Split(options, "|")

    if len(split) > 1 {
        return split[0], split[1]
    }

    return split[0], ""
}

执行方法如下

package main

import (
    "fmt"
    "io"
    "math/rand"
    "strconv"
    "strings"
    "time"
)

// Limiter is a wrapper for input or output plugin which adds rate limiting
type Limiter struct {
    plugin    interface{}
    limit     int
    isPercent bool

    currentRPS  int
    currentTime int64
}

func parseLimitOptions(options string) (limit int, isPercent bool) {
    if n := strings.Index(options, "%"); n > 0 {
        limit, _ = strconv.Atoi(options[:n])
        isPercent = true
    } else {
        limit, _ = strconv.Atoi(options)
        isPercent = false
    }

    return
}

// NewLimiter constructor for Limiter, accepts plugin and options
// `options` allow to sprcify relatve or absolute limiting
func NewLimiter(plugin interface{}, options string) PluginReadWriter {
    l := new(Limiter)
    l.limit, l.isPercent = parseLimitOptions(options)
    l.plugin = plugin
    l.currentTime = time.Now().UnixNano()

    // FileInput have its own rate limiting. Unlike other inputs we not just dropping requests, we can slow down or speed up request emittion.
    if fi, ok := l.plugin.(*FileInput); ok && l.isPercent {
        fi.speedFactor = float64(l.limit) / float64(100)
    }

    return l
}

func (l *Limiter) isLimited() bool {
    // File input have its own limiting algorithm
    if _, ok := l.plugin.(*FileInput); ok && l.isPercent {
        return false
    }

    if l.isPercent {
        return l.limit <= rand.Intn(100)
    }

    if (time.Now().UnixNano() - l.currentTime) > time.Second.Nanoseconds() {
        l.currentTime = time.Now().UnixNano()
        l.currentRPS = 0
    }

    if l.currentRPS >= l.limit {
        return true
    }

    l.currentRPS++

    return false
}

// PluginWrite writes message to this plugin
func (l *Limiter) PluginWrite(msg *Message) (n int, err error) {
    if l.isLimited() {
        return 0, nil
    }
    if w, ok := l.plugin.(PluginWriter); ok {
        return w.PluginWrite(msg)
    }
    // avoid further writing
    return 0, io.ErrClosedPipe
}

// PluginRead reads message from this plugin
func (l *Limiter) PluginRead() (msg *Message, err error) {
    if r, ok := l.plugin.(PluginReader); ok {
        msg, err = r.PluginRead()
    } else {
        // avoid further reading
        return nil, io.ErrClosedPipe
    }

    if l.isLimited() {
        return nil, nil
    }

    return
}

func (l *Limiter) String() string {
    return fmt.Sprintf("Limiting %s to: %d (isPercent: %v)", l.plugin, l.limit, l.isPercent)
}

// Close closes the resources.
func (l *Limiter) Close() error {
    if fi, ok := l.plugin.(io.Closer); ok {
        fi.Close()
    }
    return nil
}

 

上一篇:【含源码】无刷直流BLDC电机的速度控制仿真


下一篇:BLDC开发笔记4.转速的计算