SQL Server数据库CDC数据同步配置

SQL Server 开启CDC数据同步

1、 数据库需要开启代理服务。

 SQL Server数据库CDC数据同步配置

 

 

开启方式:

       a). 点击开始菜单àSQL Serverà配置管理工具àSQLserver服务àSQLserver代理à(右键)启动

 SQL Server数据库CDC数据同步配置SQL Server数据库CDC数据同步配置

 

 

 

b). 打开电脑服务,找到SQLserver 代理,点击左侧状态

 SQL Server数据库CDC数据同步配置

 

 

2、 数据库配置

a).首先查看数据库是否已经开启CDC服务

SELECT name,is_cdc_enabled FROM sys.databases WHERE is_cdc_enabled = 1;

返回已经开启CDC的数据库,如果为空,则所有的库都没有开启CDC服务,转到步骤b,对数据库开启CDC。如果有结果,则转到步骤c。

b).对数据库开启CDC服务

USE test;           -- 切换数据库

EXECUTE sys.sp_cdc_enable_db; -- 开启CDC功能

检查是否开启成功:

SELECT

is_cdc_enabled,

CASE WHEN is_cdc_enabled=0 THEN ‘CDC功能禁用‘ ELSE ‘CDC功能启用‘ END 描述

FROM    sys.databases

WHERE   NAME = ‘test’ –- 你的数据库名称

创建成功后,将自动添加CDC用户和CDC架构。

在用户和架构下面可以看到cdc用户和cdc架构

 

 

c.查看当前已经开启CDC的数据表。

SELECT name,is_tracked_by_cdc FROM sys.tables WHERE is_tracked_by_cdc = 1;

d.开启表CDC

          

sys.sp_cdc_enable_table

    [ @source_schema = ] ‘source_schema‘,

    [ @source_name = ] ‘source_name‘ ,

    [ @role_name = ] ‘role_name‘

    [,[ @capture_instance = ] ‘capture_instance‘ ]

    [,[ @supports_net_changes = ] supports_net_changes ]

    [,[ @index_name = ] ‘index_name‘ ]

    [,[ @captured_column_list = ] ‘captured_column_list‘ ]

    [,[ @filegroup_name = ] ‘filegroup_name‘ ]

  [,[ @partition_switch = ] ‘partition_switch‘ ]

示例:

对‘USRALMHS‘表开启变更捕获

EXEC sys.sp_cdc_enable_table

@source_schema= ‘dbo‘,      --源表架构

@source_name = ‘USRALMHS‘,  --源表

@role_name = ‘CDC_Role‘     --角色(将自动创建)

GO

--如果不想控制访问角色,则@role_name必须显式设置为null。

查询是否成功

SELECT  name ,

        is_tracked_by_cdc ,

        CASE WHEN is_tracked_by_cdc = 0 THEN ‘CDC功能禁用‘

             ELSE ‘CDC功能启用‘

        END 描述

FROM    sys.tables

WHERE   OBJECT_ID = OBJECT_ID(‘dbo. USRALMHS ‘)

对表开启成功后,可以查看数据库,在数据库系统表下增加了很多表。

在SQLserver 代理中多了两个作业

        在可编程性-》函数-》表值函数里,也多了两个函数

 

  SQL Server数据库CDC数据同步配置SQL Server数据库CDC数据同步配置SQL Server数据库CDC数据同步配置

 

 

 

 

3、 测试

a)      向表中插入数据

insert into test.dbo.USRALMHS  select top 1000 * from alarm.dbo.USRALMHS_copy3
            在DBO_USRALMHS_CT中查看:
     会有同样的1000 条数据,唯一不同的是在DBO_USRALMHS_CT中会多几个字段,分别代表不同的含义,其中最主要的是 __$operation 代表含义 1 删除、2插入、3更新前的内容、4更新后的内容 @bglsn 开始时间的时间戳  @edlsn 结束时间的时间戳

SQL Server数据库CDC数据同步配置

 

 

b)      测试更新和删除操作(生成数据的__$operation 不同)

省略

4、 分析(系统自带数据库)

a)      分析存储过程

---查询当前作业配置

SELECT * FROM MSDB.dbo.cdc_jobs

--或者使用

USE AdventureWorks2008R2;

GO

EXEC sys.sp_cdc_help_jobs;

GO

1.sys.sp_cdc_add_job

在当前数据库中创建变更数据捕获清理或捕获作业

 

    1.创建捕获作业

    USE AdventureWorks2008R2;

    GO

    EXEC sys.sp_cdc_add_job

         @job_type = N‘capture‘;

    GO

    2.创建清理作业

    ---创建清理作业,作业连续运行,更改数据行将在更改表中保留2880分钟,清除时使用一条语句最多删除4000条记录

    USE AdventureWorks2008R2;

    GO

    EXEC sys.sp_cdc_add_job

         @job_type = N‘cleanup‘

         ,@start_job=1

         ,@retention=2880

         ,@threshold =4000

2.sys.sp_cdc_change_job

修改当前数据库中变更数据捕获清除或捕获作业的配置

--仅在使用 sp_cdc_stop_job 停止作业并使用 sp_cdc_start_job 重新启动该作业后,对该作业所做的更改才会生效

 

    1.更改捕获作业

    --将每个循环扫描最多处理的事务数更改为200,为了从日志中提取所有行而要执行的最大扫描循环50次

    USE AdventureWorks2008R2;

    GO

    EXECUTE sys.sp_cdc_change_job

        @job_type = N‘capture‘,

        @maxtrans = 200,

        @maxscans = 50;

       

    GO

    2.更改清除作业,将记录保留时间更改为3440分钟

    USE AdventureWorks2008R2;

    GO

    EXECUTE sys.sp_cdc_change_job

        @job_type = N‘cleanup‘,

        @retention = 3440;

    GO

3.sys.sp_cdc_cleanup_change_table

根据指定的 low_water_mark 值从当前数据库的更改表中删除行,重置更改表中的最小 __$start_lsn,并删除小于该值的数据.

将同时清除cdc.HR_Department_CT,cdc.lsn_time_mapping表的记录

 

    USE AdventureWorks2008R2;

    GO

    EXEC sys.sp_cdc_cleanup_change_table

    @capture_instance =N‘HR_Department‘,

    @low_water_mark=0x0000037D000000D30008,

    @threshold=2000;

   

    SELECT sys.fn_cdc_increment_lsn(sys.fn_cdc_get_max_lsn())

4.sys.sp_cdc_drop_job

从 msdb 中删除当前数据库的变更数据捕获清除或捕获作业。

 

--下例删除 AdventureWorks2008R2 数据库的清除作业和捕获作业

    USE AdventureWorks2008R2;

    GO

    EXEC sys.sp_cdc_drop_job @job_type = N‘cleanup‘;

    USE AdventureWorks2008R2;

    GO

    EXEC sys.sp_cdc_drop_job @job_type = N‘capture‘;

 

5.sys.sp_cdc_disable_db   

对当前数据库禁用变更数据捕获

 

禁用当前对数据库中的所有表启用的变更数据捕获。与变更数据捕获相关的所有系统对象(如更改表、作业、存储过程和函数)都将被删除。sys.databases 目录视图中的数据库条目的 is_cdc_enabled 列设置为 0。

如果在禁用变更数据捕获时为数据库定义了很多捕获实例,则长时间运行事务可能导致 sys.sp_cdc_disable_db 的执行失败。通过在运行 sys.sp_cdc_disable_db 之前使用 sys.sp_cdc_disable_table 禁用单个捕获实例,可以避免此问题。

    USE AdventureWorks2008R2;

    GO

    EXECUTE sys.sp_cdc_disable_db;

    GO

6.sys.sp_cdc_disable_table

对当前数据库中指定的源表和捕获实例禁用变更数据捕获

删除与指定的源表和捕获实例相关联的变更数据捕获更改表和系统函数。它会删除任何与来自变更数据捕获系统表的指定捕获实例相关联的行,并将 sys.tables 目录视图中的表项的 is_tracked_by_cdc 列设置为 0。

 

---下例对 HumanResources.Department 表禁用了变更数据捕获

    USE AdventureWorks2008R2;

    GO

    EXEC sys.sp_cdc_disable_table

        @source_schema = N‘HumanResources‘

      , @source_name = N‘Department‘

      , @capture_instance = N‘HR_Department‘ ---这里是定义的实例名称,在一开始创建捕获的时候创建的,这里也可以制定ALL(禁用表HumanResources.Department的所有捕获),

 

7.sys.sp_cdc_enable_db

对当前数据库启用变更数据捕获。必须先对数据库执行此过程,然后才能对该数据库中的任何表启用变更数据捕获。变更数据捕获可记录应用到所启用的表中的插入、更新和删除活动,同时采用易于使用的关系格式提供变更详细信息。此操作将为已修改的行捕获反映了所跟踪源表列结构的列信息,同时还捕获将更改应用到目标环境所需的元数据。

将创建以全数据库为作用域的变更数据捕获对象,包括元数据表和 DDL 触发器。它还会创建 cdc 架构和 cdc 数据库用户,并将 sys.databases 目录视图中的数据库条目的 is_cdc_enabled 列设置为 1。

    USE AdventureWorks2008R2;

    GO

    EXECUTE sys.sp_cdc_enable_db;

    GO

 

8.sys.sp_cdc_enable_table

--对需要进行跟踪的表启动CDC,sys.sp_cdc_enable_table

/*

为当前数据库中指定的源表启用变更数据捕获。对表启用变更数据捕获时,应用于此表的每个数据操纵语言 (DML) 操作的记录都将写入事务日志中。

变更数据捕获进程将从日志中检索此信息,并将其写入可通过使用一组函数访问的更改表中。

*/

sys.sp_cdc_enable_table

    [ @source_schema = ] ‘source_schema‘, ---表所属的架构名

    [ @source_name = ] ‘source_name‘ ,----表名

    [ @role_name = ] ‘role_name‘---是用于控制更改数据访问的数据库角色的名称。

    [,[ @capture_instance = ] ‘capture_instance‘ ]--是用于命名特定于实例的变更数据捕获对象的捕获实例的名称

    [,[ @supports_net_changes = ] supports_net_changes ]---指示是否对此捕获实例启用净更改查询支持

    [,[ @index_name = ] ‘index_name‘ ]--用于唯一标识源表中的行的唯一索引的名称。index_name 为 sysname,并且可以为 NULL。如果指定,则 index_name 必须是源表的唯一有效索引。如果指定 index_name,则标识的索引列优先于任何定义的主键列,就像表的唯一行标识符一样。

    [,[ @captured_column_list = ] ‘captured_column_list‘ ]--需要对哪些列进行捕获。captured_column_list 的数据类型为 nvarchar(max),并且可以为 NULL。如果为 NULL,则所有列都将包括在更改表中。

    [,[ @filegroup_name = ] ‘filegroup_name‘ ]--是要用于为捕获实例创建的更改表的文件组。

  [,[ @partition_switch = ] ‘partition_switch‘ ]--指示是否可以对启用了变更数据捕获的表执行 ALTER TABLE 的 SWITCH PARTITION 命令。allow_partition_switch 为 bit,默认值为 1。

 

9.sp_cdc_generate_wrapper_function

 生成用于为 SQL Server 中可用的变更数据捕获查询函数创建包装函数的脚本

     EXEC sys.sp_cdc_generate_wrapper_function

 

 

10. sys.sp_cdc_help_change_data_capture

返回当前数据库中为变更数据捕获启用的每个表的变更数据捕获配置。最多可为每个源表返回两行,为每个捕获实例返回一行。

---返回制定表的捕获信息

    USE AdventureWorks2008R2;

    GO

    EXECUTE sys.sp_cdc_help_change_data_capture

        @source_schema = N‘HumanResources‘, --架构名

        @source_name = N‘Department‘;--表名

    GO

    --返回所有表的捕获信息

    USE AdventureWorks2008R2;

    GO

    EXECUTE sys.sp_cdc_help_change_data_capture

    

11.sys.sp_cdc_get_captured_columns

返回指定捕获实例所跟踪的捕获源列的变更数据捕获元数据信息。

    USE AdventureWorks2008R2;

    GO

    EXECUTE sys.sp_cdc_get_captured_columns

        @capture_instance = N‘HR_Department‘;

    GO

 

12.sys.sp_cdc_get_ddl_history

返回自对指定的捕获实例启用变更数据捕获后与该捕获实例关联的数据定义语言 (DDL) 更改历史记录。

与查询表是一样的结果

    SELECT * FROM cdc.ddl_history

 

    USE AdventureWorks2008R2;

    GO

    EXECUTE sys.sp_cdc_get_ddl_history

    @capture_instance = N‘HR_Department‘;

 

13.sp_cdc_help_jobs

报告关于当前数据库中所有变更数据捕获清除或捕获作业的信息。因为一个数据库只会在第一个表创建捕获的时候创建作业

所以这里只需要在当前库执行就可以。

    sys.sp_cdc_help_jobs

    USE AdventureWorks2008R2;

    GO

    EXEC sys.sp_cdc_help_jobs;

    GO

14.sp_cdc_scan

执行变更数据捕获日志扫描操作,需要进行捕获的时候使用,默认情况下5会自动进行捕获。

如果变更数据捕获正在使用 SQL Server 代理捕获作业,则 sys.sp_MScdc_capture_job 将内部调用 sys.sp_cdc_scan。如果变更数据捕获日志扫描操作已经处于活动状态,或数据库启用了事务复制,则无法显式执行此过程。此存储过程应当由需要自定义自动配置的捕获作业的行为的管理员使用。

    USE AdventureWorks2008R2;

    GO

    EXEC sp_cdc_scan

 

15.sys.sp_cdc_start_job,

启动和停止当前数据库的变更数据捕获清除或捕获作业。

 

    ---启动清除作业

    USE AdventureWorks2008R2;

    GO

    EXEC sys.sp_cdc_start_job @job_type = N‘cleanup‘;

 

    ---停止捕获作业

    USE AdventureWorks2008R2;

    GO

    EXEC sys.sp_cdc_stop_job @job_type = N‘capture‘;

    GO

 

    USE AdventureWorks2008R2;

    GO

    EXEC sys.sp_cdc_stop_job @job_type = N‘cleanup‘;

    GO

 

b)      分析函数

1.fn_cdc_get_all_changes_capture_instance

针对在指定日志序列号 (LSN) 范围内应用到源表的每项更改均返回一行

    USE AdventureWorks2008R2;

    GO

 

    DECLARE @from_lsn binary(10), @to_lsn binary(10)

    SET @from_lsn =

       sys.fn_cdc_get_min_lsn(‘HR_Department‘)

    SET @to_lsn   = sys.fn_cdc_get_max_lsn()

    SELECT * FROM cdc.fn_cdc_get_all_changes_HR_Department

      (@from_lsn, @to_lsn, N‘all‘);

    GO

2.fn_cdc_get_net_changes_capture_instance

针对指定 LSN 范围内每个已更改的源行返回一个净更改行。也就是说,如果在 LSN 范围内源行具有多项更改,

则该函数将返回反映该行最终内容的单一行。例如,如果事务在源表中插入一行,并且 LSN 范围内的后续事务更新了该行中的一个或多个列,

则该函数将只返回一行,其中包含多个更新的列值。

如果值最后是删除操作,则不返回该LSN的值

    USE AdventureWorks2008R2;

    GO

    DECLARE @begin_time datetime, @end_time datetime, @from_lsn binary(10), @to_lsn binary(10);

    -- Obtain the beginning of the time interval.

    SET @begin_time = GETDATE() -1;

    -- DML statements to produce changes in the HumanResources.Department table.

    INSERT INTO HumanResources.Department (Name, GroupName)

    VALUES (N‘MyDept‘, N‘MyNewGroup‘);

 

    UPDATE HumanResources.Department

    SET GroupName = N‘Resource Control‘

    WHERE GroupName = N‘Inventory Management‘;

 

    DELETE FROM HumanResources.Department

    WHERE Name = N‘MyDept‘;

 

    -- Obtain the end of the time interval.

    SET @end_time = GETDATE();

    -- Map the time interval to a change data capture query range.

    SET @from_lsn = sys.fn_cdc_map_time_to_lsn(‘smallest greater than or equal‘, @begin_time);

    SET @to_lsn = sys.fn_cdc_map_time_to_lsn(‘largest less than or equal‘, @end_time);

 

    -- Return the net changes occurring within the query window.

    SELECT * FROM cdc.fn_cdc_get_net_changes_HR_Department(@from_lsn, @to_lsn, ‘all‘);

   

3.sys.fn_cdc_decrement_lsn

根据指定的 LSN 返回序列中的上一个日志序列号 (LSN)

    Use AdventureWorks2008R2;

    GO

    SELECT sys.fn_cdc_decrement_lsn(sys.fn_cdc_get_max_lsn())

 

下例在一个返回 LSN 值小于最大 LSN 值的更改数据行的查询中,使用 sys.fn_cdc_decrement_lsn 来设置 LSN 上限。

 

    Use AdventureWorks2008R2;

    GO

    DECLARE @from_lsn binary(10), @to_lsn binary(10);

    SET @from_lsn = sys.fn_cdc_get_min_lsn(‘HR_Department‘);

    SET @to_lsn = sys.fn_cdc_decrement_lsn(sys.fn_cdc_get_max_lsn());

    SELECT * FROM cdc.fn_cdc_get_net_changes_HR_Department( @from_lsn, @to_lsn, ‘all‘);

    GO

 

4.sys.fn_cdc_increment_lsn

根据指定的 LSN 返回序列中的下一个日志序列号 (LSN)。

此函数返回的 LSN 值始终大于指定的值,并且不存在介于这两个值之间的 LSN 值。

若要系统地查询随时间变化的更改数据流,可以定期重复调用该查询函数,每次调用时指定一个新的查询间隔来限定查询中返回的更改的范围。为帮助确保不丢失数据,通常使用前一个查询的上限来生成后一个查询的下限。由于查询间隔是一个闭区间,因此新的下限必须大于前一个上限,但要足够小,以确保不存在 LSN 值介于此值与旧上限之间的更改。sys.fn_cdc_increment_lsn 函数就是用来获取此值的。

    Use AdventureWorks2008R2;

    GO

    SELECT sys.fn_cdc_increment_lsn(sys.fn_cdc_get_max_lsn())

 

5.sys.fn_cdc_get_column_ordinal

返回实例制定列的列序号。

 

    Use AdventureWorks2008R2;

    GO

    SELECT sys.fn_cdc_get_column_ordinal ( ‘HR_Department‘,‘NAME‘);

 

 

6.sys.fn_cdc_get_max_lsn

返回 cdc.lsn_time_mapping 系统表的 start_lsn 列中的最大日志序列号 (LSN)。您可以使用此函数为任何捕获实例返回变更数据捕获时间线的高端点

    USE AdventureWorks2008R2;

    GO

    SELECT sys.fn_cdc_get_max_lsn()AS max_lsn;

 

    SELECT  sys.fn_cdc_get_min_lsn(N‘HR_Department‘);

 

 

    USE AdventureWorks2008R2;

    GO

    DECLARE @from_lsn binary(10), @to_lsn binary(10);

    SET @from_lsn = sys.fn_cdc_get_min_lsn(N‘HR_Department‘);

    SET @to_lsn = sys.fn_cdc_get_max_lsn();

    SELECT * FROM cdc.fn_cdc_get_net_changes_HR_Department(@from_lsn, @to_lsn, ‘all‘);

    GO

 

7.sys.fn_cdc_get_min_lsn

    USE AdventureWorks2008R2;

    GO

    SELECT sys.fn_cdc_get_min_lsn (‘HR_Department‘)AS min_lsn;---查询制定的实例名的最小LSN

 

8.sys.fn_cdc_has_column_changed ( ‘capture_instance‘,‘column_name‘ , update_mask )

标识指定的更新掩码是否指示已更新关联的更改行中的指定列。

    USE AdventureWorks2008R2;

    GO

    SELECT sys.fn_cdc_has_column_changed (‘HR_Department‘,‘name‘ , 2)

 

 

9.sys.fn_cdc_is_bit_set

指示捕获的列是否已更新,采用的方法是检查是否在提供的位掩码内设置了其序号位置。

 

    USE AdventureWorks2008R2;

    GO

    DECLARE @from_lsn binary(10), @to_lsn binary(10), @GroupNm_ordinal int;

    SET @from_lsn = sys.fn_cdc_get_min_lsn(‘HR_Department‘);

    SET @to_lsn = sys.fn_cdc_get_max_lsn();

    SET @GroupNm_ordinal = sys.fn_cdc_get_column_ordinal(‘HR_Department‘,‘GroupName‘);

    SELECT sys.fn_cdc_is_bit_set(@GroupNm_ordinal,__$update_mask) as ‘IsGroupNmUpdated‘, *

    FROM cdc.fn_cdc_get_all_changes_HR_Department( @from_lsn, @to_lsn, ‘all‘)

    WHERE __$operation = 4;

    GO

 

10.sys.fn_cdc_map_lsn_to_time

为指定的日志序列号 (LSN) 返回 cdc.lsn_time_mapping 系统表的 tran_end_time 列中的日期和时间值。

您可以使用此函数系统地将 LSN 范围映射到更改表中的日期范围

    SELECT sys.fn_cdc_map_lsn_to_time(sys.fn_cdc_get_max_lsn());

 

11.sys.fn_cdc_map_time_lsn

下面的示例使用 sys.fn_cdc_map_time_lsn 函数来确定在 cdc.lsn_time_mapping 表中是否有 tran_end_time

值大于或等于指定时间的行。例如,可以用此查询来确定捕获进程是否已处理完截至前指定时间提交的更改

    DECLARE @extraction_time datetime, @lsn binary(10);

    SET @extraction_time = GETDATE();

    SELECT @lsn = sys.fn_cdc_map_time_to_lsn (‘smallest greater than or equal‘, @extraction_time);

    IF @lsn IS NOT NULL

    BEGIN

    print ‘...‘

    END

 

    DECLARE @begin_time datetime, @end_time datetime, @begin_lsn binary(10), @end_lsn binary(10);

    SET @begin_time = ‘2015-04-07 18:00:00.000‘;

    SET @end_time = ‘2015-04-08 18:00:00.000‘;

    SELECT @begin_lsn = sys.fn_cdc_map_time_to_lsn(‘smallest greater than‘, @begin_time);

    SELECT @end_lsn = sys.fn_cdc_map_time_to_lsn(‘largest less than or equal‘, @end_time);

    SELECT * FROM cdc.fn_cdc_get_net_changes_HR_Department(@begin_lsn, @end_lsn, ‘all ‘);

c)      分析系统视图

1.sys.dm_cdc_log_scan_sessions

针对当前数据库中的每个日志扫描会话返回一行。返回的最后一行表示当前会话。您可以使用此视图返回有关当前日志扫描会话的状态信息,

或有关自 SQL Server 实例上次启动以来所有会话的聚合信息。

    USE AdventureWorks2008R2;

    SELECT *

    FROM sys.dm_cdc_log_scan_sessions

--可以观察empty_scan_count字段的值可以发现它的变化,5秒增加一次,和前面配置的日志扫描作业的频率是一样的

    USE AdventureWorks2008R2;

    GO

    print getdate()

    SELECT empty_scan_count

    FROM sys.dm_cdc_log_scan_sessions

    WHERE session_id = (SELECT MAX(b.session_id) from sys.dm_cdc_log_scan_sessions AS b)

 

    waitfor DELAY ‘00:01‘

 

    print getdate()

    SELECT empty_scan_count

    FROM sys.dm_cdc_log_scan_sessions

    WHERE session_id = (SELECT MAX(b.session_id) from sys.dm_cdc_log_scan_sessions AS b)

 

2.sys.dm_cdc_errors

为变更数据捕获日志扫描会话中遇到的每个错误返回一行。

    USE AdventureWorks2008R2;

    GO

    SELECT *

    FROM sys.dm_cdc_errors

SQL Server数据库CDC数据同步配置

上一篇:ASP.NET MVC Model验证(二)


下一篇:省市县三级联动数据导入数据库