1 背景:
随着公司业务的成长,数据量也随之的不断增长。随之而来的问题是在做ETL的时候,时间花费也越来越长。为了节省时间开销,我们只想要更新最新的数据,不想要把公司历年所有的数据都进行处理。这种情况就被称为变更数据捕获(Change Data Capture,又名CDC)。在SQLServer2008之前,对数据变更的捕获通常使用触发器把DML操作中的INSERT/UPDATE/DELETE数据记录下来,但是触发器的维护比较困难;时间戳严重依赖于数据库的设计,而且必须在业务代码中对时间字段进行维护。
1.1 适用环境:
仅在SQLServer2008(含)以后的企业版、开发版和评估版中可用,标准版中不可用
1.2 详解:
变更数据捕获的更改数据源为 SQL Server 事务日志。 在将插入、更新和删除应用于跟踪的源表时,将会在日志中添加说明这些更改的项。 日志用作捕获进程的输入来源。 它会读取日志,并在跟踪的表的关联更改表中添加有关更改的信息。 系统将提供一些函数,以枚举在更改表中指定范围内发生的更改,并以筛选的结果集的形式返回该值。 通常,应用程序进程使用筛选的结果集在某种外部环境中更新源表示形式。
2 基本操作
相关的存储过程:
Sys.sp_cdc_add_job --添加job
Sys.sp_cdc_generate_wrapper_function
Sys.sp_cdc_change_job --修改job配置
Sys.sp_cdc_get_captured_columns --获取捕获列信息
Sys.sp_cdc_cleanup_change_table
Sys.sp_cdc_get_ddl_history
Sys.sp_cdc_disable_db --禁用当前数据的CDC,建议先禁用表,再禁用库
Sys.sp_cdc_help_change_data_capture
Sys.sp_cdc_disable_table --关闭表捕获
Sys.sp_cdc_help_jobs
Sys.sp_cdc_drop_job --删除Job
Sys.sp_cdc_scan
Sys.sp_cdc_enable_db --启用当前数据库CDC功能
Sys.sp_cdc_start_job --启动Job
Sys.sp_cdc_enable_table --开启表捕获
Sys.sp_cdc_stop_job --停止Job
相关函数:
Cdc.fn_cdc_get_all_changes_<capture_instance>Sys.fn_cdc_has_column_changed
Cdc.fn_cdc_get_net_changes_<capture_instance>Sys.fn_cdc_increment_lsn
Sys.fn_cdc_decrement_lsn
Sys.fn_cdc_is_bit_set
Sys.fn_cdc_get_column_ordinal
Sys.fn_cdc_map_lsn_to_time
Sys.fn_cdc_get_max_lsn
Sys.fn_cdc_map_time_to_lsn
Sys.fn_cdc_get_min_lsn
3. 操作案例
本节以microsoft发布的adventureworks数据库为例来演示如何配置和使用Sql Server的CDC功能,本文中使用的数据库为SQL Server 2008 R2, AdventureWorks数据库可在这里下载,然后恢复到sql server中来。在操作之前确保SQL Agent服务是启动状态,最好设置为自动启动。
3.1 对AdventureWorks启用CDC
USE AdventureWorks
GO
EXECUTE sys.sp_cdc_enable_db;
GO
说明:
sys.sp_cdc_enable_db
作用域: 整个目标库,包含元数据、DDL触发器、cdc架构和cdc用户。无法对系统数据库和分发数据库启用该功能。且执行者需要用sysadmin角色权限。
返回值: 0,成功; 1,失败。
如果执行过程中出现 错误: 返回的错误为15517: '无法作为数据库主体执行,因为主体"dbo" 不存在、无法模拟这种类型的主体,或您没有所需的权限。 原因为某些存储过程使用了具有WITH EXECUTE AS 的选项。使其在当前库具有了某个架构,但是当在别的地方执行时,但是该架构不存在。解决办法为(如果成功,请忽略):
USE AdventureWorks
GO
ALTER AUTHORIZATION ON DATABASE::[AdventureWorks] TO [sa]
GO
EXECUTE sys.sp_cdc_enable_db;
GO
经过检查,uspUpdateEmployeeHireInfo这个存储过程的确有:WITH EXECUTE AS CALLER,使用sa的原因是即使sa被禁用,sa还是存在的。所以不会报错。
3.2 检查AdventureWorks CDC是否开启
SELECT is_cdc_enabled,CASE WHEN is_cdc_enabled=0 THEN 'CDC OFF 'ELSE 'CDC ON'END as 描述
FROM sys.databases
WHERE NAME = 'AdventureWorks'
数据库CDC开启后,将在当前数据库下创建cdc用户,在架构下面也会增加cdc这个架构,因为CDC要求独占方式使用这两个架构,所以要单独创建。如果存在了非CDC功能创建的cdc用户、架构的话,则需要先删除该cdc命名的架构,才能开启。如下图所示:
当前数据库的系统表下,会创建一些CDC辅助的表,用于在CDC过程中,记录一些系统信息
3.3 对数据库AdventureWorks中的表开启CDC
使用db_owner角色的成员执行sys.sp_cdc_enable_table为每个需要跟踪的表创建捕获实例,sys.sp_cdc_enable_table的参数列表为:
sys.sp_cdc_enable_table
[ @source_schema = ] 'source_schema',
[ @source_name = ] 'source_name' ,
[ @role_name = ] 'role_name'
[,[ @capture_instance = ] 'capture_instance' ]
[,[ @supports_net_changes = ] supports_net_changes ]
[,[ @index_name = ] 'index_name' ]
[,[ @captured_column_list = ] 'captured_column_list' ]
[,[ @filegroup_name = ] 'filegroup_name' ]
[,[ @partition_switch = ] 'partition_switch' ]
说明:
a) 执行sys.sp_cdc_enable_table后,可以通过sys.tables目录视图中的is_tracked_by_cdc列来判断是否创建成功。
b) 默认情况下会对表的全部列做捕获,如果只需要对某些列做捕获,可以在@captured_column_list参数中指定这些列;
c) 如果要把更改表放到文件组里的话,最好创建单独的文件组(最起码与源表独立)。
d) 如果不想控制访问角色,则@role_name必须显式设置为null。如果指定了不存在的角色,执行该存储过程后,会自动创建指定的角色。
本节我们选择HumanResources.Department、Person.ADDRESS、Person.Person这三张表开启CDC功能。
USE AdventureWorks;
GO
--表HumanResources.Department
EXECUTE sys.sp_cdc_enable_table
@source_schema = N'HumanResources'
, @source_name = N'Department'
, @role_name = N'cdc_Admin'--可以自动创建
, @capture_instance=DEFAULT
GO
--表Person.ADDRESS
EXECUTE sys.sp_cdc_enable_table
@source_schema = N'Person'
, @source_name = N'ADDRESS'
, @role_name = N'cdc_Admin'--可以自动创建
, @capture_instance=DEFAULT
GO
--表Person.Person
EXECUTE sys.sp_cdc_enable_table
@source_schema = N'Person'
, @source_name = N'Person'
, @role_name = N'cdc_Admin'--可以自动创建
, @capture_instance=DEFAULT
GO
执行后,
1. 系统表中多出了三张cdc.{Schema}_{TableName}_CT的表,其中SchemaName为被捕获表的架构名称,TableName为被捕获的表名
2. 在SQL Agent下创建了两个Job, cdc.AdventureWorks_cleanup和cdc.AdventureWorks_capture
3. cdc_Admin角色被创建
3.4 验证三张表的CDC开启成功
SELECT name ,
is_tracked_by_cdc ,
CASE WHEN is_tracked_by_cdc = 0 THEN 'CDC ON'
ELSE 'CDC OFF'
END 描述
FROM sys.tables
WHERE OBJECT_ID IN( OBJECT_ID('HumanResources.Department'),
OBJECT_ID('Person.ADDRESS'),
OBJECT_ID('Person.Person') )
结果如下
3.5 CDC功能验证
3.5.1 查看HumanResources.Department变更之前的数据
3.5.2 新增数据
INSERT INTO HumanResources.Department
( Name ,
GroupName ,
ModifiedDate
)
SELECT Name + '' ,
GroupName + '' ,
GETDATE() ModifiedDate
FROM HumanResources.Department
操作之后的数据增加了16条,
查询CDC变更表cdc.HumanResources_Department_CT:
SELECT * FROM cdc.HumanResources_Department_CT
结果如下:
可以看到的确多了16条记录,注意__$operation的值
3.5.3 删除HumanResources.Department在3.5.3步中新增的数据
DELETE FROM HumanResources.Department WHERE DepartmentID>16
查询CDC数据
3.5.4 修改数据
UPDATE HumanResources.Department SET ModifiedDate=GETDATE()
结果如下
我们发现这次CDC表增加的数据增加了2倍,因为cdc.<capture_instance>_CT这样命名的表,是用于记录源表更改的表,对于insert/delete操作,会有对应的一行记录,而对于update,会有两行记录。
__$operation列:1 = 删除、2= 插入、3= 更新(旧值)、4= 更新(新值)
__$start_lsn列:由于更改是来源与数据库的事务日志,保存事务日志的开始序列号(LSN)
微软不建议直接查询这类表,建议使用cdc.fn_cdc_get_all_changes_<捕获实例> 和cdc.fn_cdc_get_net_changes_<capture_instance> 来查询。
使用示例如下:
USE AdventureWorks;
GO
SELECT DepartmentID,Name,GroupName,__$operation as operation_type,ModifiedDate
FROM
cdc.fn_cdc_get_net_changes_HumanResources_Department(sys.fn_cdc_get_min_lsn('HumanResources_Department'), sys.fn_cdc_get_max_lsn(), N'all with mask')
--最后一个参数说明如下:
--all
--返回的行和元数据列 $start_lsn 中应用该行所需的操作的最终更改的 LSN 和_ _$operation。 该列_ _$update_mask 始终为 NULL。
--all with mask
--返回的行和元数据列 $start_lsn 中应用该行所需的操作的最终更改的 LSN 和_ _$operation。 此外,当更新操作返回 (__$operation = 4) 的更新中修改过的捕获的列中返回的值中进行标记_ _$update_mask。
--all with merge
--返回对元数据列 $start_lsn 中的行所做的最终更改的 LSN。 该列_ _$operation 将是两个值之一: 1 表示删除,5 表示应用更改所需的操作是插入或更新。 该列_ _$update_mask 始终为 NULL。
结果如下:
4. 常用操作举例
4.1 查询已经开启的捕获实例
由于可能不记得或者不知道开启了什么表的捕获,所以可以使用以下语句来查找
USE AdventureWorks;
GO
--返回所有表的变更捕获配置信息
EXECUTE sys.sp_cdc_help_change_data_capture;
GO
4.2 查看对某个实例(即表)的哪些列做了捕获监控
USE AdventureWorks;
GO
--查看对某个实例(即表)的哪些列做了捕获监控
EXEC sys.sp_cdc_get_captured_columns
@capture_instance = 'HumanResources_Department' -- sysname
4.3 查看当前库所有的cdc Job
USE AdventureWorks;
GO
--查看当前库所有的cdc Job
SELECT * FROM msdb.dbo.cdc_jobs
4.4 查看当前配置使用sp_cdc_help_jobs
启用cdc之后会自动创建了两个作业,可以先使用sp_cdc_help_jobs来查看
EXECUTE sp_cdc_help_jobs
对于一个大型的OLTP系统,由于数据更改会非常频繁,变更表中的数据会非常多,如果存放过久(最久可以存放100年),那对数据库空间是非常大的挑战。此时可以调整上图中cdc.AdventureWorks_cleanup 中retention(单位:分钟),默认情况下,保留时间为3天.
4.5 修改作业配置
USE AdventureWorks;
GO
--显示原有配置:
EXEC sp_cdc_help_jobs
GO
--更改数据保留时间为分钟
EXECUTE sys.sp_cdc_change_job
@job_type = N'cleanup',
@retention=100
GO
--停用作业
EXEC sys.sp_cdc_stop_job N'cleanup' --job name 为NVARCHAR类型
GO
--启用作业
EXEC sys.sp_cdc_start_job N'cleanup' --job name 为NVARCHAR类型
GO
--再次查看
EXEC sp_cdc_help_jobs
GO
执行后,cleanup作业的运行时间间隔已经被修改成100分钟了.
注意:修改后要先停用(如果已经启用),再启用,才能生效
4.6 作业的其他操作
USE AdventureWorks;
GO
--停用作业
EXEC sys.sp_cdc_stop_jobN'cleanup'
GO
--启用作业
EXEC sys.sp_cdc_start_jobN'cleanup'
GO
--删除作业
EXEC sys.sp_cdc_drop_job@job_type = N'cleanup' -- nvarchar(20)
GO
--创建作业
EXEC sys.sp_cdc_add_job
@job_type = N'cleanup',
@start_job = 0,
@retention = 5760
GO
--查看作业
EXEC sys.sp_cdc_help_jobs
GO
4.7 DDL变更捕获
CDC除了捕获数据变更之外,还能捕获DDL操作的变化。
4.7.1 现在先来对HumanResources.Department 表修改一下,把name的长度加长
USE AdventureWorks;
GO
ALTER TABLE HumanResources.Department ALTER COLUMN Name NVARCHAR(120) ;
GO
4.7.2 查询ddl记录表
USE AdventureWorks;
GO
SELECT * FROM cdc.ddl_history
我们发现有一条schema的表更记录
4.8 使用CDC的函数来获取更改
4.8.1 使用cdc.fn_cdc_get_all_changes_HumanResources_Department 函数报告捕获实例HumanResources_Department 的当前所有可用更改
USE AdventureWorks;
GO
DECLARE @from_lsn binary(10) --开始事务序列号
DECLARE @to_lsn binary(10) --终止事务序列号
SET @from_lsn = sys.fn_cdc_get_min_lsn('HumanResources_Department')
SET @to_lsn = sys.fn_cdc_get_max_lsn()
SELECT *
FROM cdc.fn_cdc_get_all_changes_HumanResources_Department(@from_lsn, @to_lsn, N'all update old');
GO -----------------------------------------------------------------------------------------------------------------------------------------------------
USE AdventureWorks;
GO DECLARE @datetimeString as VARCHAR(100)
--当前日期的前一天0点时间
SELECT @datetimeString=DATENAME(year,GETDATE())+'-'+DATENAME(month,GETDATE())+'-'+DATENAME(DAY,DATEADD(DAY,-1,GETDATE()))+' 00:00:00'
DECLARE @begin_time as DATETIME
set @begin_time=CONVERT(DATETIME,@datetimeString) DECLARE @end_time as DATETIME
--当前时间
SET @end_time = GETDATE(); DECLARE @from_lsn BINARY(10), @to_lsn BINARY(10);
-- Map the time interval to a change data capture query range.
SET @from_lsn = sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', @begin_time);
SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', @end_time); SELECT
[__$operation] as operation_type
,[DepartmentID]
,[Name]
,[GroupName]
,[ModifiedDate]
FROM cdc.fn_cdc_get_net_changes_HumanResources_Department(@from_lsn, @to_lsn, N'all with merge');
4.8.2 获取某个时间段的更改信息
先根据日志序列号(logsequence number ,LSN)来获取跟踪变更数据:Sys.fn_cdc_map_time_to_lsn获取变更范围内的最大、最小LSN值。可以使用:
a) Smallest greater than;
b) smallest greater than orequal;
c) largest less than;
d) largest less than or equal.
如查询某个时间段插入的数据:
USE AdventureWorks;
GO
--插入数据
INSERT INTO HumanResources.Department(name,GroupName,ModifiedDate)
VALUES('test','abc',GETDATE())
INSERT INTO HumanResources.Department(name,GroupName,ModifiedDate)
VALUES('test1','abc1',GETDATE())
GO
--检查数据
DECLARE @bglsn VARBINARY(10)=sys.fn_cdc_map_time_to_lsn('smallest greater than or equal','2018-12-29 14:32:00.997')
DECLARE @edlsn VARBINARY(10)=sys.fn_cdc_map_time_to_lsn('largest less than or equal',GETDATE())
SELECT DepartmentID,GroupName,Name
FROM cdc.HumanResources_Department_CT
WHERE [__$operation]=2 AND [__$start_lsn] BETWEEN @bglsn AND @edlsn
结果如下:
4.8.3 sys.fn_cdc_map_lsn_to_time 查询变更时间
USE AdventureWorks;
GO
SELECT [__$operation] ,
CASE [__$operation]
WHEN 1 THEN 'DELETE'
WHEN 2 THEN 'INSERT'
WHEN 3 THEN 'BEFORE UPDATE'
WHEN 4 THEN 'AFTER UPDATE'
END [类型],
sys.fn_cdc_map_lsn_to_time([__$start_lsn]) [更改时间] ,
name ,
DepartmentID ,
GroupName ,
ModifiedDate
FROM cdc.HumanResources_Department_CT
4.8.4 获取LSN边界
USE AdventureWorks;
GO
SELECT sys.fn_cdc_get_max_lsn() AS [数据库级别的最大LSN],
sys.fn_cdc_get_min_lsn('cdc.HumanResources_Department_CT') AS [捕获实例的lsn]
这两个值可以用于上面提到的函数里面用于筛选数据时使用。
4.8.5 更新DepartmentID=33的记录
update HumanResources.Department set name ='changed for test record 33' where DepartmentID=33;
可以利用这个特性很方便地处理ETL过程中的缓慢变化维.
4.8.5 返回某个表的变更捕获配置信息
EXEC sys.sp_cdc_help_change_data_capture '{schema}', '{table_name}'
例如查看HumanResources.Department的配置信息:
USE AdventureWorks;
GO
EXEC sys.sp_cdc_help_change_data_capture 'HumanResources', 'Department'
执行结果
4.8.6 获取当前库中所有变更捕获配置信息
USE AdventureWorks;
GO
--返回所有表的变更捕获配置信息
EXECUTE sys.sp_cdc_help_change_data_capture;
4.8.7 查看对某个表的哪些列做了捕获监控,使用上面返回的capture_instance列值
USE AdventureWorks;
GO
--查看对某个表的哪些列做了捕获监控,使用上面返回的capture_instance列值
EXEC sys.sp_cdc_get_captured_columns @capture_instance = 'HumanResources_Department'
由于在开启该表时sys.sp_cdc_enable_table 的参数:@captured_column_list参数没有指定,所以dbo.Department表的所有字段都进行监控了,如果你只关心某些字段,强烈建议在创建捕获的时候设置这个属性
4.8.8
declare @datetimeString as varchar(100)
Select @datetimeString=Datename(year,GetDate())+'-'+Datename(month,GetDate())+'-'+Datename(day,dateadd(day,-1,GetDate()))+' 00:00:00'
declare @begin_time as datetime
set @begin_time=convert(datetime,@datetimeString)
--select @begin_time declare @end_time as datetime
SET @end_time = getdate(); declare @from_lsn binary(10), @to_lsn binary(10);
-- Map the time interval to a change data capture query range.
SET @from_lsn = sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', @begin_time);
SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', @end_time);
--select @from_lsn,@to_lsn SELECT FeeGUID,TradeGUID,Sequence,Flag,lastDate,ItemType,ItemName,Amount,Bz,ExRate,RmbAmount,RmbYe,JmLateFee,Remark,Ye,IsChg,OutAmount,
OutRmbAmount,PayEvent,PayLagQty,PayLagUnit,DsAmount,RmbDsAmount,IsBcK,signguid,TabTimeStamp,IsAjSk,VATRate,IsYgz,YjsjAmount,__$operation as operation_type,GETDATE() as last_update_timestamp
FROM cdc.fn_cdc_get_net_changes_dbo_s_Fee(@from_lsn, @to_lsn, N'all with merge'); --declare @min_lsn as binary(10)
--declare @max_lsn as binary(10)
--set @min_lsn =sys.fn_cdc_get_min_lsn('dbo_s_Fee')
--set @max_lsn=sys.fn_cdc_get_max_lsn()
--SELECT FeeGUID,TradeGUID,Sequence,Flag,lastDate,ItemType,ItemName,Amount,Bz,ExRate,RmbAmount,RmbYe,JmLateFee,Remark,Ye,IsChg,OutAmount,
--OutRmbAmount,PayEvent,PayLagQty,PayLagUnit,DsAmount,RmbDsAmount,IsBcK,signguid,TabTimeStamp,IsAjSk,VATRate,IsYgz,YjsjAmount,__$operation as operation_type,GETDATE() as last_update_timestamp
--FROM cdc.fn_cdc_get_net_changes_dbo_s_Fee(@min_lsn, @max_lsn, N'all with merge');
5. 参考文献