第二部分 尝试解决BulkWrite(List<WriteModel<T>>)问题
在上次发表的文章中,得到了一些很好的反馈,真切体会到写博文的好处,有高人指出两大问题,具体可以看看上篇中的评论,下面依然是发表一些个人见解,只做研究,并不保证解决实际问题。
这两大问题终究来说,是发生在BulkWrite(List<WriteModel<T>>)上,针对@ 从来不用 的问题,我试着找出影响的行数据还比对写入操作的数量,如果一致,则提交,如果不一致则回滚。
1.找出影响行数和实际操作数量 BulkWriteResult<T>有很多属性,我用的是ProcessedRequests.Count这个应该是反应的像mssql的影响的行数。而实际操作数量就是writer的个数。
2.备份元数据 每添加一个writer之前在内存中或备份数据库中保存一份元数据,我这里是保存在内存中的,声明了几个不同类型的集合
private List<TAggregate> beforeChange = new List<TAggregate>();//记录更新前的数据 private List<Guid> beforeAdd = new List<Guid>(); //记录添加前的数据ID private List<TAggregate> beforeDelete = new List<TAggregate>();//记录数据删除前的数据
然后在每添加一个writer之前,在对应的修改、添加、删除集合中添加元数据,这里看来必须要有数据库访问的了。没办法
if (IsUseTransaction) { try { beforeAdd.Add(entity.Id);//记录添加之前的数据的ID writers.Add(new InsertOneModel<TAggregate>(entity)); isRollback = false;//控制是否回滚 return; }
其它操作同理,后面我会把完整的代码贴出来的。先来分析一下。
3.处理提交事务逻辑 利用Collection.BulkWrite(writers)的返回值属性,找出实际影响的数据行数,这里我就按mssql的命名思路来了,同时如果若BulkWriteResult发生异常,我们也执行回滚
#region 事务控制 public void Commit() { if (!isRollback && writers.Count > 0)//如果不回滚,并且writers有数据 { BulkWriteResult<TAggregate> result; try { result = Collection.BulkWrite(writers); } catch (Exception) { Rollback();//若BulkWriteResult发生异常 throw; } if(result.ProcessedRequests.Count!=writers.Count)//检查完成写入的数量,如果有误,回滚 { Rollback(); } writers.Clear();//此时说明已成功提交,清空writers数据 return; } Rollback(); }
4.回滚操作 回滚嘛,我们就来个反操作,根据不同的类型操作集合,遍历执行反操作写入数据库,至于这部分如果出问题,我现在还没时间搞,以后如果有需要,再改
public void Rollback() { writers.Clear();//清空writers //执行反操作 beforeDelete.ForEach(o => { Collection.InsertOne(o); }); beforeChange.ForEach(o => { Collection.ReplaceOne(c => c.Id == o.Id, o); }); beforeAdd.ForEach(o => { Collection.DeleteOne(d => d.Id == o); }); }
5.修整后的Repostory
using EFAndMongoRepostory.Entity; using MongoDB.Driver; using System; using System.Collections.Generic; using System.Linq; using System.Linq.Expressions; namespace EFAndMongoRepostory { public class MongoRepostory<TAggregate> where TAggregate :AggregateBase { #region 初始化及字段属性设置 /// <summary> /// 获取集合 /// </summary> protected IMongoCollection<TAggregate> Collection; /// <summary> /// 初始化,以类名作为集合名称 /// </summary> /// <param name="collection"></param> public MongoRepostory() { this.Collection = MongoDbContext.GetMongoCollection<TAggregate>(typeof(TAggregate).Name); } private List<WriteModel<TAggregate>> writers = new List<WriteModel<TAggregate>>();//写入模型 /// <summary> /// 指示是否起用事务,默认true /// </summary> public bool IsUseTransaction { get; set; } = true; private List<TAggregate> beforeChange = new List<TAggregate>();//记录更新前的数据 private List<Guid> beforeAdd = new List<Guid>(); //记录添加前的数据ID private List<TAggregate> beforeDelete = new List<TAggregate>();//记录数据删除前的数据 private bool isRollback = false;//回滚控制 #endregion #region 添加 /// <summary> /// 添加一条数据 /// </summary> /// <param name="entity"></param> public void Add(TAggregate entity) { if (entity == null) return; if (IsUseTransaction) { try { beforeAdd.Add(entity.Id);//记录添加之前的数据的ID writers.Add(new InsertOneModel<TAggregate>(entity)); isRollback = false;//控制是否回滚 return; } catch (Exception ex) { isRollback = true; throw new Exception(ex.Message); } } try { Collection.InsertOne(entity); } catch (Exception ex) { throw new Exception(ex.Message); } } /// <summary> /// 添加数据集合 /// </summary> /// <param name="entities"></param> public void Add(IEnumerable<TAggregate> entities) { ) return; if(IsUseTransaction) { try { entities.ToList().ForEach(o => { beforeAdd.Add(o.Id); writers.Add(new InsertOneModel<TAggregate>(o)); }); isRollback = false; return; } catch (Exception ex) { isRollback = true; throw new Exception(ex.Message); } } try { Collection.InsertMany(entities); } catch (Exception ex) { throw new Exception(ex.Message); } } #endregion #region 替换 /// <summary> /// 替换一条过滤的数据(请确保此方法Id属性是不能变) /// </summary> /// <param name="filter">过滤条件</param> /// <param name="enity">目标数据(目标数据的Id值必为源数据的Id)</param> public void ReplaceOne(Expression<Func<TAggregate, bool>> filter, TAggregate enity) { if (enity == null) return; if (IsUseTransaction) { try { //先记录修改之前的数据 beforeChange.Add(Collection.Find(Builders<TAggregate>.Filter.Where(filter)).FirstOrDefault()); writers.Add(new ReplaceOneModel<TAggregate>(Builders<TAggregate>.Filter.Where(filter), enity)); isRollback = false; return; } catch (Exception ex) { isRollback = true; throw new Exception(ex.Message); } } try { Collection.ReplaceOne(filter, enity); } catch (Exception ex) { throw new Exception(ex.Message); } } /// <summary> /// 替换一条数据(请确保此方法Id属性是不能变) /// </summary> /// <param name="id">目标id</param> /// <param name="enity">目标数据(目标数据的Id值必为源数据的Id)</param> public void ReplaceById(Guid id, TAggregate enity) { if (enity == null) return; if(enity.Id!=id) { isRollback = true; throw new Exception("the id can not change"); } if(IsUseTransaction) { try { beforeChange.Add(Collection.Find(Builders<TAggregate>.Filter.Eq(o => o.Id, id)).FirstOrDefault()); writers.Add(new ReplaceOneModel<TAggregate>(Builders<TAggregate>.Filter.Eq(o=>o.Id, id), enity)); isRollback = false; return; } catch (Exception ex) { isRollback = true; throw new Exception(ex.Message); } } try { Collection.ReplaceOne(o => o.Id == id, enity); } catch (Exception ex) { throw new Exception(ex.Message); } } /// <summary> /// 查找一条数据并且替换 /// </summary> /// <param name="id">目标数据的id</param> /// <param name="enity">更改后的数据</param> /// <returns>更改前的数据</returns> public TAggregate FindOneAndReplace(Guid id, TAggregate enity) { if (enity == null) return null; if (enity.Id != id) { throw new Exception("the id can not change"); } return Collection.FindOneAndReplace(o => o.Id == id, enity); } /// <summary> /// 查找一条数据并且替换 /// </summary> /// <param name="filter">条件</param> /// <param name="enity">更改后的数据</param> /// <returns>更改前的数据</returns> public TAggregate FindOneAndReplace(Expression<Func<TAggregate,bool>>filter, TAggregate enity) { if (enity == null) return null; return Collection.FindOneAndReplace(filter, enity); } #endregion #region 移除 /// <summary> /// 根据过滤删除数据 /// </summary> /// <param name="filter"></param> public void Remoe(Expression<Func<TAggregate, bool>> filter) { if (IsUseTransaction) { try { if(Collection.Find(filter).FirstOrDefault()==null)//如果要删除的数据不存在数据库中 { throw new Exception("要删除的数据不存在数据库中"); } beforeDelete.Add(Collection.Find(filter).FirstOrDefault()); writers.Add(new DeleteOneModel<TAggregate>(Builders<TAggregate>.Filter.Where(filter))); isRollback = false; return; } catch (Exception ex) { isRollback = true; throw new Exception(ex.Message); } } try { Collection.DeleteMany(filter); } catch (Exception ex) { throw new Exception(ex.Message); } } public void RemoveById(Guid id) { if (IsUseTransaction) { try { beforeDelete.Add(Collection.Find(Builders<TAggregate>.Filter.Eq(o => o.Id, id)).FirstOrDefault()); writers.Add(new DeleteOneModel<TAggregate>(Builders<TAggregate>.Filter.Eq(o => o.Id, id))); isRollback = false; return; } catch (Exception ex) { isRollback = true; throw new Exception(ex.Message); } } try { Collection.DeleteOne(o => o.Id == id); } catch (Exception ex) { throw new Exception(ex.Message); } } #endregion #region 更新 /// <summary> /// 过滤数据,执行更新操作(如不便使用,请用Replace相关的方法代替) /// /// 一般用replace来代替这个方法。其实这个功能还算强大的,可以很*修改多个属性 /// 关健是set参数比较不好配置,并且如果用此方法,调用端必须引用相关的DLL,set举例如下 /// set = Builders<TAggregate>.Update.Update.Set(o => o.Number, 1).Set(o => o.Description, "002.thml"); /// set作用:将指定TAggregate类型的实例对象的Number属性值更改为1,Description属性值改为"002.thml" /// 说明:Builders<TAggregate>.Update返回类型为UpdateDefinitionBuilder<TAggregate>,这个类有很多静态 /// 方法,Set()是其中一个,要求传入一个func的表达示,以指示当前要修改的,TAggregate类型中的属性类型, /// 另一个参数就是这个属性的值。 /// /// Builders<TAggregate>类有很多属性,返回很多如UpdateDefinitionBuilder<TAggregate>的很有用帮助类型 /// 可以能参CSharpDriver-2.2.3.chm文件 下载MongoDB-CSharpDriver时带有些文件 /// 或从官网https://docs.mongodb.com/ecosystem/drivers/csharp/看看 /// /// </summary> /// <param name="filter">过滤条件</param> /// <param name="set">修改设置</param> public void Update(Expression<Func<TAggregate, bool>> filter, UpdateDefinition<TAggregate> set) { if (set == null) return; if (IsUseTransaction)//如果启用事务 { try { beforeChange.Add(Collection.Find(filter).FirstOrDefault()); writers.Add(new UpdateManyModel<TAggregate>(Builders<TAggregate>.Filter.Where(filter), set)); isRollback = false;//不回滚 return;//不执行后继操作 } catch (Exception ex) { isRollback = true; throw new Exception(ex.Message); } } try { Collection.UpdateMany(filter, set); } catch (Exception ex) { throw new Exception(ex.Message); } } /// <summary> /// 过滤数据,执行更新操作(如不便使用,请用Replace相关的方法代替) /// /// 一般用replace来代替这个方法。其实这个功能还算强大的,可以很*修改多个属性 /// 关健是set参数比较不好配置,并且如果用此方法,调用端必须引用相关的DLL,set举例如下 /// set = Builders<TAggregate>.Update.Update.Set(o => o.Number, 1).Set(o => o.Description, "002.thml"); /// set作用:将指定TAggregate类型的实例对象的Number属性值更改为1,Description属性值改为"002.thml" /// 说明:Builders<TAggregate>.Update返回类型为UpdateDefinitionBuilder<TAggregate>,这个类有很多静态 /// 方法,Set()是其中一个,要求传入一个func的表达示,以指示当前要修改的,TAggregate类型中的属性类型, /// 另一个参数就是这个属性的值。 /// /// Builders<TAggregate>类有很多属性,返回很多如UpdateDefinitionBuilder<TAggregate>的很有用帮助类型 /// 可以能参CSharpDriver-2.2.3.chm文件 下载MongoDB-CSharpDriver时带有些文件 /// 或从官网https://docs.mongodb.com/ecosystem/drivers/csharp/看看 /// /// </summary> /// <param name="id">找出指定的id数据</param> /// <param name="set">修改设置</param> public void Update(Guid id, UpdateDefinition<TAggregate> set) { if (set == null) return; if (IsUseTransaction)//如果启用事务 { try { beforeChange.Add(Collection.Find(Builders<TAggregate>.Filter.Eq(o => o.Id, id)).FirstOrDefault()); writers.Add(new UpdateManyModel<TAggregate>(Builders<TAggregate>.Filter.Eq(o => o.Id, id), set)); isRollback = false;//不回滚 return;//不执行后继操作 } catch (Exception ex) { isRollback = true; throw new Exception(ex.Message); } } try { Collection.UpdateMany(o => o.Id == id, set); } catch (Exception ex) { throw new Exception(ex.Message); } } #endregion #region 事务控制 public void Commit() { )//如果不回滚,并且writers有数据 { BulkWriteResult<TAggregate> result; try { result = Collection.BulkWrite(writers); } catch (Exception) { Rollback();//若BulkWriteResult发生异常 throw; } if(result.ProcessedRequests.Count!=writers.Count)//检查完成写入的数量,如果有误,回滚 { Rollback(); } writers.Clear();//此时说明已成功提交,清空writers数据 return; } Rollback(); } public void Rollback() { writers.Clear();//清空writers //执行反操作 beforeDelete.ForEach(o => { Collection.InsertOne(o); }); beforeChange.ForEach(o => { Collection.ReplaceOne(c => c.Id == o.Id, o); }); beforeAdd.ForEach(o => { Collection.DeleteOne(d => d.Id == o); }); } #endregion #region 查询 /// <summary> /// 查找所有数据集合 /// </summary> /// <returns></returns> public IQueryable<TAggregate> FindAll() { return Collection.AsQueryable(); } /// <summary> /// 根据Id查找一条数据 /// </summary> /// <param name="id"></param> /// <returns></returns> public TAggregate FindById(Guid id) { var find = Collection.Find(o => o.Id == id); if (!find.Any()) return null; return find.FirstOrDefault(); } /// <summary> /// 根据过滤条件找出符合条件的集合 /// </summary> /// <param name="filter"></param> /// <returns></returns> public List<TAggregate> FindByFilter(Expression<Func<TAggregate, bool>> filter) { var find = Collection.Find(filter); if (!find.Any()) return null; return find.ToList(); } /// <summary> /// 根据过滤条件找出一条数据 /// </summary> /// <param name="filter"></param> /// <returns></returns> public TAggregate FindOne(Expression<Func<TAggregate, bool>> filter) { return Collection.Find(filter).FirstOrDefault(); } #endregion /// <summary> /// 根据聚合类ID添加导航数据到 导航集合(中间表) /// </summary> /// <typeparam name="TNav">导航类</typeparam> /// <param name="nav">提供参数时直接new一个具体的nav类就行了</param> /// <param name="filter"></param> /// <param name="foreignKey"></param> public void AddByAggregate<TNav>(TNav nav, Expression<Func<TAggregate, bool>> filter, Guid foreignKey) where TNav : NavgationBase { //导航类的集合 var navCollection = MongoDbContext.GetMongoCollection<TNav>(typeof(TNav).Name); //遍历当前集合中所有符合条件的数据 Collection.Find(filter).ToList().ForEach(o => { //将导航类的属性赋相应的值 nav.AggregateId = foreignKey; nav.ValueObjectId = o.Id; //插入到数据库 navCollection.InsertOne(nav); }); } } }
6.测试一下 马上就要上班了,我也不啰嗦了,有注释
using EFAndMongoRepostory; using EFAndMongoRepostory.Entity; using MongoDB.Driver; using System; using System.Collections.Generic; using System.Linq; namespace MongoTest { class Program { static void Main(string[] args) { #region 初始化 var db = MongoDbContext.SetMongoDatabase("mongodb://localhost:27017", "MongoTest"); #endregion #region 准备数据 List<Role> rList = new List<Role> { new Role { Name="r001", Description="rd001" }, new Role { Name="r002",Description="rd002" }, new Role { Name="r003",Description="rd003" } }; List<User> uList = new List<User> { new User { Name=", Pwd="pwd001" }, new User { Name=", Pwd="pwd002" } , new User { Name=", Pwd="pwd003" } , new User { Name=", Pwd="pwd004" } }; List<Permission> pList = new List<Permission> { ", Url="001.html" }, ", Url="002.html" }, ", Url="003.html" }, ", Url="004.html" }, ", Url="005.html" } }; #endregion MongoRepostory<User> repostory = new MongoRepostory<User>(); //清空集合 db.DropCollection(typeof(User).Name); //执行一次批量添加 repostory.Add(uList); //提交后查询所有数据 repostory.Commit(); repostory.FindAll().ToList().ForEach(o => { Console.WriteLine(o.Name + ":" + o.Pwd + ":" + o.Number); }); //执行一次插入操作 repostory.Add( }); //执行一次替换操作 "); user.Pwd = ; user.Name = "u001"; repostory.ReplaceOne(o => o.Name == ", user); ); ); ); //执行3次更新操作 repostory.Update(o => o.Name == ", update2); repostory.Update(o => o.Name == ", update3); repostory.Update(o => o.Name == ", update4); //执行一次删除操作 "); repostory.Remoe(o => o.Id==u.Id); //提交 repostory.Commit(); //查询所有数据 repostory.FindAll().ToList().ForEach(o => { Console.WriteLine(o.Name + ":" + o.Pwd + ":" + o.Number); }); //执行回滚 repostory.Rollback(); //查询所有数据 Console.WriteLine("--------------------回滚到原来,应该是空的-----------------------------"); repostory.FindAll().ToList().ForEach(o => { Console.WriteLine(o.Name + ":" + o.Pwd + ":" + o.Number); }); Console.ReadKey(); } } }
7. 关于@ wtsheng88的问题 本身存在那个问题的几率应该不是很大吧,如果实在要解决,我的思路是,把最后一次操作的元数据保存到一个备份数据库中,下次开机的时候先从备份中对比现在的数据库,如果存在不同,说明上次停电的时候没有完成,更新现在的数据库。而这些操作应该单独封装成一个静态方法,以便程序开始时就可以执行,在回滚操作中也可以调用。不行,没时间了
8.我要迟到了 还是那句话:都是个人自发研究、测试的,如有雷同,不胜荣幸;如觉不妥,留言喷射;如有错误,还请赐教;如获帮助,示意欣赏。新版中有很多异步操作,本人对此没作研究,怕会产生数据安全问题,所以全部用的是同步方法。