go rabbitmq延时队列

参考了:https://www.cnblogs.com/mfrank/p/11260355.html#autoid-0-8-0

demo:

package new_rabbitmq

import (
	"fmt"
	"github.com/streadway/amqp"
	"time"
)

var (
	dm  *RabbitMQ
	dl  *RabbitMQ
	err error
	url = "amqp://guest:guest@localhost:5672/?heartbeat=5"
)

// 获取rabbitmq连接
type RabbitMQ struct {
	conn    *amqp.Connection
	channel *amqp.Channel
	//队列名称
	QueueName string
	//交换机
	Exchange string
	//key
	key string
	//连接信息
	Url string
}

//创建RabbitMQ结构体实例
func NewRabbitMQ(url, queueName, exchange, key string) (*RabbitMQ, error) {
	rabbitmq := &RabbitMQ{QueueName: queueName, Exchange: exchange, key: key, Url: url}
	var err error
	//创建RabbitMQ连接
	rabbitmq.conn, err = amqp.Dial(rabbitmq.Url)
	//rabbitmq.failOnErr(err, "创建连接错误!")
	if err != nil {
		return nil, err
	}

	rabbitmq.channel, err = rabbitmq.conn.Channel()
	if err != nil {
		return nil, err
	}
	return rabbitmq, nil
}
// new一个延迟队列 绑定交换机和queue
func NewDelayMq() *RabbitMQ {

	dm, err = NewRabbitMQ(url, "delay_queue", "delay_exchange", "delay_key")
	if err != nil {
		fmt.Println("connect err:" + err.Error())
	}
	dm.SetExchange()
	dm.SetAndBindQueue(map[string]interface{}{
		"x-message-ttl":             10000, // 10s超时
		"x-dead-letter-exchange":    "dead_letter_exchange",
		"x-dead-letter-routing-key": "dead_letter_key",
	})
	return dm
}
// new死信队列 绑定交换机和queue
func NewDeadLetterMq() *RabbitMQ {
	dl, err = NewRabbitMQ(url, "dead_letter_queue", "dead_letter_exchange", "dead_letter_key")
	if err != nil {
		fmt.Println("connect err:" + err.Error())
	}
	dl.SetExchange()
	dl.SetAndBindQueue(nil)

	return dl
}
// 延时队列推消息
func DelayPublish() {
	dm.DelayPublish("test delay1")
	dm.DelayPublish("test delay2")
}
// 死信消费
func DeadLetterConsume() {
	dl.DeadLetterConsume()
}

func (r *RabbitMQ) SetExchange() {
	err := r.channel.ExchangeDeclare(r.Exchange, "direct",
		true,
		false,
		false,
		false,
		nil)

	if err != nil {
		fmt.Println("exchange declare err" + err.Error())
	}
}

func (r *RabbitMQ) SetAndBindQueue(args amqp.Table) {
	_, err := r.channel.QueueDeclare(r.QueueName, true,
		false,
		false,
		false,
		args)

	if err != nil {
		fmt.Println("queue declare err" + err.Error())
	}

	err = r.channel.QueueBind(r.QueueName, r.key, r.Exchange, false, nil)
	if err != nil {
		fmt.Println("queue bind err" + err.Error())
	}
}

func (r *RabbitMQ) DelayPublish(msgStr string) {
	msg := amqp.Publishing{
		DeliveryMode: amqp.Persistent,
		Timestamp:    time.Now(),
		ContentType:  "text/plain",
		Body:         []byte(msgStr),
	}
	err := r.channel.Publish(r.Exchange, r.key, false, false, msg)
	if err != nil {
		fmt.Println("DelayPublish err" + err.Error())
	}
	fmt.Println("delay queue push msg: " + msgStr)
}

func (r *RabbitMQ) DeadLetterConsume() {
	_ = r.channel.Qos(1, 0, false)
	msgChan, err := r.channel.Consume(r.QueueName, r.key,
		false,
		false,
		false,
		false,
		nil,
	)
	if err != nil {
		fmt.Println("dead consume err:" + err.Error())
	}

	for delivery := range msgChan {
		_ = delivery.Ack(true)
		msg := string(delivery.Body)
		fmt.Println("dead letter get msg:" + msg)
	}
}

  test:

package new_rabbitmq

import "testing"

func TestDelay(t *testing.T) {
	NewDelayMq()
	NewDeadLetterMq()

	DelayPublish()
	DeadLetterConsume()
}

  

上一篇:设计模式之工厂模式


下一篇:上周比赛总结2021-11-14