前言:
提到消息队列,首先想到的是 RabbitMQ、Kafka、Redis/Codis、ZeroMQ 这类分布式消息队列:它们基于内存缓存和服务发现算法,实现跨节点的消息发布/订阅机制。
但有时候需求比较简单,只需要一个不会溢出的本地消息队列。diskqueue 正是为这一目标设计的:它是一个基于文件的消息队列。
源码目录:
total 2630
drwxr-xr-x 1 Administrator 197121 0 4月 8 16:31 diskqueue/
-rw-r--r-- 1 Administrator 197121 59 4月 7 17:42 go.mod
-rw-r--r-- 1 Administrator 197121 523 4月 7 17:42 go.sum
-rw-r--r-- 1 Administrator 197121 2264 4月 8 16:17 main.go
-rwxr-xr-x 1 Administrator 197121 2683392 4月 8 16:28 test.exe*
drwxr-xr-x 1 Administrator 197121 0 4月 8 16:31 tmp/
测试源码
```go
// main.go
package main
import (
"fmt"
"strconv"
"sync"
"time"
disk "test/diskqueue"
"github.com/minio/cli"
)
// DqueConsumer opens the "test" queue stored under "tmp" and starts a
// background goroutine that prints every message it receives from the queue.
//
// The goroutine closes and reopens the queue in two situations: when a nil
// message is received, and when the (very long) reopen ticker fires. The
// goroutine never terminates. ctx is not used.
func DqueConsumer(ctx *cli.Context) {
	const (
		queueName = "test"
		queueDir  = "tmp"
	)

	// printLog writes every queue log line to stdout as "<LEVEL>: <message>".
	printLog := func(lvl disk.LogLevel, f string, args ...interface{}) {
		fmt.Println(fmt.Sprintf(lvl.String()+": "+f, args...))
	}
	// openQueue creates the queue with the fixed demo settings; only the
	// sync timeout differs between the call sites.
	openQueue := func(syncTimeout time.Duration) disk.Interface {
		return disk.New(queueName, queueDir, 10, 4, 1<<10, 2500, syncTimeout, 1*time.Second, printLog)
	}

	reopenTicker := time.NewTicker(10000000 * time.Second)
	queue := openQueue(2 * time.Second)

	go func() {
		for {
			select {
			case payload := <-queue.ReadChan():
				fmt.Println(">>>>>>>>>>>>>>>>> " + string(payload))
				if payload != nil {
					continue
				}
				// A nil payload is treated as "queue is done": reopen it
				// and pause before polling again.
				queue.Close()
				queue = openQueue(2 * time.Second)
				time.Sleep(1000 * time.Millisecond)
			case <-reopenTicker.C:
				queue.Close()
				time.Sleep(2000 * time.Millisecond)
				queue = openQueue(5 * time.Second)
			}
		}
	}()
}
// wg tracks the producer and consumer goroutines started by main.
var wg sync.WaitGroup

// main is a small end-to-end demo of the disk queue: a producer goroutine
// puts 1000 messages into the queue (one every 10ms) while a consumer
// goroutine, started one second later, prints what it reads. When the
// consumer has received as many messages as the producer managed to enqueue,
// the queue is closed and "end" is printed.
func main() {
	const messageCount = 1000

	dqName := "test"
	tmpDir := "tmp"
	dq := disk.New(dqName, tmpDir, 10, 4, 1<<10, 2500, 2*time.Second, 1*time.Second,
		func(lvl disk.LogLevel, f string, args ...interface{}) {
			fmt.Println(fmt.Sprintf(lvl.String()+": "+f, args...))
		})

	// produced carries, exactly once, the number of messages the queue
	// accepted, so the consumer knows when it may stop.
	produced := make(chan int, 1)

	// Add must happen before the goroutine starts; calling it inside the
	// goroutine races with wg.Wait below.
	wg.Add(1)
	go func() {
		defer wg.Done()
		accepted := 0
		for i := 0; i < messageCount; i++ {
			if err := dq.Put([]byte("hello worker," + strconv.Itoa(i) + "\n")); err != nil {
				fmt.Println("put failed: " + err.Error())
			} else {
				accepted++
			}
			time.Sleep(time.Millisecond * 10)
		}
		produced <- accepted
	}()

	// Give the producer a head start so the consumer finds data on disk.
	time.Sleep(time.Millisecond * 1000)

	wg.Add(1)
	go func() {
		defer wg.Done()
		cnt := 0
		expected := -1 // unknown until the producer reports its total
		for expected < 0 || cnt < expected {
			select {
			case ms := <-dq.ReadChan():
				cnt++
				fmt.Println("<<<<<<<<<<<<<<<<<<<<<< " + string(ms))
			case n := <-produced:
				expected = n
			}
		}
	}()

	wg.Wait()
	if err := dq.Close(); err != nil {
		fmt.Println("close failed: " + err.Error())
	}
	fmt.Print("end")
}
// diskqueue.go
package diskqueue
import (
"bufio"
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"math/rand"
"os"
"path"
"sync"
"sync/atomic"
"time"
)
// Logging types, copied from github.com/nsqio/nsq/internal/lg.

// LogLevel is the severity attached to a log line.
type LogLevel int

// Severity levels, numbered 1 (DEBUG) through 5 (FATAL).
const (
	DEBUG LogLevel = iota + 1
	INFO
	WARN
	ERROR
	FATAL
)

// AppLogFunc is the logging callback supplied by the application: lvl is the
// severity, f a printf-style format string and args its arguments.
type AppLogFunc func(lvl LogLevel, f string, args ...interface{})

// String returns the upper-case name of the level ("WARNING" for WARN).
// It panics for a value outside DEBUG..FATAL.
func (l LogLevel) String() string {
	names := [...]string{"DEBUG", "INFO", "WARNING", "ERROR", "FATAL"}
	if l < DEBUG || l > FATAL {
		panic("invalid LogLevel")
	}
	return names[l-DEBUG]
}
// Interface is the public surface of a disk-backed queue as returned by New.
type Interface interface {
	// Put appends one message to the queue.
	Put(data []byte) error
	// ReadChan returns the channel messages are delivered on;
	// this is expected to be an *unbuffered* channel.
	ReadChan() <-chan []byte
	// Depth reports the number of messages currently in the queue.
	Depth() int64
	// Empty destructively discards all pending messages.
	Empty() error
	// Close shuts the queue down and persists its metadata.
	Close() error
	// Delete shuts the queue down without the final metadata sync that
	// Close performs.
	Delete() error
}
// diskQueue implements a filesystem backed FIFO queue.
//
// Apart from depth (accessed atomically) and exitFlag (guarded by the
// embedded RWMutex), the run-time fields are read and written by the ioLoop
// goroutine and the helpers it calls.
type diskQueue struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
// run-time state (also persisted to the metadata file)
readPos int64
writePos int64
readFileNum int64
writeFileNum int64
depth int64 // number of messages in the queue: +1 on every write, -1 on every read
sync.RWMutex
// instantiation time metadata
name string // prefix of the data and metadata file names
dataPath string // directory that holds the data and metadata files
maxBytesPerFile int64 // currently this cannot change once created. Position at which a data file is rolled over to the next one
minMsgSize int32
maxMsgSize int32
syncEvery int64 // number of reads/writes counted by ioLoop before a sync is forced
syncTimeout time.Duration // interval of the sync ticker in ioLoop (a time.Duration, not a number of seconds)
readTimeout time.Duration // interval of the read ticker in ioLoop
exitFlag int32
needSync bool
// keeps track of the position where we have read
// (but not yet sent over readChan)
nextReadPos int64
nextReadFileNum int64
readFile *os.File
writeFile *os.File
reader *bufio.Reader
writeBuf bytes.Buffer
// exposed via ReadChan()
readChan chan []byte
// internal channels
writeChan chan []byte
writeResponseChan chan error
emptyChan chan int
emptyResponseChan chan error
exitChan chan int
exitSyncChan chan int
logf AppLogFunc // logging callback
}
// New creates a diskQueue, loads any metadata already present under dataPath
// and starts the ioLoop goroutine that serves reads and writes.
//
// A missing metadata file is expected for a brand-new queue and is ignored;
// any other error while loading metadata is logged through logf and the queue
// is still returned.
func New(name string, dataPath string, maxBytesPerFile int64,
	minMsgSize int32, maxMsgSize int32,
	syncEvery int64, syncTimeout time.Duration, readTimeout time.Duration, logf AppLogFunc) Interface {
	q := &diskQueue{
		// identity and limits
		name:            name,
		dataPath:        dataPath,
		maxBytesPerFile: maxBytesPerFile,
		minMsgSize:      minMsgSize,
		maxMsgSize:      maxMsgSize,

		// sync / wake-up policy
		syncEvery:   syncEvery,
		syncTimeout: syncTimeout,
		readTimeout: readTimeout,

		// channels (all unbuffered)
		readChan:          make(chan []byte),
		writeChan:         make(chan []byte),
		writeResponseChan: make(chan error),
		emptyChan:         make(chan int),
		emptyResponseChan: make(chan error),
		exitChan:          make(chan int),
		exitSyncChan:      make(chan int),

		logf: logf,
	}

	// No locking needed: nothing else can reference this instance yet.
	if loadErr := q.retrieveMetaData(); loadErr != nil && !os.IsNotExist(loadErr) {
		q.logf(ERROR, "DISKQUEUE(%s) failed to retrieveMetaData - %s", q.name, loadErr)
	}

	// Start the main loop that selects over the queue's channels.
	go q.ioLoop()
	return q
}
// Depth reports how many messages are currently in the queue. The counter is
// loaded atomically, so no lock is taken.
func (d *diskQueue) Depth() int64 {
	pending := atomic.LoadInt64(&d.depth)
	return pending
}
// ReadChan exposes the queue's read channel as a receive-only channel of
// message payloads.
func (d *diskQueue) ReadChan() <-chan []byte {
	var out <-chan []byte = d.readChan
	return out
}
// Put hands data to the ioLoop goroutine over writeChan and returns the
// result that ioLoop reports on writeResponseChan. It returns an "exiting"
// error without sending anything once the queue has started shutting down.
// The read lock is held for the whole call.
func (d *diskQueue) Put(data []byte) error {
	d.RLock()
	defer d.RUnlock()

	if d.exitFlag != 1 {
		d.writeChan <- data
		writeErr := <-d.writeResponseChan
		return writeErr
	}
	return errors.New("exiting")
}
// Close shuts the queue down (stopping ioLoop and closing the read and write
// files) and then persists the queue state to the metadata file.
func (d *diskQueue) Close() error {
	exitErr := d.exit(false)
	if exitErr == nil {
		return d.sync()
	}
	return exitErr
}
// Delete shuts the queue down via exit(true). Unlike Close it does not sync
// metadata afterwards. As written in this file, exit only changes its log
// message for the "deleted" case; no data files are removed here.
func (d *diskQueue) Delete() error {
	const deleted = true
	return d.exit(deleted)
}
// exit marks the queue as exiting, stops the ioLoop goroutine and closes the
// read and write files. deleted only selects which message is logged.
// It always returns nil.
func (d *diskQueue) exit(deleted bool) error {
	d.Lock()
	defer d.Unlock()

	d.exitFlag = 1

	format := "DISKQUEUE(%s): closing"
	if deleted {
		format = "DISKQUEUE(%s): deleting"
	}
	d.logf(INFO, format, d.name)

	// Closing exitChan tells ioLoop to leave its loop ...
	close(d.exitChan)
	// ... and exitSyncChan confirms that it actually has.
	<-d.exitSyncChan

	// Close whichever data files are still open, read file first.
	for _, slot := range []**os.File{&d.readFile, &d.writeFile} {
		if *slot == nil {
			continue
		}
		(*slot).Close()
		*slot = nil
	}
	return nil
}
// Empty destructively clears out any pending data in the queue by fast
// forwarding the read positions and removing the intermediate files. The
// work is done by ioLoop; Empty sends the request over emptyChan and returns
// ioLoop's answer. Once the queue is exiting it returns an "exiting" error.
func (d *diskQueue) Empty() error {
	d.RLock()
	defer d.RUnlock()

	if d.exitFlag != 1 {
		d.logf(INFO, "DISKQUEUE(%s): emptying", d.name)
		d.emptyChan <- 1
		emptyErr := <-d.emptyResponseChan
		return emptyErr
	}
	return errors.New("exiting")
}
// deleteAllFiles removes every data file, resets the queue positions and
// then removes the metadata file. A failure to remove the metadata file
// (other than it not existing) is logged and returned; otherwise the result
// of removing the data files is returned.
func (d *diskQueue) deleteAllFiles() error {
	skipErr := d.skipToNextRWFile()

	if rmErr := os.Remove(d.metaDataFileName()); rmErr != nil && !os.IsNotExist(rmErr) {
		d.logf(ERROR, "DISKQUEUE(%s) failed to remove metadata file - %s", d.name, rmErr)
		return rmErr
	}
	return skipErr
}
// skipToNextRWFile closes the open data files, removes every data file from
// readFileNum through writeFileNum, and points both the reader and the writer
// at the start of the next (fresh) file number with a depth of zero.
// It returns the last removal error encountered, if any.
func (d *diskQueue) skipToNextRWFile() error {
	// Release both file handles before deleting anything.
	if rf := d.readFile; rf != nil {
		rf.Close()
		d.readFile = nil
	}
	if wf := d.writeFile; wf != nil {
		wf.Close()
		d.writeFile = nil
	}

	var lastErr error
	for num := d.readFileNum; num <= d.writeFileNum; num++ {
		rmErr := os.Remove(d.fileName(num))
		if rmErr == nil || os.IsNotExist(rmErr) {
			continue
		}
		d.logf(ERROR, "DISKQUEUE(%s) failed to remove data file - %s", d.name, rmErr)
		lastErr = rmErr
	}

	// Reader, look-ahead reader and writer all start over on the next file.
	next := d.writeFileNum + 1
	d.writeFileNum, d.writePos = next, 0
	d.readFileNum, d.readPos = next, 0
	d.nextReadFileNum, d.nextReadPos = next, 0
	atomic.StoreInt64(&d.depth, 0)

	return lastErr
}
// readOne performs a low level filesystem read for a single []byte
// while advancing the look-ahead read position and rolling files, if
// necessary.
//
// On disk a message is a 4-byte big-endian length followed by the payload.
// On success the payload is returned and nextReadFileNum/nextReadPos point
// just past it; readFileNum/readPos are left untouched because the message
// has not been handed to a consumer yet. On any error the read file is closed
// and the error is returned.
func (d *diskQueue) readOne() ([]byte, error) {
	var err error
	var msgSize int32

	if d.readFile == nil {
		curFileName := d.fileName(d.readFileNum)
		d.readFile, err = os.OpenFile(curFileName, os.O_RDONLY, 0600)
		if err != nil {
			return nil, err
		}

		d.logf(INFO, "DISKQUEUE(%s): readOne() opened %s, time is:%+v", d.name, curFileName, time.Now())
	}

	// fail closes the read file so the next call reopens it, and passes the
	// error through.
	fail := func(cause error) ([]byte, error) {
		d.readFile.Close()
		d.readFile = nil
		return nil, cause
	}

	if d.reader == nil || d.readFileJustOpened() {
		if d.readPos > 0 {
			if _, err = d.readFile.Seek(d.readPos, 0); err != nil {
				return fail(err)
			}
		}
		d.reader = bufio.NewReader(d.readFile)
	}

	if err = binary.Read(d.reader, binary.BigEndian, &msgSize); err != nil {
		return fail(err)
	}

	if msgSize < d.minMsgSize || msgSize > d.maxMsgSize {
		// this file is corrupt and we have no reasonable guarantee on
		// where a new message should begin
		return fail(fmt.Errorf("invalid message read size (%d)", msgSize))
	}

	readBuf := make([]byte, msgSize)
	if _, err = io.ReadFull(d.reader, readBuf); err != nil {
		return fail(err)
	}

	totalBytes := int64(4 + msgSize)

	// we only advance next* because we have not yet sent this to consumers
	// (where readFileNum, readPos will actually be advanced)
	d.nextReadPos = d.readPos + totalBytes
	d.nextReadFileNum = d.readFileNum

	// TODO: each data file should embed the maxBytesPerFile
	// as the first 8 bytes (at creation time) ensuring that
	// the value can change without affecting runtime
	//
	// writeOne rolls to a new file as soon as writePos >= maxBytesPerFile,
	// so the reader must roll on the same condition. With a strict ">" a
	// message ending exactly on the limit would leave the reader in a
	// finished file, where the next read hits EOF and the file is treated
	// as corrupt.
	if d.nextReadPos >= d.maxBytesPerFile {
		if d.readFile != nil {
			d.readFile.Close()
			d.readFile = nil
		}

		d.nextReadFileNum++
		d.nextReadPos = 0
	}

	return readBuf, nil
}

// readFileJustOpened reports whether the buffered reader still has to be
// (re)attached to the current read file. A file that readOne has just opened
// sits at offset 0, whereas a file that is being read through d.reader has
// already been consumed past the current read position.
func (d *diskQueue) readFileJustOpened() bool {
	pos, err := d.readFile.Seek(0, io.SeekCurrent)
	return err == nil && pos == 0
}
// writeOne performs a low level filesystem write for a single []byte
// while advancing write positions and rolling files, if necessary.
//
// The message is stored as a 4-byte big-endian length followed by the
// payload. A payload whose length is outside [minMsgSize, maxMsgSize] is
// rejected before the filesystem is touched. When the write position reaches
// maxBytesPerFile the queue is synced and the writer moves on to the next
// file number; an error from that sync is logged and also returned even
// though the message itself was written.
func (d *diskQueue) writeOne(data []byte) error {
	var err error

	// Validate first so that a rejected message never opens or creates a
	// data file. The comparison is done in int64 so that a very large slice
	// cannot wrap around when converted to int32.
	if n := int64(len(data)); n < int64(d.minMsgSize) || n > int64(d.maxMsgSize) {
		return fmt.Errorf("invalid message write size (%d) maxMsgSize=%d", n, d.maxMsgSize)
	}
	dataLen := int32(len(data))

	if d.writeFile == nil {
		curFileName := d.fileName(d.writeFileNum)
		d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600)
		if err != nil {
			return err
		}

		d.logf(INFO, "DISKQUEUE(%s): writeOne() opened %s", d.name, curFileName)

		if d.writePos > 0 {
			_, err = d.writeFile.Seek(d.writePos, 0)
			if err != nil {
				d.writeFile.Close()
				d.writeFile = nil
				return err
			}
		}
	}

	d.writeBuf.Reset()
	err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen)
	if err != nil {
		return err
	}

	_, err = d.writeBuf.Write(data)
	if err != nil {
		return err
	}

	// only write to the file once
	_, err = d.writeFile.Write(d.writeBuf.Bytes())
	if err != nil {
		d.writeFile.Close()
		d.writeFile = nil
		return err
	}

	totalBytes := int64(4 + dataLen)
	d.writePos += totalBytes
	atomic.AddInt64(&d.depth, 1)

	if d.writePos >= d.maxBytesPerFile {
		d.writeFileNum++
		d.writePos = 0

		// sync every time we start writing to a new file
		err = d.sync()
		if err != nil {
			d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)
		}

		if d.writeFile != nil {
			d.writeFile.Close()
			d.writeFile = nil
		}
	}

	return err
}
// sync flushes the current write file (if one is open) to disk and then
// persists the queue metadata. needSync is cleared only when both steps
// succeed. If flushing fails the write file is closed and dropped.
func (d *diskQueue) sync() error {
	if wf := d.writeFile; wf != nil {
		if flushErr := wf.Sync(); flushErr != nil {
			wf.Close()
			d.writeFile = nil
			return flushErr
		}
	}

	if metaErr := d.persistMetaData(); metaErr != nil {
		return metaErr
	}

	d.needSync = false
	return nil
}
// retrieveMetaData initializes the queue state from the metadata file.
//
// The file holds three lines: "depth", "readFileNum,readPos" and
// "writeFileNum,writePos". Values are parsed into locals first and copied
// into the queue only when the whole file parsed, so a truncated or corrupt
// file cannot leave the queue with a half-updated state.
//
// Errors from opening the file are returned unwrapped so that callers can
// test them with os.IsNotExist.
func (d *diskQueue) retrieveMetaData() error {
	f, err := os.OpenFile(d.metaDataFileName(), os.O_RDONLY, 0600)
	if err != nil {
		return err
	}
	defer f.Close()

	var depth, readFileNum, readPos, writeFileNum, writePos int64
	_, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n",
		&depth,
		&readFileNum, &readPos,
		&writeFileNum, &writePos)
	if err != nil {
		return err
	}

	atomic.StoreInt64(&d.depth, depth)
	d.readFileNum, d.readPos = readFileNum, readPos
	d.writeFileNum, d.writePos = writeFileNum, writePos
	// Nothing has been pre-read yet, so the look-ahead position starts at
	// the read position.
	d.nextReadFileNum = d.readFileNum
	d.nextReadPos = d.readPos

	return nil
}
// persistMetaData atomically writes the queue state to the metadata file.
//
// The state is written to a uniquely named temporary file, flushed to disk
// and then renamed over the real metadata file (creating it if necessary).
// If writing, flushing, closing or renaming fails, the temporary file is
// removed and the existing metadata file is left as it was.
func (d *diskQueue) persistMetaData() error {
	fileName := d.metaDataFileName()
	tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int())

	// write to tmp file
	f, err := os.OpenFile(tmpFileName, os.O_RDWR|os.O_CREATE, 0600)
	if err != nil {
		return err
	}

	_, err = fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n",
		atomic.LoadInt64(&d.depth),
		d.readFileNum, d.readPos,
		d.writeFileNum, d.writePos)
	if err == nil {
		err = f.Sync()
	}
	// Always close the file, but keep the first error we saw.
	if closeErr := f.Close(); err == nil {
		err = closeErr
	}
	if err != nil {
		// Best-effort cleanup; the original error is what matters.
		_ = os.Remove(tmpFileName)
		return err
	}

	// atomically rename
	if err = os.Rename(tmpFileName, fileName); err != nil {
		// Best-effort cleanup; the rename error is what matters.
		_ = os.Remove(tmpFileName)
		return err
	}
	return nil
}
// metaDataFileName returns the path of the metadata file:
// "<dataPath>/<name>.diskqueue.meta.dat".
//
// Only the file name is built with a format string; dataPath is joined
// afterwards so that a '%' in the directory is kept literally instead of
// being interpreted as a formatting verb.
func (d *diskQueue) metaDataFileName() string {
	return path.Join(d.dataPath, fmt.Sprintf("%s.diskqueue.meta.dat", d.name))
}
// fileName returns the path of data file number fileNum:
// "<dataPath>/<name>.diskqueue.<fileNum, zero-padded to 6 digits>.dat".
//
// Only the file name is built with a format string; dataPath is joined
// afterwards so that a '%' in the directory is kept literally instead of
// being interpreted as a formatting verb.
func (d *diskQueue) fileName(fileNum int64) string {
	return path.Join(d.dataPath, fmt.Sprintf("%s.diskqueue.%06d.dat", d.name, fileNum))
}
// checkTailCorruption sanity-checks the queue state once the reader has
// caught up with the writer. depth is the message count after the read that
// just completed.
//
// While unread data remains it does nothing. At the tail, a non-zero depth is
// logged and forced back to 0, and a reader that has moved past the writer is
// logged and repaired by skipping both to a fresh file. Either repair
// schedules a sync.
func (d *diskQueue) checkTailCorruption(depth int64) {
	// Unread data remains: nothing to verify yet.
	if d.readFileNum < d.writeFileNum || d.readPos < d.writePos {
		return
	}

	// we've reached the end of the diskqueue
	// if depth isn't 0 something went wrong
	switch {
	case depth < 0:
		d.logf(ERROR,
			"DISKQUEUE(%s) negative depth at tail (%d), metadata corruption, resetting 0...",
			d.name, depth)
	case depth > 0:
		d.logf(ERROR,
			"DISKQUEUE(%s) positive depth at tail (%d), data loss, resetting 0...",
			d.name, depth)
	}
	if depth != 0 {
		// force the unread message count back to 0
		atomic.StoreInt64(&d.depth, 0)
		d.needSync = true
	}

	// Reader exactly at the writer: the tail is consistent.
	if d.readFileNum == d.writeFileNum && d.readPos == d.writePos {
		return
	}

	if d.readFileNum > d.writeFileNum {
		d.logf(ERROR,
			"DISKQUEUE(%s) readFileNum > writeFileNum (%d > %d), corruption, skipping to next writeFileNum and resetting 0...",
			d.name, d.readFileNum, d.writeFileNum)
	}
	if d.readPos > d.writePos {
		d.logf(ERROR,
			"DISKQUEUE(%s) readPos > writePos (%d > %d), corruption, skipping to next writeFileNum and resetting 0...",
			d.name, d.readPos, d.writePos)
	}

	d.skipToNextRWFile()
	d.needSync = true
}
// moveForward commits the look-ahead read position after a message has been
// delivered: the read position jumps to the next* position and depth is
// decremented. If that moved the reader into a new file, the old file is
// removed and a sync is scheduled. Finally the tail is sanity-checked.
func (d *diskQueue) moveForward() {
	prevFileNum := d.readFileNum
	d.readFileNum, d.readPos = d.nextReadFileNum, d.nextReadPos
	remaining := atomic.AddInt64(&d.depth, -1)

	// see if we need to clean up the old file
	if prevFileNum != d.nextReadFileNum {
		// sync every time we start reading from a new file
		d.needSync = true

		stale := d.fileName(prevFileNum)
		if rmErr := os.Remove(stale); rmErr != nil {
			d.logf(ERROR, "DISKQUEUE(%s) failed to Remove(%s) - %s", d.name, stale, rmErr)
		}
	}

	d.checkTailCorruption(remaining)
}
// handleReadError recovers from a failed read by abandoning the current read
// file: the file is renamed with a ".bad" suffix, the reader (and its
// look-ahead position) moves to the start of the next file number, and a
// sync is scheduled. If the bad file is also the current write file, the
// writer is moved to the next file as well.
func (d *diskQueue) handleReadError() {
	if d.readFileNum == d.writeFileNum {
		// if we can't properly read from the current write file it's safe
		// to assume that it is broken and the writer should skip it too
		if wf := d.writeFile; wf != nil {
			wf.Close()
			d.writeFile = nil
		}
		d.writeFileNum++
		d.writePos = 0
	}

	// keep the bad file around under a different name for inspection
	src := d.fileName(d.readFileNum)
	dst := src + ".bad"

	d.logf(WARN,
		"DISKQUEUE(%s) jump to next file and saving bad file as %s",
		d.name, dst)

	if renameErr := os.Rename(src, dst); renameErr != nil {
		d.logf(ERROR,
			"DISKQUEUE(%s) failed to rename bad diskqueue file %s to %s",
			d.name, src, dst)
	}

	// jump to the beginning of the next file
	d.readFileNum++
	d.nextReadFileNum = d.readFileNum
	d.readPos, d.nextReadPos = 0, 0

	// significant state change, schedule a sync on the next iteration
	d.needSync = true
}
// ioLoop provides the backend for exposing a go channel (via ReadChan())
// in support of multiple concurrent queue consumers
//
// it works by looping and branching based on whether or not the queue has data
// to read and blocking until data is either read or written over the appropriate
// go channels
//
// conveniently this also means that we're asynchronously reading from the filesystem
//
// Each iteration first syncs if needed, then pre-reads the next message when
// there is unread data, and finally waits for one of: a consumer taking the
// pre-read message, a write request, an empty request, a ticker, or exit.
// On exit both tickers are stopped and exitSyncChan is signalled.
func (d *diskQueue) ioLoop() {
	var dataRead []byte
	var err error
	var count int64
	var r chan []byte

	syncTicker := time.NewTicker(d.syncTimeout)
	readTicker := time.NewTicker(d.readTimeout)

	for {
		// dont sync all the time :) force a sync once syncEvery operations
		// have accumulated
		if count == d.syncEvery {
			d.needSync = true
		}

		if d.needSync {
			err = d.sync()
			if err != nil {
				d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)
			}
			count = 0
		}

		// there are unread files, or the read position is behind the write
		// position
		if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) {
			// Only read when nothing is pre-read yet. The file number has to
			// be compared as well: a pre-read that rolled over to position 0
			// of the next file would otherwise look "not yet read" whenever
			// readPos is 0, and the same message would be read from disk
			// again on every wake-up.
			if d.nextReadPos == d.readPos && d.nextReadFileNum == d.readFileNum {
				dataRead, err = d.readOne()
				if err != nil {
					d.logf(ERROR, "DISKQUEUE(%s) reading at %d of %s - %s",
						d.name, d.readPos, d.fileName(d.readFileNum), err)
					d.handleReadError()
					continue
				}
				// Diagnostics go through the application's logger like
				// everything else in this type, not straight to stdout.
				d.logf(DEBUG, "DISKQUEUE(%s): read message %s", d.name, dataRead)
			}
			r = d.readChan
		} else {
			r = nil
		}

		select {
		// the Go channel spec dictates that nil channel operations (read or write)
		// in a select are skipped, we set r to d.readChan only when there is data to read
		case r <- dataRead:
			count++
			// moveForward sets needSync flag if a file is removed
			d.moveForward()
		case <-readTicker.C:
			// Periodic wake-up only: the top of the loop re-evaluates the
			// sync and read conditions.
		case <-d.emptyChan:
			d.emptyResponseChan <- d.deleteAllFiles()
			count = 0
		case dataWrite := <-d.writeChan:
			count++
			d.writeResponseChan <- d.writeOne(dataWrite)
		case <-syncTicker.C:
			if count == 0 {
				// avoid sync when there's no activity
				continue
			}
			d.needSync = true
		case <-d.exitChan:
			goto exit
		}
	}

exit:
	d.logf(INFO, "DISKQUEUE(%s): closing ... ioLoop", d.name)
	syncTicker.Stop()
	readTicker.Stop()
	d.exitSyncChan <- 1
}