goreplay~v1.3.0新增项

--input-file-dry-run:  只读取输入文件而不实际回放请求,结束后输出记录数、文件数、字节数、等待时间以及预计回放耗时等统计信息

--input-file-max-wait:  限制两次请求之间的最大等待时间(例如 1s);超过该值的间隔按该值处理,从而跳过已记录文件中的长暂停

--input-file-read-depth: 预读和缓冲请求(并排序)。缺省值是100

源码解析,plugins.go

// Register one NewFileInput plugin for every entry of Settings.InputFile.
// Each registration receives the same loop, read-depth, max-wait and
// dry-run settings.
for _, inputPath := range Settings.InputFile {
    plugins.registerPlugin(
        NewFileInput,
        inputPath,
        Settings.InputFileLoop,
        Settings.InputFileReadDepth,
        Settings.InputFileMaxWait,
        Settings.InputFileDryRun,
    )
}

输入文件构造器

// NewFileInput builds a FileInput for the given path and, when the input
// initializes successfully, starts its emit goroutine.
//
// loop, readDepth, maxWait and dryRun are stored on the input unchanged.
// The data channel is buffered (1000 entries), the speed factor starts at 1
// and an expvar map named "file-"+path is created for statistics.
//
// The returned input is never nil: if init fails, the input is returned
// as-is and no emit goroutine is started.
func NewFileInput(path string, loop bool, readDepth int, maxWait time.Duration, dryRun bool) (i *FileInput) {
    i = &FileInput{
        data:        make(chan []byte, 1000),
        exit:        make(chan bool),
        path:        path,
        speedFactor: 1,
        loop:        loop,
        readDepth:   readDepth,
        stats:       expvar.NewMap("file-" + path),
        dryRun:      dryRun,
        maxWait:     maxWait,
    }

    // Only start emitting when the readers could be set up.
    if err := i.init(); err == nil {
        go i.emit()
    }

    return i
}

做文件初始化

// init resolves i.path into the list of files to read and creates one
// fileInputReader per file, replacing i.readers.
//
// Paths starting with "s3://" are listed through the S3 API, using the key
// parsed from the URL as prefix. Any other path is expanded with
// filepath.Glob. An error is returned when listing or globbing fails, or when
// nothing matches. The whole operation runs with i.mu held.
func (i *FileInput) init() (err error) {
    i.mu.Lock()
    defer i.mu.Unlock()

    var matches []string

    if strings.HasPrefix(i.path, "s3://") {
        sess := session.Must(session.NewSession(awsConfig()))
        svc := s3.New(sess)

        bucket, key := parseS3Url(i.path)

        params := &s3.ListObjectsInput{
            Bucket: aws.String(bucket),
            Prefix: aws.String(key),
        }

        // A single ListObjects call returns only one page of keys (at most
        // 1000), so walk every page; otherwise larger prefixes would be
        // silently truncated.
        err = svc.ListObjectsPages(params, func(page *s3.ListObjectsOutput, _ bool) bool {
            for _, c := range page.Contents {
                matches = append(matches, "s3://"+bucket+"/"+(*c.Key))
            }
            return true // keep going until the last page
        })
        if err != nil {
            Debug(0, "[INPUT-FILE] Error while retreiving list of files from S3", i.path, err)
            return err
        }
    } else if matches, err = filepath.Glob(i.path); err != nil {
        Debug(0, "[INPUT-FILE] Wrong file pattern", i.path, err)
        return err
    }

    if len(matches) == 0 {
        Debug(0, "[INPUT-FILE] No files match pattern: ", i.path)
        return errors.New("No matching files")
    }

    i.readers = make([]*fileInputReader, len(matches))

    for idx, p := range matches {
        i.readers[idx] = newFileInputReader(p, i.readDepth)
    }

    // NOTE(review): Add accumulates, so every re-init in loop mode increases
    // reader_count again — confirm whether a cumulative count is intended.
    i.stats.Add("reader_count", int64(len(matches)))

    return nil
}

内容读取

// emit pops payloads from the readers and pushes their data to i.data,
// sleeping between payloads for the difference of their timestamps.
//
// The wait is divided by i.speedFactor and capped at i.maxWait when that is
// positive. Payloads whose timestamp is older than the previous one are sent
// without waiting and counted as "negative_wait". In dry-run mode nothing is
// slept or sent; only statistics are collected and a summary is printed once
// the readers are exhausted. The loop ends when i.exit is signalled or, when
// i.loop is false, when nextReader returns nil.
func (i *FileInput) emit() {
    var lastTime int64 = -1

    var maxWait, firstWait, minWait int64
    minWait = math.MaxInt64

    // Pre-create the counters that are read back after the loop.
    // expvar.Map.Get returns nil for a key that was never added, and the
    // dry-run summary type-asserts "total_wait"; without this a capture with
    // a single record (or only out-of-order records) would panic there.
    i.stats.Add("negative_wait", 0)
    i.stats.Add("total_wait", 0)

    for {
        select {
        case <-i.exit:
            return
        default:
        }

        reader := i.nextReader()

        if reader == nil {
            if !i.loop {
                break
            }

            // Start over from the first file.
            // NOTE(review): the init error is ignored here; if init keeps
            // failing this loop retries without any delay — confirm intended.
            i.init()
            lastTime = -1
            continue
        }

        // NOTE(review): heap.Pop mutates the queue while only a read lock is
        // held — confirm that no other goroutine pops concurrently.
        reader.queue.RLock()
        payload := heap.Pop(&reader.queue).(*filePayload)
        i.stats.Add("total_counter", 1)
        i.stats.Add("total_bytes", int64(len(payload.data)))
        reader.queue.RUnlock()

        if lastTime != -1 {
            diff := payload.timestamp - lastTime

            if firstWait == 0 {
                firstWait = diff
            }

            if i.speedFactor != 1 {
                diff = int64(float64(diff) / i.speedFactor)
            }

            if i.maxWait > 0 && diff > int64(i.maxWait) {
                diff = int64(i.maxWait)
            }

            if diff >= 0 {
                lastTime = payload.timestamp

                if !i.dryRun {
                    time.Sleep(time.Duration(diff))
                }

                i.stats.Add("total_wait", diff)

                if diff > maxWait {
                    maxWait = diff
                }

                if diff < minWait {
                    minWait = diff
                }
            } else {
                // Out-of-order timestamp: do not wait and keep lastTime.
                i.stats.Add("negative_wait", 1)
            }
        } else {
            lastTime = payload.timestamp
        }

        // Recheck if we have exited since last check.
        select {
        case <-i.exit:
            return
        default:
            if !i.dryRun {
                i.data <- payload.data
            }
        }
    }

    // No wait was measured at all: report 0 instead of the MaxInt64 sentinel.
    if minWait == math.MaxInt64 {
        minWait = 0
    }

    i.stats.Set("first_wait", time.Duration(firstWait))
    i.stats.Set("max_wait", time.Duration(maxWait))
    i.stats.Set("min_wait", time.Duration(minWait))

    Debug(0, fmt.Sprintf("[INPUT-FILE] FileInput: end of file ‘%s‘\n", i.path))

    if i.dryRun {
        fmt.Printf("Records found: %v\nFiles processed: %v\nBytes processed: %v\nMax wait: %v\nMin wait: %v\nFirst wait: %v\nIt will take `%v` to replay at current speed.\nFound %v records with out of order timestamp\n",
            i.stats.Get("total_counter"),
            i.stats.Get("reader_count"),
            i.stats.Get("total_bytes"),
            i.stats.Get("max_wait"),
            i.stats.Get("min_wait"),
            i.stats.Get("first_wait"),
            time.Duration(i.stats.Get("total_wait").(*expvar.Int).Value()),
            i.stats.Get("negative_wait"),
        )
    }
}

 

goreplay~v1.3.0新增项

上一篇:4.1二维曲线


下一篇:27:RNN循环神经网络-正弦曲线波形的预测案例