EF6的多线程与分库架构设计实现

1.项目背景

这里简单介绍一下项目需求背景,之前公司的项目基于EF++Repository+UnitOfWork的框架设计的,其中涉及到的技术有RabbitMq消息队列,Autofac依赖注入等常用的.net插件。由于公司的发展,业务不断更新,变得复杂起来,对于数据的实时性、存储容量要求也提高了一个新的高度。数据库上下文DbContext设计的是单例模式,基本上告别了多线程高并发的数据读写能力,看了园子里很多大神的博客,均为找到适合自己当前需求的DbContext的管理方式。总结目前主要的管理方式:

1)DbContext单例模式(长连接)。即公司之前的设计。很明显,这种设计方式无法支持多线程同步数据操作。报各种错误,最常见的,比如:集合已修改,无法进行枚举操作。---弃用

2)Using模式(短连接)。这种模式适合一些对于外键,导航属性不经常使用的场合,由于导航属性是放在上下文缓存中的,一旦上下文释放掉,导航属性就为null。当然,也尝试了其他大神的做法,比如,在上下文释放之前转换为        ToList或者使用饥饿加载的方式(ps:这种方式很不灵活,你总不可能遇到一个类类型就去利用反射加载找到它具有的导航属性吧或者直接InCluding),这些方法依旧没有办法解决目前的困境。也尝试这直接赋值给一个定义的同类      型的变量,但是对于这种带导航的导航的复杂类的深拷贝,没有找到合适的路子,有知道的可以告诉我,非常感谢!

以上两种方式及网上寻找的其他方式都没有解决我的问题。这里先上一下之前的Repository:

 using System.Data.Entity;
using System.Data.Entity.Validation; namespace MM.Data.Library.Entityframework
{
public class EntityFrameworkRepositoryContext : RepositoryContext, IEntityFrameworkRepositoryContext
{
protected DbContext container; public EntityFrameworkRepositoryContext(DbContext container)
{
this.container = container;
} public override void RegisterNew<TAggregateRoot>(TAggregateRoot obj)
{
this.container.Set<TAggregateRoot>().Add(obj);
this.IsCommit = false;
} public override void RegisterModified<TAggregateRoot>(TAggregateRoot obj)
{
if (this.container.Entry<TAggregateRoot>(obj).State == EntityState.Detached)
{
this.container.Set<TAggregateRoot>().Attach(obj);
}
this.container.Entry<TAggregateRoot>(obj).State = EntityState.Modified;
this.IsCommit = false;
} public override void RegisterDeleted<TAggregateRoot>(TAggregateRoot obj)
{
this.container.Set<TAggregateRoot>().Remove(obj);
this.IsCommit = false;
} public override void Rollback()
{
this.IsCommit = false;
} protected override void DoCommit()
{
if (!IsCommit)
{
//var count = container.SaveChanges();
//IsCommit = true;
try
{
var count = container.SaveChanges();
IsCommit = true;
}
catch (DbEntityValidationException dbEx)
{
foreach (var validationErrors in dbEx.EntityValidationErrors)
{
foreach (var validationError in validationErrors.ValidationErrors)
{
}
}
IsCommit = false;
}
}
} public System.Data.Entity.DbContext DbContext
{
get { return container; }
} public override void Dispose()
{
if (container != null)
container.Dispose();
}
}
}

2.设计思路及方法

 从上下文的单例模式来看,所要解决的问题无非就是在多线程对数据库写操作上面。只要在这上面做手脚,问题应该就能引刃而解。我的想法是将所有的要修改的数据分别放入UpdateList,InsertList,DeleteList三个集合中去,然后提交到数据库保存。至于DbContext的管理,通过一个数据库工厂获取,保证每一个数据库的连接都是唯一的,不重复的(防止发生类似这种错误:正在创建模型,此时不可使用上下文。),用的时候直接去Factory拿。等到数据库提交成功后,清空集合数据。看起来,实现起来很容易,但是因为还涉及到其他技术,比如Redis。所以实现过程费劲。也许我的能力还差很多。总之,废话不多说,直接上部分实现代码:

数据库上下文建立工厂:

     /// <summary>
/// 数据库建立工厂
/// Modify By:
/// Modify Date:
/// Modify Reason:
/// </summary>
public sealed class DbFactory
{
public static IDbContext GetCurrentDbContext(string connectstring,string threadName)
{
lock (threadName)
{
//CallContext:是线程内部唯一的独用的数据槽(一块内存空间)
//传递Context进去获取实例的信息,在这里进行强制转换。
var Context = CallContext.GetData("Context") as IDbContext; if (Context == null) //线程在内存中没有此上下文
{
var Scope = UnitoonIotContainer.Container.BeginLifetimeScope();
//如果不存在上下文 创建一个(自定义)EF上下文 并且放在数据内存中去
Context = Scope.Resolve<IDbContext>(new NamedParameter("connectionString", connectstring));
CallContext.SetData("Context", Context);
}
else
{ if (!Context.ConnectionString.Equals(connectstring))
{
var Scope = UnitoonIotContainer.Container.BeginLifetimeScope();
//如果不存在上下文 创建一个(自定义)EF上下文 并且放在数据内存中去
Context = Scope.Resolve<IDbContext>(new NamedParameter("connectionString", connectstring));
CallContext.SetData("Context", Context);
}
}
return Context;
}
} }

Repository:

     public class RepositoryBase<T, TContext> : IRepositoryBase<T> where T : BaseEntity

         where TContext : ContextBase, IDbContext, IDisposable, new()
{
public List<T> InsertList { get; set; }
public List<T> DeleteList { get; set; }
public List<T> UpdateList { get; set; } #region field protected readonly string Connectstring;
///// <summary>
///// </summary>
//protected static IDbContext Context;
protected IDbContext dbContext;
private static readonly ILifetimeScope Scope;
public static int xcount = ;
/////// <summary>
/////// </summary>
//protected readonly DbSet<T> Dbset; #endregion #region ctor static RepositoryBase()
{
Scope = UnitoonIotContainer.Container.BeginLifetimeScope();
} /// <summary>
/// 使用默认连接字符串name=connName
/// </summary>
public RepositoryBase() : this("")
{
} /// <summary>
/// 构造函数
/// </summary>
/// <param name="connectionString">连接字符串</param>
public RepositoryBase(string connectionString)
{
InsertList = new List<T>();
DeleteList = new List<T>();
UpdateList = new List<T>(); //*****做以下调整,初始化,建立所有数据库连接,保持长连接状态,在用的时候去判断使用连接 //todo 待处理 if (string.IsNullOrWhiteSpace(connectionString))
{
var name = DataBase.GetConnectionString(Activator.CreateInstance<T>().DbType);
//Context= ContextHelper.GetDbContext(Activator.CreateInstance<T>().DbType);
connectionString = name;
}
Connectstring = connectionString; // Context = Scope.Resolve<IDbContext>(new NamedParameter("connectionString", Connectstring)); //Context = new TContext { ConnectionString = connectionString }; // Dbset = Context.Set<T>(); //var loggerFactory = ((DbContext)Context).GetService<ILoggerFactory>();
//loggerFactory.AddProvider(new DbLoggerProvider(Console.WriteLine));
//loggerFactory.AddConsole(minLevel: LogLevel.Warning);
} //public RepositoryBase(TContext context)
//{
// Context = context;
// Dbset = context.Set<T>();
//} #endregion #region Method //public virtual IDbContext GetDbContext(ILifetimeScope scope)
//{ //} #region Check Model /// <summary>
/// 校正Model
/// </summary>
protected virtual void ValidModel()
{
} #endregion #region Update public virtual void Update(T entity)
{
Check.NotNull(entity, "entity");
UpdateList.Add(entity);
//context.Set<T>().Update(entity);
} public virtual void Update(IEnumerable<T> entities)
{
Check.NotNull(entities, "entities");
UpdateList.AddRange(entities);
} #endregion #region PageList public virtual IEnumerable<T> GetPageList(Expression<Func<T, bool>> where, Expression<Func<T, object>> orderBy,
int pageIndex, int pageSize)
{
//todo
} #endregion #region Insert public virtual void Add(T entity)
{
Check.NotNull(entity, "entity");
//排除已经存在的项(对于多线程没有任何用处)
if (!InsertList.Exists(e => e.Equals(entity)))
{
InsertList.Add(entity);
} } public virtual void Add(IEnumerable<T> entities)
{
Check.NotNull(entities, "entities");
InsertList.AddRange(entities);
} public void BulkInsert(IEnumerable<T> entities)
{
Check.NotNull(entities, "entities");
InsertList.AddRange(entities);
} #endregion #region Delete public virtual void Delete(int id)
{
var entity = GetById(id);
Delete(entity);
// throw new NotImplementedException("Delete(int id)");
} public virtual void Delete(string id)
{
throw new NotImplementedException("Delete(string id)");
} public virtual void Delete(T entity)
{
Check.NotNull(entity, "entity");
DeleteList.Add(entity);
} public virtual void Delete(IEnumerable<T> entities)
{
Check.NotNull(entities, "entities");
foreach (var x1 in DeleteList)
{
DeleteList.Add(x1);
}
} public virtual void Delete(Expression<Func<T, bool>> where)
{
var list = DeleteList.Where(where.Compile());
Delete(list);
} #endregion #region Commit public int Commit()
{
ValidModel();
//var x = Activator.CreateInstance<T>();
//Context = ContextHelper.GetDbContext(x.DbType);
//using (var scope = UnitoonIotContainer.Container.BeginLifetimeScope())
//{
// var context = scope.Resolve<IDbContext>(new NamedParameter("connectionString", Connectstring)); //var loggerFactory = Activator.CreateInstance<ILoggerFactory>();// ((DbContext)context).GetService<ILoggerFactory>();
//loggerFactory.AddProvider(new DbLoggerProvider(Console.WriteLine));
dbContext = DbFactory.GetCurrentDbContext(Connectstring, Thread.CurrentThread.Name);
var dbset = dbContext.Set<T>();
if (InsertList != null && InsertList.Any())
{
List<T> InsertNewList = InsertList.Distinct(new PropertyComparer<T>("Id")).ToList();//按照特定规则排除重复项
dbset.AddRange(InsertNewList);
} if (DeleteList != null && DeleteList.Any())
DeleteList.ForEach(t =>
{
// Context.Entry(t).State = EntityState.Detached;//将之前上下文跟踪的状态丢弃
//dbContext.Entry(t).State = EntityState.Detached;//将之前上下文跟踪的状态丢弃
dbset.Attach(t);
dbset.Remove(t);
});
if (UpdateList != null && UpdateList.Any())
{
List<T> UpdateNewList = UpdateList.Distinct(new PropertyComparer<T>("Id")).ToList();//按照特定规则排除重复项
UpdateNewList.ForEach(t =>
{
//Context.Entry(t).State = EntityState.Detached;//将之前上下文跟踪的状态丢弃
// dbContext.Entry(t).State = EntityState.Detached;//将之前上下文跟踪的状态丢弃
dbContext.Entry(t).State = EntityState.Modified;
});//.UpdateRange(UpdateNewList);
}
var result = ;
try
{
result = dbContext.SaveChanges();
}
catch (Exception ex)
{ // throw;
} if (InsertList != null && InsertList.Any())
InsertList.Clear();
if (DeleteList != null && DeleteList.Any())
DeleteList.Clear();
if (UpdateList != null && UpdateList.Any())
UpdateList.Clear();
return result;
//}
} public async Task<int> CommitAsync()
{
ValidModel();
//todo } #endregion #region Query
public IQueryable<T> Get()
{
return GetAll().AsQueryable();
}
//public virtual T Get(Expression<Func<T, bool>> @where)
//{
// using (var scope = UnitoonIotContainer.Container.BeginLifetimeScope())
// { // var context = scope.Resolve<IDbContext>(new NamedParameter("connectionString", Connectstring));
// var dbset = context.Set<T>();
// return dbset.FirstOrDefault(where);
// }
//} public virtual async Task<T> GetAsync(Expression<Func<T, bool>> @where)
{
//todo
} public virtual T GetById(int id)
{
throw new NotImplementedException("GetById(int id)");
} public virtual async Task<T> GetByIdAsync(int id)
{
throw new NotImplementedException("GetById(int id)");
} public virtual T GetById(string id)
{
throw new NotImplementedException("GetById(int id)");
} public virtual async Task<T> GetByIdAsync(string id)
{
throw new NotImplementedException("GetById(int id)");
} public virtual T Get(Expression<Func<T, bool>> @where)//, params string[] includeProperties
{
//var scope = UnitoonIotContainer.Container.BeginLifetimeScope();
//{
// var context = scope.Resolve<IDbContext>(new NamedParameter("connectionString", Connectstring));
//Thread.Sleep(50);
//lock (Context)
{
dbContext= DbFactory.GetCurrentDbContext(Connectstring, Thread.CurrentThread.Name);
var dbset = dbContext.Set<T>().Where(e => !e.IsDeleted).AsQueryable();
var entity = dbset.FirstOrDefault(where);
//test
// Context.Entry(entity).State = EntityState.Detached;//将之前上下文跟踪的状态丢弃
return entity;
} //}
} public virtual IEnumerable<T> GetAll()
{
//Thread.Sleep(50);
//lock (Context)
{
dbContext = DbFactory.GetCurrentDbContext(Connectstring, Thread.CurrentThread.Name);
var dbset = dbContext.Set<T>().Where(e => !e.IsDeleted);
//test
//dbset.ToList().ForEach(t =>
//{
// Context.Entry(t).State = EntityState.Detached;//将之前上下文跟踪的状态丢弃
//}); return dbset;
} //var scope = UnitoonIotContainer.Container.BeginLifetimeScope(); // var context = scope.Resolve<IDbContext>(new NamedParameter("connectionString", Connectstring)); } public async virtual Task<IEnumerable<T>> GetAllAsync()
{
//todo
} public virtual IEnumerable<T> GetMany(Expression<Func<T, bool>> where)
{
//using (var scope = UnitoonIotContainer.Container.BeginLifetimeScope())
//{
// var context = scope.Resolve<IDbContext>(new NamedParameter("connectionString", Connectstring)); // var dbset = context.Set<T>();
//Thread.Sleep(50);
//lock (Context)
{
dbContext = DbFactory.GetCurrentDbContext(Connectstring, Thread.CurrentThread.Name);
var dbset = dbContext.Set<T>().Where(e => !e.IsDeleted);
//test
//dbset.ToList().ForEach(t =>
//{
// Context.Entry(t).State = EntityState.Detached;//将之前上下文跟踪的状态丢弃
//});
return dbset.Where(@where).ToList();
} //}
} public virtual async Task<IEnumerable<T>> GetManyAsync(Expression<Func<T, bool>> where)
{
//todo
} public virtual IEnumerable<T> IncludeSubSets(params Expression<Func<T, object>>[] includeProperties)
{
//todo
} #region navigation
/// <summary>
/// 加载导航
/// </summary>
/// <param name="where"></param>
/// <param name="includeProperties"></param>
/// <returns></returns>
//public virtual T Get(Expression<Func<T, bool>> @where, params Expression<Func<T, object>>[] includeProperties)
//{
// using (var scope = UnitoonIotContainer.Container.BeginLifetimeScope())
// { // var context = scope.Resolve<IDbContext>(new NamedParameter("connectionString", Connectstring));
// var dbset = context.Set<T>();
// var query = includeProperties.Aggregate<Expression<Func<T, object>>, IQueryable<T>>(dbset,
// (current, includeProperty) => current.Include(includeProperty));
// return query.FirstOrDefault(where);
// }
//} //public virtual T Get(Expression<Func<T, bool>> @where)//, params string[] includeProperties
//{
// //反射获取导航
// var includeProperties =
// Activator.CreateInstance<T>().GetType().GetProperties().Where(p => p.GetMethod.IsVirtual).Select(e => e.Name).ToArray(); //todo
//}
#endregion
public List<TDynamicEntity> GetDynamic<TTable, TDynamicEntity>(Expression<Func<TTable, object>> selector,
Func<object, TDynamicEntity> maker) where TTable : class
{
//todo
} public List<TDynamicEntity> GetDynamic<TTable, TDynamicEntity>(Func<TTable, object> selector,
Func<object, TDynamicEntity> maker) where TTable : class
{
//todo } #endregion #region Count public virtual async Task<int> CountAsync()
{
//todo
} public virtual async Task<int> CountByAsync(Expression<Func<T, bool>> where)
{
//todo
} #endregion #region Exists public virtual bool Exists(string id)
{
throw new NotImplementedException();
} public virtual bool Exists(int id)
{
throw new NotImplementedException();
} public virtual async Task<bool> ExistsAsync(string id)
{
throw new NotImplementedException();
} public virtual async Task<bool> ExistsAsync(int id)
{
throw new NotImplementedException();
} public virtual bool Exists(Expression<Func<T, bool>> @where)
{
throw new NotImplementedException();
} public virtual async Task<bool> ExistsAsync(Expression<Func<T, bool>> @where)
{
throw new NotImplementedException();
} #endregion #endregion
}

以上就是EF6的多线程与分库架构设计实现的部分相关内容。

如果有需要全部源代码或者交流的,直接联系我QQ:694666781  或者: https://item.taobao.com/item.htm?spm=a2oq0.12575281.0.0.3d2a1debWgF2RT&ft=t&id=608713437692

另外,该场景下,Redis相关使用方法及可能遇到的问题及解决方法我会另写一篇进行展开。如有不妥不正之处,请大神指正,谢谢!

上一篇:while循环 格式化输出 密码本 编码的初识


下一篇:Java 数据驱动测试