这次来介绍一下WCF4.0新特性体验(9):非破坏性队列接收(Non-destructive queue receive )。这个特性不是那么直观。确切来说是WCF4.0对于以前处理MSMQ消息队列机制的一个改进。本期文章,我们会对WCF4.0里使用的新的消息队列处理模式做出详细的介绍。当然为了帮助大家更好地理解这些知识,这里也会介绍WCF3.X的MSMQ消息处理模式。其中包含一个重要的类型ReceiveContext的介绍。全文分为5个部分:【1】MSMQ消息队列,【2】破坏性队列接收模式,【3】非破坏性队列接收模式(Non-destructive queue receive ),【4】接收上下文环境ReceiveContext,【5】使用ReceiveContext处理MSMQ队列消息。
在学习本节内容之前,需要你对有一定的MSMQ消息队列、WCF操作MSMQ队列和WCF事务机制的基础知识。如果你还没有学习过相关的内容,推荐看先阅读一下下面三篇文章:
这里基本对本文需要涉及到的一些基础知识点做了详细的介绍。
【1】MSMQ消息队列
之前我们在WCF分布式开发必备知识(1):MSMQ消息队列 也做过介绍。MSMQ全称MicroSoft Message Queue,微软消息队列,是在多个不同的应用之间实现相互通信的一种异步传输模式,相互通信的应用可以分布于同一台机器上,也可以分布于相连的网络空间中的任一位置。它的实现原理是:消息的发送者把自己想要发送的信息放入一个容器中(我们称之为Message),然后把它保存至一个系统公用空间的消息队列(Message Queue)中;本地或者是异地的消息接收程序再从该队列中取出发给它的消息进行处理。
WCF4.0提供了对于异步编程模型和解耦系统的支持。我们知道消息队列可以提供一个异步通信的方法。消息可以保留在队列中。每个消息都有特定的生命周期。如果在指定的生命周期里未发送至目标队列,则转移到死信队列。
我们知道WCF中提供了NetMsmqBinding和MsmqIntegrationBinding绑定来支持MSMQ消息队列编程模型。它们使用个单向One-way消息交换模式MEP,也就是数据报消息交换模式。使用数据报 MEP 时,客户端使用“启动后不管”的交换形式发送消息。“启动后不管”交换形式是一种要求对成功传递做带外确认的交换形式(可以参考《WCF技术内幕》翻译33:第2部分_第6章_通道:通道形状 )。
当WCF终结点与用 C、C++、COM 或 System.Messaging API 编写的现有 MSMQ 应用程序通信的时候使用MsmqIntegrationBinding。而NetMsmqBinding提供了WCF终结点与MSMQ进行基本的通信功能。它们使用的是IInputChannel接口,也就是支持数据报消息交换模式,即我们所说的单向MEP。
由于消息队列MSMQ的优点:稳定、消息优先级、脱机能力以及安全性,有保障的消息传递和执行许多业务处理的可靠的防故障机制。因此使用MSMQ消息队列可以确保消息的正确性以及传递的可靠性。大多数系统在处理消息的时候都会引入事务机制来处理消息。
WCF在处理消息队列消息的时候,既可以使用事务模式,也可以使用非事务模式。而破坏性队列消息接收的问题也和消息处理模式有关系。
【2】破坏性队列接收模式:
【2.1】WCF 消息队列MSMQ通信框架:
WCF使用NetMsmqBinding来支持消息队列通信。当客户端调用服务时,客户端消息会被封装为MSMQ消息,发送懂到特定的消息队列。服务端宿主在运行转台下会,启动通道侦听器,来检测消息队列消息,如果发现对应的消息,会从队列里取出消息,使用分发器转发给对应的服务。具体的通信架构如图:
如果宿主离线,消息会被放入队列,等待下一次宿主联机时,在执行消息分发处理,给指定的WCF服务。
【2.2】破坏性队列接收模型:
在了解了WCF的消息队列通信框架以后,我们来看一下什么是破坏性队列消息处理模式。在WCF4.0以前,WCF事务里处理队列消息的机制如下:
- 启动一个事务,当然这个事物范围内,可以包含多个参与者。
- 等待消息队列里消息的到来。如果在特定的时间内没有等到消息,就终止事务。然后重复步骤1,启动新事物,循环读取消息。
- 如果接收到消息,就从消息队列里益处读取过的消息。
- 在事务范围内,做处理工作。当然也包含于其它事务参与者协调工作。如果所有的事物参与者都成功,则会提交事务。但如果其中一个失败,则会回滚事务。重新在MSMQ队列里回复消息。重复步骤1。
【3】非破坏性队列接收模式(Non-destructive queue receive ):
那么什么是非破坏性队列消息接受模式呢。区别在于对于队列消息接受的方式不同。我们来具体看一下:
- 等待MSMQ队列消息到达。
- 如果消息到达,先检查消息内容。同事Lock锁定消息,这样其它参与者无法看到消息。保证消息读取的排他性。这里与上一个模式的不同点在于,此时不会从队列里移除消息,而是变为比可见。但是必要的时候我们可以移除消息。
- 启动一个新的事物。
- 在事务内部做处理工作,当所有的事物参与者都成功的时候,就可以提交事务。事务成功的时候,从队列里删除消息。如果一个失败,我们可以终止事务,这样消息还Lock在队列里。没有删除。我们可以重复步骤3继续处理工作。
这样的好处是避免了消息的删除问题。也就是破坏性接受消息。
【4】接收上下文环境ReceiveContext:
那么WCF4.0如何实现非破坏性队列消息处理的呢?这里一个比较重要的类型就是ReceiveContext,接收上下文环境。它提供了一个自定义状态机机制,可以控制对队列消息的非破坏性读取操纵。但是需要继承这个类型来实现消息接受功能消息只有解锁以后才可以看到。ReceiveContext是一个抽象类,它的定义如下:
public abstract class ReceiveContext
{
public readonly static string Name = "ReceiveContext";
protected ReceiveContext();
public static bool TryGet(Message message, out ReceiveContext property);
public static bool TryGet(MessageProperties properties, out ReceiveContext property);
public ReceiveContextState State
{
get; protected set;
}
protected object ThisLock
{
get;
}
public virtual void Abandon(TimeSpan timeout);
public virtual IAsyncResult BeginAbandon(TimeSpan timeout, AsyncCallback callback, object state);
protected abstract void OnAbandon(TimeSpan timeout);
protected abstract void OnEndAbandon(IAsyncResult result);
protected abstract IAsyncResult OnBeginAbandon(TimeSpan timeout, AsyncCallback callback, object state);
public virtual void EndAbandon(IAsyncResult result);
public virtual void Complete(TimeSpan timeout);
public virtual IAsyncResult BeginComplete(TimeSpan timeout, AsyncCallback callback, object state);
protected abstract IAsyncResult OnBeginComplete(TimeSpan timeout, AsyncCallback callback, object state);
protected abstract void OnComplete(TimeSpan timeout);
protected abstract void OnEndComplete(IAsyncResult result);
public virtual void EndComplete(IAsyncResult result);
protected virtual void Fault();
protected virtual void OnFaulted();
}
public enum ReceiveContextState
{
Received,
Completing,
Completed,
Abandoning,
Abandoned,
Faulted
}
{
public readonly static string Name = "ReceiveContext";
protected ReceiveContext();
public static bool TryGet(Message message, out ReceiveContext property);
public static bool TryGet(MessageProperties properties, out ReceiveContext property);
public ReceiveContextState State
{
get; protected set;
}
protected object ThisLock
{
get;
}
public virtual void Abandon(TimeSpan timeout);
public virtual IAsyncResult BeginAbandon(TimeSpan timeout, AsyncCallback callback, object state);
protected abstract void OnAbandon(TimeSpan timeout);
protected abstract void OnEndAbandon(IAsyncResult result);
protected abstract IAsyncResult OnBeginAbandon(TimeSpan timeout, AsyncCallback callback, object state);
public virtual void EndAbandon(IAsyncResult result);
public virtual void Complete(TimeSpan timeout);
public virtual IAsyncResult BeginComplete(TimeSpan timeout, AsyncCallback callback, object state);
protected abstract IAsyncResult OnBeginComplete(TimeSpan timeout, AsyncCallback callback, object state);
protected abstract void OnComplete(TimeSpan timeout);
protected abstract void OnEndComplete(IAsyncResult result);
public virtual void EndComplete(IAsyncResult result);
protected virtual void Fault();
protected virtual void OnFaulted();
}
public enum ReceiveContextState
{
Received,
Completing,
Completed,
Abandoning,
Abandoned,
Faulted
}
接收上下文环境的功能是双向的。当使用队列通道的时候,接收上下文允许你在处理消息以前锁定lock消息。即使失败你也可以继续独占消息。其它服务不能够接受和处理这个消息。你可以决定是否做别的处理。当使用单向通道的时候,接收上下文环境允许服务控制何时发送成功确认消息或者失败确认消息。如果没有接收到确认消息或者接收到失败确认消息,客户端可以重新发送消息。
ReceiveContext里提供了两个方法可以在破坏性和费破坏性接受模式之间转换。Abandon会从破坏性接收模式转换到非破坏性接收模式,并解锁消息。它会立即终止当前对于消息的操作。 Complete 方法会从非破坏性模式转换到破坏性接收模式。如果当前存在事务。Complete 方法就会在事务范围内执行。
【5】使用ReceiveContext处理MSMQ队列消息:
下面我们就来介绍如何在WCF4.0里使用ReceiveContext处理MSMQ队列消息。在WCF 3.x, 我们可以通过NetMsmqBinding来使用事务性队列。当出现异常的时候,不能送达消息到目标服务的时候,WCF会把消息存储在队列里。
下面的例子演示了如何在WCF里使用ReceiveContext 去处理MSMQ队列消息:
【5.1】服务端:
这里我们要定义了一个单向操作契约。 设置操作行为属性我们启用事务。[OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)],另外就是设置接受上下问环境为可以人工控制:
[ReceiveContextEnabled(ManualControl = true)]。具体代码如下:
[ReceiveContextEnabled(ManualControl = true)]。具体代码如下:
//1.服务契约
[ServiceContract(Namespace = "http://www.cnblogs.com/frank_xl/")]
public interface IWCFMSMQService
{
//操作契约,必须为单向操作
[OperationContract(IsOneWay = true)]
void SayHelloMSMQ(string name);
}
//2.服务类,继承接口。实现服务契约定义的操作
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single,ConcurrencyMode = ConcurrencyMode.Single)]
public class WCFMSMQService : IWCFMSMQService
{
public WCFMSMQService()
{
Console.WriteLine("WCF MSMQ Service instance was created at:{0}", DateTime.Now);
}
//实现接口定义的方法
[OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
[ReceiveContextEnabled(ManualControl = true)]
public void SayHelloMSMQ(string name)
{
ReceiveContext receiveContext;
if (!ReceiveContext.TryGet(OperationContext.Current.IncomingMessageProperties, out receiveContext))
{
Console.WriteLine("ReceiveContext not installed/found on this machine.");
return;
}
// Pseudo-randomly decide whether to process the message based on its content.
if (name != string.Empty)
{
receiveContext.Complete(TimeSpan.MaxValue);
//Complete 方法会从非破坏性模式转换到破坏性接收模式
Console.WriteLine("change to destructive queue receive ");
}
else//Abandon
{
receiveContext.Abandon(TimeSpan.MaxValue);
//Abandon会从破坏性接收模式转换到非破坏性接收模式
Console.WriteLine("change to Non-destructive queue receive ");
}
}
}
[ServiceContract(Namespace = "http://www.cnblogs.com/frank_xl/")]
public interface IWCFMSMQService
{
//操作契约,必须为单向操作
[OperationContract(IsOneWay = true)]
void SayHelloMSMQ(string name);
}
//2.服务类,继承接口。实现服务契约定义的操作
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single,ConcurrencyMode = ConcurrencyMode.Single)]
public class WCFMSMQService : IWCFMSMQService
{
public WCFMSMQService()
{
Console.WriteLine("WCF MSMQ Service instance was created at:{0}", DateTime.Now);
}
//实现接口定义的方法
[OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
[ReceiveContextEnabled(ManualControl = true)]
public void SayHelloMSMQ(string name)
{
ReceiveContext receiveContext;
if (!ReceiveContext.TryGet(OperationContext.Current.IncomingMessageProperties, out receiveContext))
{
Console.WriteLine("ReceiveContext not installed/found on this machine.");
return;
}
// Pseudo-randomly decide whether to process the message based on its content.
if (name != string.Empty)
{
receiveContext.Complete(TimeSpan.MaxValue);
//Complete 方法会从非破坏性模式转换到破坏性接收模式
Console.WriteLine("change to destructive queue receive ");
}
else//Abandon
{
receiveContext.Abandon(TimeSpan.MaxValue);
//Abandon会从破坏性接收模式转换到非破坏性接收模式
Console.WriteLine("change to Non-destructive queue receive ");
}
}
}
【5.2】宿主:
宿主通过netMsmqBinding托管WCF服务,并暴露一个终结点供客端调用。配置文件如下:
<service behaviorConfiguration="WCFService.WCFServiceBehavior" name="WCFService.WCFMSMQService">
<endpoint address="net.msmq://localhost/Private/FrankWCFMSMQ" binding="netMsmqBinding" contract="WCFService.IWCFMSMQService" bindingConfiguration="msmq">
</endpoint>
<endpoint address="mex" binding="mexHttpBinding" contract="IMetadataExchange"/>
<host>
<baseAddresses>
<add baseAddress="http://localhost:8001/"/>
</baseAddresses>
</host>
</service>
</services>
<behaviors>
<serviceBehaviors>
<behavior name="WCFService.WCFServiceBehavior">
<serviceMetadata httpGetEnabled="true"/>
<serviceDebug includeExceptionDetailInFaults="false"/>
</behavior>
</serviceBehaviors>
</behaviors>
<endpoint address="net.msmq://localhost/Private/FrankWCFMSMQ" binding="netMsmqBinding" contract="WCFService.IWCFMSMQService" bindingConfiguration="msmq">
</endpoint>
<endpoint address="mex" binding="mexHttpBinding" contract="IMetadataExchange"/>
<host>
<baseAddresses>
<add baseAddress="http://localhost:8001/"/>
</baseAddresses>
</host>
</service>
</services>
<behaviors>
<serviceBehaviors>
<behavior name="WCFService.WCFServiceBehavior">
<serviceMetadata httpGetEnabled="true"/>
<serviceDebug includeExceptionDetailInFaults="false"/>
</behavior>
</serviceBehaviors>
</behaviors>
【5.3】客户单:
客户端通过发送不同的name,来切换WCF服务的接受消息的模式,当消息为空的时候,调用Abandon方法,接受上下文环境会从破坏性接收模式转换到非破坏性接收模式;当传递的name不为空的时候,,执行Complete 方法接受上下文环境会从非破坏性模式转换到破坏性接收模式。代码如下:
//HTTP NetMsmqBinding_IWCFMSMQService
WCFMSMQServiceClient wcfServiceProxy = new WCFMSMQServiceClient("NetMsmqBinding_IWCFMSMQService");
//通过代理调用SayHello服务,这里及时服务调用服务失败,消息会发送到队列里进行缓存。
Console.WriteLine("WCF First Call at:{0}",DateTime.Now);
wcfServiceProxy.SayHelloMSMQ("");//Abandon会从破坏性接收模式转换到非破坏性接收模式
wcfServiceProxy.SayHelloMSMQ("Frank"); //Complete 方法会从非破坏性模式转换到破坏性接收模式
Console.WriteLine("Press any key to continue");
Console.Read();
WCFMSMQServiceClient wcfServiceProxy = new WCFMSMQServiceClient("NetMsmqBinding_IWCFMSMQService");
//通过代理调用SayHello服务,这里及时服务调用服务失败,消息会发送到队列里进行缓存。
Console.WriteLine("WCF First Call at:{0}",DateTime.Now);
wcfServiceProxy.SayHelloMSMQ("");//Abandon会从破坏性接收模式转换到非破坏性接收模式
wcfServiceProxy.SayHelloMSMQ("Frank"); //Complete 方法会从非破坏性模式转换到破坏性接收模式
Console.WriteLine("Press any key to continue");
Console.Read();
【6】总结:
WCF4.0提供的新特性:非破坏性队列接收(Non-destructive queue receive )是对原有队列消息处理模式的一种改进,它避免了破坏性消息接收模式对于消息队列的对象消息清楚和回复过程带来的巨大的性能为题,通过接受上下文环境提供的状态机机制来控制队列消息的接收模式。从而提高了队列消息处理的性能,并且带来了很大灵活性。我们也可以在事务范围内提供队列消息处理模式的具体控制。最后给出本文的参考代码:/Files/frank_xl/10.WCFServiceMSMQReceiveContext.rar。
参考文章:
8.在事务中对消息进行批处理
本文转自 frankxulei 51CTO博客,原文链接:http://blog.51cto.com/frankxulei/320270,如需转载请自行联系原作者