Kettle实现数据库迁移
需求:
做数据仓库时,需要将业务系统CRM抽取到数据仓库的缓冲层,业务系统使用的是SqlServer数据库,数据仓库的缓冲层使用的是mysql数据库,为实现数据库的迁移,即将SqlServer数据库中的所有表与数据迁移到Mysql。
解决办法:
kettle设计一整套流程实现,读取数据库中表->创建表->表数据抽取
实现过程:
整套流程分为:2个job,4个trans。使用到的Trans插件:表输入、字段选择、复制记录到结果、从结果获取记录、设置变量、java脚本、表输出。
1、表数据抽取作业:
作用:首先获取数据库中所有的表名称 然后调用子Job进行表的创建、数据抽取
2.表名称获取流程
要迁移的源库表名称获取,并设置到结果集,为下面的job使用。其中的表输入使用的是show tables,复制数据库中所有的表,也可以从表中或者excel中输入,实现更加小粒度的控制。
show tables 结果为Tables_in_数据库名称,和具体数据库有关,故需要改名
3、子作业: 实现单个表格的创建及抽取
4、表名称变量设置
上一步的子转换
5、入库表结构创建
执行的Java代码如下
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
// First, get a row from the default input hop Object[] r = getRow(); org.pentaho.di.core.database.DatabaseMeta dbmeta = null;
System.out.println(123);
System.out.println( getTrans().getRepository());
System.out.println(456);
java.util.List list = getTrans().getRepository().readDatabases(); if(list != null && !list.isEmpty())
{
for(int i=0;i<list.size();i++)
{
dbmeta = (org.pentaho.di.core.database.DatabaseMeta)list.get(i);
//test1 为数据库名称
if("test1".equalsIgnoreCase(dbmeta.getName()))
{
break;
}
}
} if(dbmeta!=null)
{
org.pentaho.di.core.database.Database db=new org.pentaho.di.core.database.Database(dbmeta); try
{
db.connect(); String tablename = getVariable("TABLENAME"); logBasic("开始创建表:" + tablename); if(tablename!=null && tablename.trim().length()>0)
{
String sql = db.getDDLCreationTable(tablename, data.inputRowMeta);//${TABLENAME} db.execStatement(sql.replace(";", "")); logBasic(sql);
}
}
catch(Exception e)
{
logError("创建表出现异常",e); }finally{
db.disconnect();
}
}
return false;
}
6、表数据抽取流程
引用原文:
1、源表若存在有blob的表,会有问题,可能是由于表输出没有指定字段的原因
2、以上的操作使用的是仓库,kettle repo会报错
3、将原文中String sql = db.getDDL(tablename, data.inputRowMeta);函数名 getDDL 改为 getDDLCreationTable
3、将原文中String sql = db.getDDL(tablename, data.inputRowMeta);函数名 getDDL 改为 getDDLCreationTable
4、去除了原文中创建表之前表输入一个操作,原文当有空表需要复制时候,会报错
参考:
原文地址: 用Kettle的一套流程完成对整个数据库迁移
data-integration\samples\jobs\process all tables 实现整个数据库的迁移,
代码下载
http://pan.baidu.com/s/1nt7LOj3