SSIS结合BCP及SQL Server作业实现定时将数据导出打包实现数据同步

首先这个流程要实现的功能大致是:

有两台服务器,一台是对外网开发的,一台是内网的。那么很明显数据交互都是外网服务器在做,而这个流程要做的就是要将外网上面的数据定时同步到内网中。

我们依对其中某张表的操作为例子,通过在一张基表(Staff)上面建立触发器,用来监督Staff表的增,删,改

由触发器将修改的内容的主键ID,表名称,以及所执行的动作(增,删,改),和修改内容的时间。记录到一张日志表中(ActionLog)

然后再由作业调用SSIS及BCP将修改的内容记录到一个txt文件中,随后对txt文件进行打包。

这样当其他的程序或者作业再去调用这个打包后的文件,并把它进行处理,就可以实现数据的同步工作了。

SSIS结合BCP及SQL Server作业实现定时将数据导出打包实现数据同步

首先要新建一个SSIS项目,通过"文件 - 新建 - 项目 - Integration Services 项目",输入名称和项目位置之后,创建SSIS项目

创建完成之后,默认会进入到控制流的选项卡界面,这时候先不要着急走流程,首先创建几个用于流程的变量,在菜单栏上的"SSIS - 变量"点击创建

(以下变量或变量赋予的默认值,根据自己项目中的实际情况决定,仅供参考)

Flag(存放压缩包文件路径,String),

OutputPath(压缩包文件路径,String),

prefix(压缩包文件前缀,String,值:ylstf_),

TableName(需要操作的表名称,String,值:Staff),

TopNum(每次需要操作多少条数据,Int32,值:20),

TxtDataPath(存放TXT文件的路径,String,值:D:\Data_sync\YlStf\Temp\)

接下来我们所有的操作都会在SSIS设计中的控制流选项卡下进行操作

首先从左边“工具栏”的“控制流项”中,拖拽脚本任务到右边的控制流选项卡下,并将该脚本任务的名字改成“调用存储过程BCP出数据”,双击脚本进入脚本任务编辑器

在“脚本”选项的ReadOnlyVariables中选择prefix,TableName,TopNum,TxtDataPath 变量,ReadWriteVariables中选择Flag,OutputPath变量。

然后点击“编辑脚本”进入代码编辑

public void Main()
{
DateTime dtime = DateTime.Now;
SqlConnection con = new SqlConnection("你的数据库链接地址");
SqlCommand cmd = new SqlCommand("[Data_SYNC_Stf]", con); string dataPackPath = @"D:\Data_sync\Upload\";
string DataPath = Dts.Variables["TxtDataPath"].Value.ToString(); cmd.Parameters.Add("@table", SqlDbType.VarChar, ).Value = Dts.Variables["TableName"].Value;
cmd.Parameters.Add("@mInitialFilePath", SqlDbType.VarChar, ).Value = DataPath;
cmd.Parameters.Add("@mUploadFilePath", SqlDbType.VarChar, ).Value = dataPackPath;
cmd.Parameters.Add("@dateflag", SqlDbType.DateTime).Value = dtime;
cmd.Parameters.Add("@prefix", SqlDbType.VarChar, ).Value = Dts.Variables["prefix"].Value;
cmd.Parameters.Add("@TopNum", SqlDbType.Int).Value = ;
cmd.Parameters.Add("@DataFullFileName", SqlDbType.VarChar, ).Value = "";
cmd.Parameters["@DataFullFileName"].Direction = ParameterDirection.Output;
cmd.CommandType = CommandType.StoredProcedure;
cmd.CommandTimeout = ;
try
{
con.Open();
cmd.ExecuteNonQuery();
Dts.Variables["Flag"].Value = cmd.Parameters["@DataFullFileName"].Value.ToString(); System.IO.FileInfo fileinfo = new System.IO.FileInfo(DataPath + Dts.Variables["TableName"].Value + ".txt");
if (fileinfo.Length < )
{
throw new Exception("Empty files!!!!!!!!!!!!!!!!!!!!!!!");
} Dts.Variables["OutputPath"].Value = " a -ap -df " + Dts.Variables["Flag"].Value + " " + DataPath;
Dts.TaskResult = (int)ScriptResults.Success;
}
catch (Exception ex)
{
Dts.Events.FireError(, "Event Snippet", ex.Message, "", );
Dts.TaskResult = (int)ScriptResults.Failure;
}
finally { if (con.State == ConnectionState.Open) { con.Close(); } } }

代码中的Dts是 Microsoft.SqlServer.Dts.Runtime命名空间下的一个属性,Dts.Variables["变量名"].value 是获取SSIS中定义的变量值 Dts.Variables["变量名"].Value =“” 则是为变量赋值。
如果你不能找到Dts这个属性,那么请在引用中引入:Microsoft.SqlServer.ManagedDTS,Microsoft.SqlServer.ScriptTask 这两个dll文件

上面的代码中调用了一个存储过程[Data_SYNC_Stf],我们的BCP工作也就是在这里面完成的

Create PROCEDURE [dbo].[Data_SYNC_Stf]
@table VARCHAR() ,
@mInitialFilePath VARCHAR() , --数据包路径
@mUploadFilePath VARCHAR() , --上传的压缩文件路径
@DateFlag DATETIME , --打包日期
@TopNum INT, --每次多少条
@prefix varchar() ,--文件名前缀
@DataFullFileName VARCHAR() OUTPUT --要上传的数据包完整路径 AS
BEGIN DECLARE @__error_message NVARCHAR() --错误信息
DECLARE @err INT --错误数记录
BEGIN TRY
SET @DataFullFileName=@mUploadFilePath + @prefix + replace(replace(replace(convert(varchar,@DateFlag,),'-',''),' ',''),':','')+'.zip'
--SELECT * FROM actionlog WITH (HOLDLOCK);
--检查ActionLogTempStorage表是否还有没传完的。 没有则新插
DECLARE @count INT
SELECT @count = COUNT()
FROM dbo.ActionLogTempStorage
WHERE objTable = @table --AND state = IF @count =
BEGIN
INSERT INTO dbo.ActionLogTempStorage
( objID ,
objtable ,
objaction ,
createDate
)
SELECT TOP objid ,
objtable ,
objaction ,
@DateFlag
FROM dbo.actionlog
WHERE objtable = @table DELETE FROM dbo.actionlog
WHERE objID IN (
SELECT objID
FROM dbo.ActionLogTempStorage) END
--SELECT * FROM actionlog WITH (noLOCK); --开始导出各表操作=======================================
DECLARE @sql VARCHAR() --打包ActionTemp中需要插入的表记录 SET @sql = 'bcp "SELECT top '''+@TopNum +''' a.* FROM dbo.Staff AS a RIGHT JOIN dbo.ActionLogTempStorage AS b on a.ID=b.[objID] where b.objTable='''+@table+''' and b.state=0 and (b.objAction=1 or b.objAction=2)" queryout ' + @mInitialFilePath + @table+'.txt -c -T -r{_r} -t {_t}' PRINT @sql EXEC master.dbo.xp_cmdshell @sql END TRY
BEGIN CATCH
--错误处理
SET @err = @@error
SELECT @__error_message = ERROR_MESSAGE()
INSERT dbo._SYNCERROR
( MSG )
VALUES ( @table + @__error_message )
END CATCH
END

如果你的master.dbo.xp_cmdshell 存储过程执行的有误,那么请在调用这个存储过程的前面加上:
 --启用xp_cmdshell 所属的高级配置项
 EXEC sp_configure 'show advanced options', 1 ;
 RECONFIGURE ;

--启用xp_cmdshell
 EXEC sp_configure 'xp_cmdshell', 1 ;
 RECONFIGURE ;

接着在控制流中,拖拽执行进程任务的控制流项,双击进入任务编辑器。

在处理中的Executable选择WinRAR(或其他压缩包)程序,WorkingDirectory选择WinRAR(或其他压缩包)程序文件夹

如我的是,Executable:C:\Program Files\WinRAR\WinRAR.exe,WorkingDirectory:C:\Program Files\WinRAR\,设置完成之后点击确定

再从左边的工具栏里面拖拽“脚本任务”控制流项,名字改为“完成并产生OK标记”(打包完成之后会生成两个压缩包,一个是存放数据的压缩包,另一个是_ok.zip压缩包,有这个带ok的压缩包文件,说明是成功的打包文件,否则就认为是错误的文件)

同样双击编辑,进入脚本任务编辑器。在ReadOnlyVariables中选择:Flag,OutputPath,TableName变量,点击“编辑脚本”

 public void Main()
{
DateTime dtime = DateTime.Now;
SqlConnection con = new SqlConnection("你的数据库链接地址");
string TableName = Dts.Variables["TableName"].Value.ToString();
string Flag = Dts.Variables["Flag"].Value.ToString();
List<string> sqlList = new List<string>(); string sql = "INSERT INTO dbo.ActionTemp (objID,objtable,objAction,createDate,[state],DataFileName) SELECT ";
sql += "objid ,objtable , objaction , GETDATE() , 1, ";
sql += "'" + Dts.Variables["Flag"].Value + "'";
sql += " FROM dbo.ActionLogTempStorage WHERE objTable = '" + TableName + "'";
sqlList.Add(sql); sqlList.Add("DELETE FROM dbo.ActionLogTempStorage WHERE objTable ='" + TableName + "';");
try
{
con.Open();
SqlCommand cmd = new SqlCommand();
cmd.Connection = con;
cmd.CommandType = CommandType.Text;
for (int i = ; i < sqlList.Count; i++)
{
cmd.CommandText = sqlList[i].ToString();
if (cmd.ExecuteNonQuery() >= )
{
continue;
}
else
{
throw new Exception(" back db error sql str:" + sql[i]);
}
}
string Dataflag = Flag.Replace(".zip", "_ok.zip");
System.IO.File.Create(Dataflag).Dispose();
Dts.TaskResult = (int)ScriptResults.Success; }
catch (Exception ex)
{
if (File.Exists(Flag))
{
File.Delete(Flag);
}
if (File.Exists(Flag.Replace(".zip", "_ok.zip")))
{
File.Delete(Flag.Replace(".zip", "_ok.zip"));
}
Dts.Events.FireError(, "Err", "LastStep Error:" + ex.Message, "", );
Dts.TaskResult = (int)ScriptResults.Failure;
}
finally { if (con.State == ConnectionState.Open) { con.Close(); } }
}

到这里SSIS的开发流程已经完成了。我们可以通过刚刚创建项目的路径找到.dtsx文件放到需要调用的服务器上面

下面开始通过作业调用SSIS:

在作业“属性”的“步骤”中“新建”一个步骤,

类型选择“Sql Server Integration Services包”,

用行身份选择“Sql Server代理服务帐号”,

包源选择“Sql Server”,

使用Windows身份验证或使用Sql Server 身份验证(输入用户名和密码)都可以,

包选择你把.dtsx放的地方

作业的其他地方根据服务器的要求自己设定即可。

作业创建好之后,运行一下作业,看看有没有在指定的路径下面生成压缩包文件和_ok压缩包文件。

以及ActionLog和ActionTemp中的数据是否有变化,就可以判断SSIS包有没有执行成功了!

上一篇:用Delphi直接获取bmp图片的像素


下一篇:[译]MVC网站教程(二):异常管理