最近在园子里看到一篇关于TransactionScope的文章,发现事务和并发控制是新接触Entity Framework和Transaction Scope的园友们不易理解的问题,遂组织此文跟大家共同探讨。
首先事务的ACID特性作为最基础的知识我想大家都应该知道了。ADO.NET的SQLTransaction就是.NET框架下访问SqlServer时最底层的数据库事务对象,它可以用来将多次的数据库访问封装为“原子操作”,也可以通过修改隔离级别来控制并发时的行为。TransactionScope则是为了在分布式数据节点上完成事务的工具,它经常被用在业务逻辑层将多个数据库操作组织成业务事务的场景,可以做到透明的可分布式事务控制和隐式失败回滚。但与此同时也经常有人提到TransactionScope有性能和部署方面的问题,关于这一点,根据MSDN的 Using the TransactionScope Class 的说法,当一个TransactionScope包含的操作是同一个数据库连接时,它的行为与SqlTransaction是类似的。当它在多个数据库连接上进行数据操作时,则会将本地数据库事务提升为分布式事务,而这种提升要求各个节点均安装并启动DTC服务来支持分布式事务的协调工作,它的性能与本地数据库事务相比会低很多,这也是CAP定律说的分布式系统的Consistency和Availability不可兼得的典型例子。所以当我们选择是否使用TransactionScope时,一定要确认它会不会导致不想发生的分布式事务,也应该确保事务尽快做完它该做的事情,为了确认事务是否被提升我们可以用SQL Profiler去跟踪相关的事件。
然后再来看一看Entity Framework,其实EF也跟事务有关系。它的Context概念来源于Unit of Work模式,Context记录提交前的所有Entity变化,并在SaveChanges方法调用时发起真正的数据库操作,SaveChanges方法在默认情况下隐含一个事务,并且试图使用乐观并发控制来提交数据,但是为了进行并发控制我们需要将Entity Property的ConcurrencyMode设置为Fixed才行,否则EF不理会在此Entity上面发生的并发修改,这一点可以参考MSDN Saving Changes and Managing Concurrency。微软推荐大家使用以下方法来捕获冲突的并发操作,并使用RefreshMode来选择覆盖或丢弃失败的操作:
try
{
// Try to save changes, which may cause a conflict.
int num = context.SaveChanges();
Console.WriteLine("No conflicts. " +
num.ToString() + " updates saved.");
}
catch (OptimisticConcurrencyException)
{
// Resolve the concurrency conflict by refreshing the
// object context before re-saving changes.
context.Refresh(RefreshMode.ClientWins, orders); // Save changes.
context.SaveChanges();
Console.WriteLine("OptimisticConcurrencyException "
+ "handled and changes saved");
}
当然除了乐观并发控制我们还可以对冲突特别频繁、冲突解决代价很大的用例进行悲观并发控制。悲观并发基本思想是不让数据被同时离线修改,也就是像源码管理里面“加锁”功能一样,码农甲锁上了这个文件,乙就不能再修改了,这样一来这个文件就不可能发生冲突,悲观并发控制实现的方式比如数据行加IsLocked字段等。
最后为了进行多Context事务,当然还可以混合使用TransactionScope和EF。
好了理论简单介绍完,下面的例子是几种不同的方法对并发控制的效果,需求是每个Member都有个HasMessage字段,初始为False,我们需要给其中一个Member加入唯一一条MemberMessage,并将Member.HasMessage置为True。
建库脚本:
CREATE DATABASE [TransactionTest]
GO USE [TransactionTest]
GO CREATE TABLE [dbo].[Member](
[Id] [int] IDENTITY(1,1) NOT NULL,
[Name] [nvarchar](32) NOT NULL,
[HasMessage] [bit] NOT NULL,
CONSTRAINT [PK_Member] PRIMARY KEY CLUSTERED
(
[Id] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO
SET IDENTITY_INSERT [dbo].[Member] ON
INSERT [dbo].[Member] ([Id], [Name], [HasMessage]) VALUES (1, N'Tom', 0)
INSERT [dbo].[Member] ([Id], [Name], [HasMessage]) VALUES (2, N'Jerry', 0)
SET IDENTITY_INSERT [dbo].[Member] OFF CREATE TABLE [dbo].[MemberMessage](
[Id] [int] IDENTITY(1,1) NOT NULL,
[Message] [nvarchar](128) NOT NULL,
[MemberId] [int] NOT NULL,
CONSTRAINT [PK_MemberMessage] PRIMARY KEY CLUSTERED
(
[Id] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO ALTER TABLE [dbo].[MemberMessage] WITH CHECK ADD CONSTRAINT [FK_MemberMessage_Member] FOREIGN KEY([MemberId])
REFERENCES [dbo].[Member] ([Id])
GO
ALTER TABLE [dbo].[MemberMessage] CHECK CONSTRAINT [FK_MemberMessage_Member]
GO
方法1:不使用TransactionScope,只依赖Entity各字段的默认并发控制。
Context和Entity定义
public class MyDbContext : DbContext
{
public MyDbContext() : base("TransactionTest") { } public MyDbContext(string connectionString) :
base(connectionString)
{ } public DbSet<Member> Members { get; set; }
} [Table("Member")]
public class Member
{
[Key]
public int Id { get; set; } public string Name { get; set; } public bool HasMessage { get; set; } public virtual ICollection<MemberMessage> Messages { get; set; }
} [Table("MemberMessage")]
public class MemberMessage
{
[Key]
public int Id { get; set; } public string Message { get; set; } public int MemberId { get; set; } [ForeignKey("MemberId")]
public virtual Member Member { get; set; }
}
测试代码
try
{
using (var context = new MyDbContext())
{
var tom = context.Members.FirstOrDefault(m => m.Id == );
if (tom != null && !tom.HasMessage)
{
Console.WriteLine("Press Enter to Insert MemberMessage...");
Console.ReadLine();
tom.Messages.Add(new MemberMessage()
{
Message = "Hi Tom!"
});
tom.HasMessage = true;
context.SaveChanges();
Console.WriteLine("Insert Completed!");
}
}
}
catch (Exception ex)
{
Console.WriteLine("Insert Failed: " + ex);
}
同时运行两个程序,结果是无法确保不重复插入
通过分析不难发现,该场景的并发控制关键就在于插入前检查HasMessage如果是False,则插入MemberMessage后更新Member.HasMessage字段时需要再次检查数据库中HasMessage字段是否为False,如果为True就是有其他人并发的更改了该字段,本次保存应该回滚或做其他处理。所以为此需要有针对性的加入并发控制。
方法2:给HasMessage字段加上并发检查
[Table("Member")]
public class Member
{
[Key]
public int Id { get; set; } public string Name { get; set; } [ConcurrencyCheck]
public bool HasMessage { get; set; } public virtual ICollection<MemberMessage> Messages { get; set; }
}
仍然使用方法1的测试代码,结果则是其中一次数据插入会抛出OptimisticConcurrencyException,也就是说防止重复插入数据的目的已经达到了。
那回过头来看看是否可以使用TransactionScope对EF进行并发控制,于是有方法3:使用TransactionScope但不给Entity加入并发检查
Context和Entity的定义与方法1完全一致,测试代码为
try
{
using (var scope = new System.Transactions.TransactionScope())
{
using (var context = new MyDbContext())
{
var tom = context.Members.FirstOrDefault(m => m.Id == );
if (tom != null && !tom.HasMessage)
{
Console.WriteLine("Press Enter to Insert MemberMessage...");
Console.ReadLine();
tom.Messages.Add(new MemberMessage()
{
Message = "Hi Tom!"
});
tom.HasMessage = true;
context.SaveChanges();
Console.WriteLine("Insert Completed!");
}
}
scope.Complete();
}
}
catch (Exception ex)
{
Console.WriteLine("Insert Failed: " + ex);
}
同样启动两个程序测试,发现其中一次保存操作抛出DbUpdateException,其内部原因是Transaction死锁导致该操作被作为牺牲者。所以看起来也可以达到并发控制的效果,这种方式的优点是不需要去仔细辨别业务中哪些操作会导致字段的并发更新冲突,所有的Entity都可以不加ConcurrencyCheck,缺点则是当冲突不多的时候这种死锁竞争协调与乐观并发控制相比性能会低些。
最后为了完备测试各种组合,我们试一试方法4:既使用TransactionScope,又在HasMessage字段上加入ConcurrencyCheck,Entity代码参考方法1,测试代码则参考方法3,结果仍然是TransactionScope检测到死锁并选择其中一个竞争者抛出异常。
结论:
- TransactionScope用或不用主要取决于是否需要进行分布式事务
- 即使不需要分布式事务,TransactionScope也可以用于没有精力仔细分析哪些Entity的字段需要进行并发检查的时候
- 如果能够细粒度分析并发场景,则推荐使用EF自带的并发控制机制