如何对正在使用中的数据库进行历史数据迁移?

作为开发人员,能学会简单地对历史数据迁移是日常基本功。在上篇文章中,我们初步地解释了历史数据迁移的基本概念,以及如何使用SQL SERVER存储过程实现对历史数据迁移。

一般来说直接在数据库中写SQL语句(insert into ... select from)的方式进行迁移,仅适用于“停服状态”下的数据迁移场景,也就是数据库处于无用户使用的情况下,而且迁移场景有限。

当数据库在生产环境中实时在用,而且数据量较大的前提下,很显然会影响性能,上述方法并不可取。

今天我们来介绍一个新的思路,可将历史数据迁移对现有实时在用数据库的性能影响降到较低水平。

1、大体迁移思路

简单描述下思路:
1、先利用程序根据创建时间升序排序,直接查询TopN条数据,记录到程序缓存中。
【查询后就和正式数据库没关系了,只要TopN不大,这个简单的查询几乎对数据库没有任何影响。】
2、然后程序将缓存中的TopN条数据写入到历史库。
【这个阶段和正式数据库没有半点关系,哪怕写的再慢,也不影响正式数据库】
3、核对下历史数据库中的数据,确保没有问题,就可以删除正式环境中的TopN条数据了。
【这里删除操作稍微比查询影响大些,但是仅仅是小批量的数据删除同样影响不大。】
4、如果需要大批量数据迁移怎么办? 非常简单,分批执行,比如循环执行CountX次,那么迁移的总数据量就为TopN×CountX ,所以根本不怕数据量大,开启程序自动执行即可,也就是用耗时间来减少对数据库性能的影响。【执行时间换性能】

这个思路和insert into ... select from的方式,最大的区别就在用写入历史数据库的过程不影响正式在用的数据库,只剩下TopN条数据的查询和删除操作,影响很小。

特别说明:TopN不能太大,这个越小越好,根据表字段的多少,数据库的性能,以及用户连接数情况综合考虑,建议TopN取值在1000到5000条之间,这样对数据库影响几乎可以忽略。

 

2、例子中使用到的主要技术

为了实现这个思路,我这里也随便写个简单程序来试试效果,做个例子,仅供大家参考。

由于是最基础的例子,我就不用通用的底层框架了,免得大家看起来吃力。同时为了运行演示方便,直接写个exe可执行程序好了。

这个讲解的例子,用到的技术主要包括:
数据库:SQL SERVER,
数据库访问组件:Entity Framework Core
日志记录:Serialog.AspNetCore
开发语言:C#
技术框架:.NET 5
项目模版:控制台应用程序

 

3、创建个Model实体类

按照EntityFramework Core的思路,先建个Model吧。

  1 using System;
  2 using System.Collections.Generic;
  3 using System.ComponentModel.DataAnnotations;
  4 using System.ComponentModel.DataAnnotations.Schema;
  5 using System.Text;
  6 
  7 namespace Tyingtech_glps.Entities.HDM
  8 {
  9     /// <summary>
 10     /// 接口请求记录
 11     /// </summary>
 12     [Table("GLPS_APIREQUEST")]
 13     public class GLPS_APIREQUEST
 14     {
 15         [Key]
 16         public string FID { get; set; }
 17 
 18         /// <summary>
 19         /// 接口编号【固定】
 20         /// </summary>
 21         public string FAPICODE { get; set; }
 22 
 23         /// <summary>
 24         /// 请求方身份ID
 25         /// </summary>
 26         public string FAPPID { get; set; }
 27 
 28         /// <summary>
 29         /// 接口请求方URL地址
 30         /// </summary>
 31         public string FFORMURL { get; set; }
 32 
 33         /// <summary>
 34         /// 接口请求方IP
 35         /// </summary>
 36         public string FIP { get; set; }
 37 
 38         /// <summary>
 39         /// 请求参数(JSON字符串)
 40         /// </summary>
 41         public string FREQUESTDATA { get; set; }
 42 
 43         /// <summary>
 44         /// 请求时间点
 45         /// </summary>
 46         public DateTime FREQTIME { get; set; }
 47 
 48         /// <summary>
 49         ///响应参数(JSON字符串)
 50         /// </summary>
 51         public string FRESPONSE { get; set; }
 52 
 53         /// <summary>
 54         /// 响应时间点
 55         /// </summary>
 56         public DateTime FRESTIME { get; set; }
 57 
 58         /// <summary>
 59         /// 总毫秒数
 60         /// </summary>
 61         public int FMILLISECOND { get; set; }
 62 
 63         /// <summary>
 64         /// 接口请求结果(1:成功;0:失败;-1:接口内部异常)
 65         /// </summary>
 66         public int FISSUCCESS { get; set; }
 67 
 68         /// <summary>
 69         /// 失败详情,仅内部使用(如:内部报错异常信息;AppId错误;非法请求;参数不全;...)
 70         /// </summary>
 71         public string FRESULT { get; set; }
 72 
 73         /// <summary>
 74         /// 约定格式数据1 (如:车牌号)
 75         /// </summary>
 76         public string FDATA1 { get; set; }
 77 
 78         /// <summary>
 79         /// 约定格式数据2 (如:进场、离场)
 80         /// </summary>
 81         public string FDATA2 { get; set; }
 82 
 83         /// <summary>
 84         /// 约定格式数据3 (如:...)
 85         /// </summary>
 86         public string FDATA3 { get; set; }
 87 
 88         /// <summary>
 89         /// 约定格式数据4(如:...)
 90         /// </summary>
 91         public string FDATA4 { get; set; }
 92 
 93         /// <summary>
 94         /// 约定格式数据5(如:...)
 95         /// </summary>
 96         public string FDATA5 { get; set; }
 97 
 98         /// <summary>
 99         /// 创建时间
100         /// </summary>
101         public DateTime FCREATETIME { get; set; }
102     }
103 }

 

4、执行TopN条数据的迁移 

  1 /// <summary>
  2 /// 历史数据迁移【一次性迁移】
  3 /// </summary>
  4 /// <param name="n">数据迁移量(条数) top N</param>
  5 /// <returns>实际迁移成功数量</returns>
  6 public static int DataToHisExecOne(int n)
  7 {
  8     n = n > 10000 ? 10000 : n;   //一次最多1万(再多可能会对性能有影响【迁移第一条:尽量不能影响现有在用数据库的业务】)
  9                         
 10     //测试环境
 11     EnumWhichDB whichDB = EnumWhichDB.DevGlps;
 12     EnumWhichDB whichDBHis = EnumWhichDB.DevGlpsHis;
 13 
 14     string connStr = DBConnectionString.GetConnStr(whichDB);         //当前需要迁移的数据库
 15     string connStrHis = DBConnectionString.GetConnStr(whichDBHis);   //历史数据库
 16 
 17     //Serialog 记录日志
 18     var logFileName = string.Format("ToHisOne_ApiRequest_{0}.txt", DateTime.Now.ToString("yyyyMMddHH"));
 19     var log = new LoggerConfiguration()
 20         .WriteTo.Console()
 21         .WriteTo.File(logFileName)
 22         .CreateLogger();
 23 
 24     log.Information($"开始迁移数据,计划迁移条数为:{n}");
 25 
 26     int beginCount = GetRowCount(whichDB);        //当前数据库数据迁移前的记录行数
 27     int beginCountHis = GetRowCount(whichDBHis);  //历史数据库数据迁移前的记录行数
 28 
 29     Stopwatch sw = new Stopwatch();     //检测运行时间(对每个阶段)
 30     Stopwatch swAll = new Stopwatch();  //总耗时
 31     swAll.Start();
 32 
 33     sw.Start();
 34     DateTime maxCreateTime = GetAscMaxCreateTimeTopN(whichDB,n);
 35     sw.Stop();
 36     log.Information("对应迁移数据FCREATETIME为:" + maxCreateTime.ToString("yyyy-MM-dd HH:mm:ss.fffff") + $",查询maxCreateTime耗时:{sw.ElapsedMilliseconds} 毫秒。");
 37 
 38     sw.Restart();   //重新开始计时
 39     List<GLPS_APIREQUEST> records = new List<GLPS_APIREQUEST>();
 40     //从数据库查最早的数据 
 41     using (var db = new GlpsDbContext(connStr))
 42     {
 43         records = db.DS_ApiRequest
 44             .Where(t => t.FCREATETIME <= maxCreateTime)
 45             .OrderBy(t => t.FCREATETIME)
 46             .AsNoTracking()    //非跟踪查询(只读,提升效率)
 47             .ToListAsync().Result;
 48     }
 49     sw.Stop(); //计时结束
 50     log.Information($"按时间点实际查询到 {records.Count} 条数据,耗时:{sw.ElapsedMilliseconds} 毫秒。");
 51 
 52     sw.Restart();   //重新开始计时            
 53     //写数据到历史数据库
 54     int newCount = 0;
 55     using (var db = new GlpsDbContext(connStrHis))
 56     {
 57         db.DS_ApiRequest.AddRange(records);
 58         newCount = db.SaveChanges();  //最后保存数据
 59         log.Information($"实际成功写入到历史数据库条数: {newCount}");
 60     }
 61     sw.Stop(); //计时结束
 62     log.Information($"实际成功写入到历史数据库条数:{newCount} , 写入耗时:{sw.ElapsedMilliseconds} 毫秒。");
 63 
 64 
 65     //最后删除数据
 66     DateTime maxCreateTimeHis = GetMaxCreateTime(whichDBHis); //历史数据库最大的FCreateTime
 67 
 68     //两个时间相同,则可以删除数据,否则不删除,直接预警(中间数据可能出错,需要人工干预)
 69     if (maxCreateTime == maxCreateTimeHis)
 70     {
 71         sw.Restart();
 72         var rowCount = DeleteByFCreateTime(whichDB, maxCreateTime);
 73         sw.Stop();
 74         log.Information($"迁移后删除数据条数:{rowCount} , 删除耗时:{sw.ElapsedMilliseconds} 毫秒。");
 75     }
 76     else if (newCount < n && newCount == records.Count)  //就是实际小于n,那么是:C#的datetime和数据库的datetime精度不同
 77     {
 78         sw.Restart();
 79         var rowCount = DeleteByFCreateTime(whichDB,maxCreateTimeHis); //需按历史数据库的日期来删除
 80         sw.Stop();
 81         log.Information($"迁移后删除数据条数:{rowCount} , 删除耗时:{sw.ElapsedMilliseconds} 毫秒。");
 82     }
 83     else
 84     {
 85         log.Error("数据对比出错:maxCreateTime != maxCreateTimeHis。 未执行最后的删除数据!!!请开发人员核对数据。");
 86         log.Information("maxCreateTime = " + maxCreateTime.ToString("yyyy-MM-dd HH:mm:ss.fff"));
 87         log.Information("maxCreateTimeHis = " + maxCreateTimeHis.ToString("yyyy-MM-dd HH:mm:ss.fff"));
 88     }
 89 
 90     int endCount = GetRowCount(whichDB);        //当前数据库数据迁移后的记录行数
 91     int endCountHis = GetRowCount(whichDBHis);  //历史数据库数据迁移后的记录行数
 92     log.Information($"迁移前GLPS_APIREQUEST的数据条数:{beginCount} , 迁移后数据条数:{endCount}");
 93     log.Information($"迁移前GLPS_APIREQUESTHis的数据条数:{beginCountHis} , 迁移后数据条数:{endCountHis}");
 94 
 95     swAll.Stop();
 96     log.Information($"swAll:迁移总耗时:{swAll.ElapsedMilliseconds} 毫秒。");
 97     log.Information("------------------------------------------------");
 98 
 99     return newCount;
100 }

 

中间用到的几个单独逻辑的方法 

 1 /// <summary>
 2 /// 获取总记录行数
 3 /// </summary>
 4 /// <param name="whichDB"></param>
 5 /// <returns></returns>
 6 private static int GetRowCount(EnumWhichDB whichDB)
 7 {
 8     string connStr = DBConnectionString.GetConnStr(whichDB);
 9     int totalCount = 0;
10     using (var db = new GlpsDbContext(connStr))
11     {
12         totalCount = db.DS_ApiRequest.Count();
13     }
14     return totalCount;
15 }
16 
17 
18 /// <summary>
19 /// 查询数据库中最大的FCreateTime
20 /// </summary>
21 /// <param name="whichDB"></param>
22 /// <returns></returns>
23 private static DateTime GetMaxCreateTime(EnumWhichDB whichDB)
24 {
25     string connStr = DBConnectionString.GetConnStr(whichDB);
26     DateTime maxCreateTime = DateTime.MinValue;
27     using (var db = new GlpsDbContext(connStr))
28     {
29         //查询历史数据库,最大的FCREATETIME
30         var record = db.DS_ApiRequest.OrderByDescending(t => t.FCREATETIME).Take(1).SingleOrDefault();
31         if (record != null)
32         {
33             maxCreateTime = record.FCREATETIME;
34         }                
35     }
36     return maxCreateTime;
37 }
38 
39 /// <summary>
40 /// 按创建时间从小到大排序(FCreateTime Asc),取前N条数据的最大FCreateTime
41 /// 【即:取最早N条数据中,最大的创建时间】
42 /// </summary>
43 /// <param name="whichDB"></param>
44 /// <param name="topN"></param>
45 /// <returns></returns>
46 private static DateTime GetAscMaxCreateTimeTopN(EnumWhichDB whichDB,int topN)
47 {
48     string connStr = DBConnectionString.GetConnStr(whichDB);
49     DateTime maxCreateTime = DateTime.MinValue;
50     using (var db = new GlpsDbContext(connStr))
51     {
52         //查询totalCount对应最大的FCREATETIME
53         var record = db.DS_ApiRequest.OrderBy(t => t.FCREATETIME)
54             .Skip(topN - 1).Take(1).SingleOrDefault();
55         if (record != null)
56         {
57             maxCreateTime = record.FCREATETIME;
58         }
59     }
60     return maxCreateTime;
61 }
62 
63 /// <summary>
64 /// 删除小于等于【某个创建时间】的数据
65 /// </summary>
66 /// <param name="whichDB"></param>
67 /// <param name="maxCreateTime"></param>
68 /// <returns>删除记录数</returns>
69 private static int DeleteByFCreateTime(EnumWhichDB whichDB, DateTime maxCreateTime)
70 {
71     int rowCount = 0;
72     string connStr = DBConnectionString.GetConnStr(whichDB);
73     using (var db = new GlpsDbContext(connStr))
74     {
75         List<SqlParameter> listParams = new List<SqlParameter>{
76                 new SqlParameter("FCREATETIME", maxCreateTime)
77             };
78         rowCount = db.Database.ExecuteSqlRaw(@"delete from GLPS_APIREQUEST where FCREATETIME<=@FCREATETIME", listParams);                
79     }
80     return rowCount;
81 }

 

5、大数据量循环执行的逻辑 

 1 /// <summary>
 2 /// 历史数据迁移【分批迁移】
 3 /// </summary>
 4 /// <param name="totalCount">任务总迁移条数</param>
 5 /// <param name="prePageCount">分批迁移,单次查询数量</param>
 6 public static void DataToHis(int totalCount=100,int prePageCount=10)
 7 {
 8     //临界值设定
 9     totalCount = totalCount > 1000000 ? 1000000 : totalCount;       //单次任务100万
10     prePageCount = prePageCount > 10000 ? 10000 : prePageCount;     //每次1万
11 
12     //Serialog 记录日志
13     var logFileName = string.Format("ToHis_ApiRequest_{0}.txt", DateTime.Now.ToString("yyyyMMddHH"));
14     var log = new LoggerConfiguration()
15         .WriteTo.Console()
16         .WriteTo.File(logFileName)
17         .CreateLogger();
18 
19     Stopwatch swAll = new Stopwatch();  //总耗时
20     swAll.Start();
21 
22     log.Information($"本次计划总迁移数据条数:{totalCount},分批单次执行条数:{prePageCount}");
23     int okCount = 0;  //迁移成功的条数
24     int runTimes = 0;
25     while (okCount < totalCount)
26     {
27         runTimes++;
28         if (runTimes % 20 == 0)   
29         {
30             Console.Clear();  //每执行20次的时候,清除控制台
31             Console.WriteLine("控制台已被清理。");
32         }
33         if (totalCount - okCount < prePageCount)
34         {
35             prePageCount = totalCount - okCount;  //最后一次如果没有一页数据,只迁移部分
36             if (prePageCount == 1) break;         //如果是1条的话,日期精度容易出现问题,特意控制不执行
37         }
38         okCount += DataToHisExecOne(prePageCount);
39         log.Information(@"已执行累计条数{0},累计耗时:{1}分{2}秒{3},累计执行次数{4}", okCount, swAll.Elapsed.TotalMinutes, swAll.Elapsed.Seconds, swAll.Elapsed.Milliseconds, runTimes);
40     }
41 
42     swAll.Stop();
43     log.Information($"本次实际总迁移数据条数:{okCount},共分批执行次数:{runTimes}");
44     log.Information("================================================");
45 
46 }

 

6、在appsettings.json 做一些基础配置

配置的目的是为了方便执行,免得改程序。

1  "TyUseEnv": "0", //使用环境(0:测试环境;1:正式环境(沙箱))
2   "TyTopN": "10", //每次迁移的数据条数
3   "TyTotalCount": "25", //分批总迁移数据量
4   "TyForTable": "GLPS_APIREQUEST", //需要迁移数据的表名(测试已支持:GLPS_APIREQUEST、GLPS_GATEENTRYREC)

 

其他的表以此类推,可以进行多表切换。

在控制台Main方法中通过依赖注入的方式,动态实例化要迁移的表结构。

历史数据迁移,只要支持此接口(IExecDataToHis)即可,说白就2个迁移方法而已:单次执行和分批循环执行。

 

其他的表以此类推,可以进行多表切换。

在控制台Main方法中通过依赖注入的方式,动态实例化要迁移的表结构。

历史数据迁移,只要支持此接口(IExecDataToHis)即可,说白就2个迁移方法而已:单次执行和分批循环执行。

 1 using Twi.NET5.Core;
 2 
 3 namespace Tyingtech_glps.Interface.HDM
 4 {
 5     /// <summary>
 6     /// 可执行的数据迁移接口(单个和分批)
 7     /// </summary>
 8     public interface IExecDataToHis : IWhoAmI
 9     {
10         /// <summary>
11         /// 历史数据迁移【分批迁移】
12         /// </summary>
13         /// <param name="totalCount">任务总迁移条数</param>
14         /// <param name="perPageCount">分批迁移,单次查询每页数量</param>
15         public TwiReturnBase DataToHis(int totalCount = 10000, int perPageCount = 1000);
16 
17         /// <summary>
18         /// 历史数据迁移【一次性迁移】
19         /// </summary>
20         /// <param name="n">数据迁移量(条数) top N</param>
21         public TwiReturnBase DataToHisExecOne(int n);
22     }
23 }

TwiReturnBase 就是一个统一封装的返回类型,不用管它。

 

7、控制台运行界面参考

最后exe程序的界面效果如下。

如何对正在使用中的数据库进行历史数据迁移?

 然后就是执行命令了,单次迁移输入1,分批执行迁移输入2。

迁移哪张表,每次单次迁移多少条,总共迁移多少数据量,都可在appsettings.json中配置。

 

8、单次执行效果

直接输入命令1,开始执行。

如何对正在使用中的数据库进行历史数据迁移?

 

(哈哈,看来我直接本地电脑还是非常卡的,不过不影响思路的效果,等最后我们换台测试服务器看看1000万数据的效果。)

 

9、分配执行效果

自动执行多次的效果,只要配置好,就会自动执行,分配执行不影响性能。

如果中间报错,会自动停止执行。

如何对正在使用中的数据库进行历史数据迁移?

 

10、大数据量的测试效果

我们换台测试服务器,用此方法测试下有数据在跑的沙箱环境一千万数据量的效果,每天源源不断执行,测试迁移了快939万的数据问题都不大。

如何对正在使用中的数据库进行历史数据迁移?

 

由于沙箱环境中也有数据在跑,所以业务数据库是实时增加数据量的。

 

11、此方法的优缺点

这个思路能解决哪怕是生产环境都可以用,有事没事迁移点数据量,对数据库性能没有多大影响,而且支持不同数据库的迁移,比如SQL SERVER 到MySQL等等,但是缺点就是迁移起来比较慢些。适用于小企业小项目的常见场景,这个思路基本就够了。

【以后有空我还会再开篇文章讲别的大数据迁移思路】

当然最后要说的是,这个例子不是很靠谱,为什么这么说呢?首先这个是硬编码方式,局限性太大,如果要通用的话,程序应该再抽下,直接配置表名相关的即可。

实现通用工具模式,任何时候无需修改代码,直接简单改配置即可。这个我们下期教程继续改进思路。

 

如何对正在使用中的数据库进行历史数据迁移?

上一篇:静态导入


下一篇:Failed to configure a DataSource: 'url' attribute is not specified and no embedded datasource could be configured