【C#】从RabbitMQ的消费者事件窥.NET标准事件

rabbitMQ中,官方文档中,接收消息最方便且推荐的方法:使用IBasicConsumer消费者接口设置订阅messages到达队列后将自动发送,只要订阅了Received事件,就可以从中接收到队列消息,而不必主动请求。实现这种消费者(发布订阅)模式 ,.NET/C# Client API是通过C#事件。事件的本质就是多播委托。

1.RabbitMQ中的事件

首先我们来看一下在RabbitMQ的使用方式:

1.1 订阅事件

订阅消费者的received事件

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
                {
                    var body = ea.Body.ToArray();
                    // copy or deserialise the payload
                    // and process the message
                    // ...
                    channel.BasicAck(ea.DeliveryTag, false);
                };
// this consumer tag identifies the subscription
// when it has to be cancelled
String consumerTag = channel.BasicConsume(queueName, false, consumer);

1.2 定义事件

再来看一下源码,省略部分源码

namespace RabbitMQ.Client.Events
{
    ///<summary>Experimental class exposing an IBasicConsumer‘s
    ///methods as separate events.</summary>
    public class EventingBasicConsumer : DefaultBasicConsumer
    {
        public EventingBasicConsumer(IModel model) : base(model)
        {
        }

        public event EventHandler<BasicDeliverEventArgs> Received;

        ///<summary>
        /// Invoked when a delivery arrives for the consumer.
        /// </summary>
        /// <remarks>
        /// Handlers must copy or fully use delivery body before returning.
        /// Accessing the body at a later point is unsafe as its memory can
        /// be already released.
        /// </remarks>
        public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, ReadOnlyMemory<byte> body)
        {
            base.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
            Received?.Invoke(
                this,
                new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body));
        }
    }
}

1.3 事件传递信息

其中received为事件,当调用HandleBasicDeliver方法便会触发事件,这也是事件特定,只能在类的内部调用,并传递事件源,及事件传递信息BasicDeliverEventArgs类,如下

    public class BasicDeliverEventArgs : EventArgs
    {
        ///<summary>Default constructor.</summary>
        public BasicDeliverEventArgs()
        {
        }

        ///<summary>Constructor that fills the event‘s properties from
        ///its arguments.</summary>
        public BasicDeliverEventArgs(string consumerTag,
            ulong deliveryTag,
            bool redelivered,
            string exchange,
            string routingKey,
            IBasicProperties properties,
            ReadOnlyMemory<byte> body)
        {
            ConsumerTag = consumerTag;
            DeliveryTag = deliveryTag;
            Redelivered = redelivered;
            Exchange = exchange;
            RoutingKey = routingKey;
            BasicProperties = properties;
            Body = body;
        }

        ///<summary>The content header of the message.</summary>
        public IBasicProperties BasicProperties { get; set; }

        ///<summary>The message body.</summary>
        public ReadOnlyMemory<byte> Body { get; set; }

        ///<summary>The consumer tag of the consumer that the message
        ///was delivered to.</summary>
        public string ConsumerTag { get; set; }

        ///<summary>The delivery tag for this delivery. See
        ///IModel.BasicAck.</summary>
        public ulong DeliveryTag { get; set; }

        ///<summary>The exchange the message was originally published
        ///to.</summary>
        public string Exchange { get; set; }

        ///<summary>The AMQP "redelivered" flag.</summary>
        public bool Redelivered { get; set; }

        ///<summary>The routing key used when the message was
        ///originally published.</summary>
        public string RoutingKey { get; set; }
    }

2.标准的.NET事件模式

接下来我们看一下标准的.NET事件模式

2.1 定义事件传递信息类EventArgs

public class PriceChangeEventArgs:System.EventArgs
{
    public readonly decimal LastPrice;
    public readonly decimal NewPrice;
    public PriceChangeEventArgs(decimal lastPrice,decimal newPrice)
    {
        LastPrice=lastPrice;
        NewPrice=newPrice;
    }
}
  • System.EventArgs是.net framework中预定义的类,除了静态的Empty属性之外,没有其他成员
    • EventArgs为事件传递信息类的基类
    • 继承这个基类来自定义事件传递信息类

2.2* 为事件定义委托EventHandler

public delegate void PriceChangedEventHandler<TEventArgs>(object source,TEventArgs e) where TEventArgs:EventArgs;

事件可以说是依赖委托的,实质本来也就是一种类型安全的委托,要想定义事件,必定指定委托,对于能够定义事件的委托,有如下要求:

  • 返回类型是void
  • 接收两个参数,第一个参数类型是object,第二个参数类型是EventArgs的子类。
    • 第一个参数表示事件的广播者(触发事件的对象)
    • 第二个参数包含事件需要传递的信息
  • 名称必须以EventHandler结尾

在最新的.NET Core事件模式下,为了事件传递参数更更加的灵活,已经不再要求 TEventArgs 必须是派生自 System.EventArgs 的类,上面的代码就可以不再继承System.EventArgs

public delegate void PriceChangedEventHandler<TEventArgs>(object source,TEventArgs e);

然后上面的信息传递类也是可以改造的。

public class PriceChangeEventArgs
{
    public readonly decimal LastPrice;
    public readonly decimal NewPrice;
    public PriceChangeEventArgs(decimal lastPrice, decimal newPrice)
    {
        LastPrice = lastPrice;
        NewPrice = newPrice;
    }
}

其实.net已经为我们定义了一个泛型委托System.EventHandler与非泛型委托EventHandler

public delegate void EventHandler<TEventArgs>(object source,TEventArgs e);
public delegate void EventHandler(object? sender, EventArgs e);

这里使用自定义的委托,还是.NET预定义委托,都是可以的。如果我们使用.NE预定义委托,本小节就可以省略,可有可无。

建议使用预定义委托,毕竟已经有现成的

2.3 使用委托定义事件event

2.3.1 针对自定义委托

public event PriceChangedEventHandler<PriceChangeEventArgs> PriceChanged;

2.3.2 针对预定义的泛型委托

public event EventHandler<PriceChangeEventArgs> PriceChanged;

2.3.3 针对预定义的非泛型委托

public event EventHandler PriceChanged;

2.4 触发事件的方法On

protected virtual void OnPriceChanged(PriceChangeEventArgs e)
{
    PriceChanged?.Invoke(this,e);
}
  • 标准模式下要求方法名必须和事件一致,前面再加上On,接收一个EventArgs参数
    • rabbitMQ的源码中,并没有遵从这种标准,而是使用的Handle前缀。如HandleBasicDeliver,大家灵活使用。
  • 这里记住 PriceChanged?.Invoke(this,e);事件名?.Invoke(this,事件传递参数),等价下面代码。
if(PriceChanged!=null)
{
    PriceChanged(this,e);
}

2.6 触发事件

定义了触发事件的方法,还需要定义触发事件的条件,事件的本质是类型安全的委托,实质也是封装了一个多播委托,只是功能上比委托有了跟个多限制,只能在定义事件的类的内部直接调用事件。

public decimal Price
{
    get { return price; }
    set
    {
        if (price == value) return;
        decimal oldPrice = price;
        price = value;
        OnPriceChanged(new PriceChangeEventArgs(oldPrice, price));
    }
}

上述示例中,在属性的set访问器,中触发事件:当价格发生变化时,将触发股价变化事件(PriceChanged)。

2.5 事件的使用(订阅事件)

没有订阅的事件,永远不会触发(因为为null)。所以我们需要订阅事件。

//  一个股票类
//  股票的价格变化订阅事件
static void Main(string[] args)
{
    Stock st = new Stock("股票");
    st.Price = 100;
    st.PriceChanged += stock_PriceChanged;
    st.Price = 200;
    Console.WriteLine("Hello World!");
}
static void stock_PriceChanged(object sender, PriceChangeEventArgs e)
{
    if ((e.NewPrice - e.LastPrice) / e.LastPrice > 0.1M)
    {
        Console.WriteLine("Alert,10% stock price increase");
    }
}

2.5.1 使用匿名方法订阅事件

st.PriceChanged += delegate(object o, PriceChangeEventArgs e)  
{  
    var lastPrice = e.LastPrice;
    var newPrice = e.NewPrice;
    //...
};  

2.5.2 使用lambda表达式订阅事件

使用+=操作符订阅事件,更多的实际操作是使用lambda表达式,用于接收事件源参数和事件传递的信息:

st.PriceChanged += (sender, e) =>
{
    var lastPrice = e.LastPrice;
    var newPrice = e.NewPrice;
    //...
};

3.总结

C#通过事件机制实现线程间(进程内)的通信。让我想起了MediatR这个库。

  • 事件的定义是在一个类中
  • 事件的注册是在另一个类中
  • 在定义事件的类中触发
  • 传递参数至注册类中使用。

3.1 什么情况下使用事件?

当我们学习到某种方法总会有疑问,到底什么时候使用事件,事件能够办到的,看起来委托也能办到。

当事件源将在很长一段时间内触发事件,基于事件的设计就显得非常自然,例如RabbitMQ的消费者Recived事件,一旦订阅了事件,在当前程序的整个生命周期,事件源随时都可以触发事件。在CS程序中,UI控件设计示例基本也是基于各种事件。

3.2 事件定义与使用

  • 信息传递类:public class xxxEventArgs{}

    • 可继承EventArgs,也可以自定义
  • *委托:public delegate void EventHandler<xxxEventArgs>(object source,xxxEventArgs e)

    • 使用.NET预定义类(这一步可以省略)
  • 事件:public event EventHandler<xxxEventArgs> EventName;

  • 触发事件的方法:

    protected virtual void OnEventName(xxxEventArgs e)
    {
      	eventName?.Invoke(this,e);
    }
    
  • 触发事件:在特定的需求下,执行OnEventName(new xxxEventArgs(){})

  • 订阅(注册)事件:xx.EventName+=(src,e)=>{}

参考链接

https://www.bilibili.com/video/BV1Ht41137R1

https://docs.microsoft.com/zh-cn/dotnet/csharp/event-pattern

【C#】从RabbitMQ的消费者事件窥.NET标准事件

上一篇:c#学习01


下一篇:Delphi 获取选中的行或列,选中到某个行数