A Method for Generating Hive DDL with the Sqoop API

I. Creating the Table

The DDL:

create table testtb (id int,content string, country string) row format delimited fields terminated by '|' lines terminated by '\n' stored as textfile;

II. Importing the Data

1. Importing by running sqoop in the shell

sqoop import --connect jdbc:mysql://localhost:3306/test --username root --password hehe --table testtb --fields-terminated-by '|' --lines-terminated-by '\n' --hive-delims-replacement " " -m 1 --target-dir /user/hive/warehouse/test.db/testtb/ --append

(This is a single-node setup, so MySQL is addressed as localhost.)

2. Importing via the Sqoop API


import com.cloudera.sqoop.SqoopOptions;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.Sqoop;
import org.apache.sqoop.hive.TableDefWriter;
import org.apache.sqoop.manager.ConnManager;
import org.apache.sqoop.tool.SqoopTool;
import org.apache.sqoop.util.OptionsFileUtil;

import java.lang.reflect.Method;   // needed by the test1() example in section III

public class Test{
    public static void import2HDFS() throws Exception {
        String[] args = new String[]{
                "--connect", "jdbc:mysql://localhost:3306/test",
                "--driver", "com.mysql.jdbc.Driver",
                "--username", "root",
                "--password", "hehe",
                "--table", "testtb",
                "--fields-terminated-by", "|",
                "--lines-terminated-by", "\n",
                "--hive-delims-replacement", " ",
                "-m", "1",
                "--target-dir", "/user/hive/warehouse/test.db/testtb/",
                "--append"
        };
        // Expand any --options-file arguments into the full argument list
        String[] expandArguments = OptionsFileUtil.expandArguments(args);
        // Look up the registered "import" tool; this returns an ImportTool instance
        SqoopTool tool = SqoopTool.getTool("import");
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.0.110:9000");
        // Load any tool plugins registered in the configuration
        Configuration loadPlugins = SqoopTool.loadPlugins(conf);
        Sqoop sqoop = new Sqoop((com.cloudera.sqoop.tool.SqoopTool) tool, loadPlugins);
        // runSqoop returns the exit status: 0 on success
        System.out.println(Sqoop.runSqoop(sqoop, expandArguments));
    }

}
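For completeness, a hypothetical entry point for running the import above (the Test class and import2HDFS method come from the example; the Main wrapper itself is made up):

public class Main {
    public static void main(String[] args) throws Exception {
        // Kick off the API-driven import defined in Test.
        Test.import2HDFS();
    }
}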

Three lines in particular are worth tracing through in a debugger:

  • SqoopTool tool = SqoopTool.getTool("import");

    This looks up one of the tool instances registered in SqoopTool's static block; here it returns an instance of org.apache.sqoop.tool.ImportTool.
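    A rough paraphrase of that registration, based on the Sqoop 1.4.x source (the exact registerTool signature and help strings may differ slightly between versions):

    // Inside org.apache.sqoop.tool.SqoopTool (paraphrased):
    static {
        // Map tool names to their implementing classes and help text.
        registerTool("codegen", CodeGenTool.class,
                "Generate code to interact with database records");
        registerTool("import", ImportTool.class,
                "Import a table from a database to HDFS");
        registerTool("export", ExportTool.class,
                "Export an HDFS directory to a database table");
        // ... more tools ...
    }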

  • Sqoop sqoop = new Sqoop((com.cloudera.sqoop.tool.SqoopTool) tool, loadPlugins);

    Sqoop implements the org.apache.hadoop.util.Tool interface; its constructor stores the ImportTool instance obtained above in the member field SqoopTool tool.

  • Sqoop.runSqoop(sqoop, expandArguments);

    (1) It calls org.apache.hadoop.util.ToolRunner's run method, which handles the generic Hadoop arguments:

    public static int run(Configuration conf, Tool tool, String[] args) throws Exception{
        if(conf == null) {
            conf = new Configuration();
        }
        GenericOptionsParser parser = new GenericOptionsParser(conf, args);
        //set the configuration back, so that Tool can configure itself
        tool.setConf(conf);
        
        //get the args w/o generic hadoop args
        String[] toolArgs = parser.getRemainingArgs();
        return tool.run(toolArgs);
    }
    

    The tool here is the Sqoop instance.
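    To make that concrete, a minimal sketch of what GenericOptionsParser does (the -D option value is made up for illustration):

    // org.apache.hadoop.util.GenericOptionsParser consumes generic Hadoop
    // options (e.g. -D key=value) and leaves the remainder for the tool.
    Configuration conf = new Configuration();
    String[] args = {"-D", "mapreduce.job.queuename=default", "--table", "testtb"};
    GenericOptionsParser parser = new GenericOptionsParser(conf, args);
    String[] toolArgs = parser.getRemainingArgs();
    // toolArgs is now {"--table", "testtb"}; the -D setting went into conf.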

    (2) Sqoop's run method:

    public int run(String[] args) {
        if (this.options.getConf() == null) {
            this.options.setConf(this.getConf());
        }
    
        try {
            this.options = this.tool.parseArguments(args, (Configuration)null, this.options, false);
            this.tool.appendArgs(this.childPrgmArgs);
            this.tool.validateOptions(this.options);
        } catch (Exception var3) {
            LOG.debug(var3.getMessage(), var3);
            System.err.println(var3.getMessage());
            return 1;
        }
    
        return this.tool.run(this.options);
    }
    

    The tool here is Sqoop's member field of type com.cloudera.sqoop.tool.SqoopTool, which in this example is actually an org.apache.sqoop.tool.ImportTool instance.
    (com.cloudera.sqoop.tool.SqoopTool extends org.apache.sqoop.tool.SqoopTool; ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool, which extends org.apache.sqoop.tool.BaseSqoopTool, which in turn extends com.cloudera.sqoop.tool.SqoopTool.)
    The this.tool.parseArguments call resolves to the method defined on org.apache.sqoop.tool.SqoopTool; it parses the arguments into a SqoopOptions object, which is then passed to ImportTool's run method.
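    A quick way to confirm that dual com.cloudera/org.apache hierarchy at runtime (the expected output follows from the class layout described above, for Sqoop 1.4.x):

    // Walk ImportTool's superclass chain and print each class name.
    Class<?> c = org.apache.sqoop.tool.ImportTool.class;
    while (c != null) {
        System.out.println(c.getName());
        c = c.getSuperclass();
    }
    // Expected output:
    // org.apache.sqoop.tool.ImportTool
    // com.cloudera.sqoop.tool.BaseSqoopTool
    // org.apache.sqoop.tool.BaseSqoopTool
    // com.cloudera.sqoop.tool.SqoopTool
    // org.apache.sqoop.tool.SqoopTool
    // java.lang.Object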

    (3) ImportTool's run method:

    public int run(SqoopOptions options) {
        HiveImport hiveImport = null;
        if (this.allTables) {
            LOG.error("ImportTool.run() can only handle a single table.");
            return 1;
        } else if (!this.init(options)) {
            return 1;
        } else {
            this.codeGenerator.setManager(this.manager);

            try {
                if (options.doHiveImport()) {
                    hiveImport = new HiveImport(options, this.manager, options.getConf(), false);
                }

                this.importTable(options, options.getTableName(), hiveImport);
                return 0;
            } catch (Exception e) {
                // ... (exception handling elided)
            }
            // ... (cleanup elided; error paths return a non-zero status)
            return 1;
        }
    }
    

    As the code shows, if the Hive-import option is set, a HiveImport object is created and passed into importTable. In ImportTool's importTable method, at the if statement that checks options.doHiveImport(), hiveImport.importTable is called, and inside HiveImport's importTable method we find the code we are after for obtaining the DDL:

    TableDefWriter tableWriter = new TableDefWriter(this.options, this.connManager, inputTableName, outputTableName, this.configuration, !debugMode);
    String createTableStr = tableWriter.getCreateTableStmt() + ";\n";
    

    So as long as we can get hold of the three arguments this.options, this.connManager, and this.configuration, we can use the TableDefWriter class to generate the DDL statement.

    • options: produced by calling org.apache.sqoop.tool.SqoopTool's parseArguments method, just as Sqoop's run method does.
    • connManager: passed into HiveImport's constructor when ImportTool's run method creates the HiveImport instance. It is ImportTool's member variable manager, inherited from org.apache.sqoop.tool.BaseSqoopTool (via com.cloudera.sqoop.tool.BaseSqoopTool), which declares protected ConnManager manager along with a getManager() method. ImportTool's run method calls ImportTool's init method, which in turn calls org.apache.sqoop.tool.BaseSqoopTool's init method; there the manager is instantiated by JobData data = new JobData(sqoopOpts, this); this.manager = (new ConnFactory(sqoopOpts.getConf())).getManager(data);. So once we have the ImportTool, calling its init method is enough to instantiate the manager member.
    • configuration: an org.apache.hadoop.conf.Configuration object; setting the namenode address on it is enough.

III. A Method for Generating Hive DDL with the Sqoop API

    public String test1() throws Exception {
        String[] args = new String[]{
                "--connect", "jdbc:mysql://localhost:3306/test",
                "--driver", "com.mysql.jdbc.Driver",
                "--username", "root",
                "--password", "hehe",
                "--table", "testtb",
                "--fields-terminated-by", "|",
                "--lines-terminated-by", "\n",
                "--hive-delims-replacement", " ",
                "-m", "1",
                "--target-dir", "/user/hive/warehouse/test.db/testtb/",
                "--append"
        };
        String[] expandArguments = OptionsFileUtil.expandArguments(args);
        SqoopTool tool = SqoopTool.getTool("import");
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.0.110:9000");
        Configuration loadPlugins = SqoopTool.loadPlugins(conf);
        Sqoop sqoop = new Sqoop((com.cloudera.sqoop.tool.SqoopTool) tool, loadPlugins);
        // Obtain org.apache.sqoop.tool.BaseSqoopTool's getManager method via reflection
        Method getManager = sqoop.getTool().getClass().getSuperclass().getSuperclass().getMethod("getManager");
        // Obtain ImportTool's init method via reflection
        Method importToolInit = sqoop.getTool().getClass().getDeclaredMethod("init",
                new Class[]{com.cloudera.sqoop.SqoopOptions.class});
        // init is protected, so make it accessible
        importToolInit.setAccessible(true);
        // Parse the arguments into a SqoopOptions object
        SqoopOptions options = sqoop.getTool().parseArguments(expandArguments, conf, sqoop.getOptions(), false);
        // Invoke ImportTool's init method to instantiate its manager member variable
        if (!(boolean) importToolInit.invoke(sqoop.getTool(), new Object[]{options})) {
            System.out.println("Initialization failed");
            System.exit(1);
        }
        // Invoke org.apache.sqoop.tool.BaseSqoopTool's getManager method to fetch the manager
        ConnManager manager = (ConnManager) getManager.invoke(sqoop.getTool());
        // Build the CREATE TABLE statement
        TableDefWriter tableWriter = new TableDefWriter(sqoop.getOptions()
                , (com.cloudera.sqoop.manager.ConnManager) manager
                , "testtb", "testtb", conf, false);
        return tableWriter.getCreateTableStmt();
    }

Log and output:

19:41:52.778 [main] DEBUG org.apache.sqoop.SqoopOptions - Generated nonce dir: /tmp/sqoop-root/compile/dccf8b05825988dc41eda7a9ac8e040e
19:41:52.810 [main] INFO org.apache.sqoop.Sqoop - Running Sqoop version: 1.4.7
19:41:52.825 [main] WARN org.apache.sqoop.tool.BaseSqoopTool - Setting your password on the command-line is insecure. Consider using -P instead.
19:41:52.911 [main] DEBUG org.apache.hadoop.util.Shell - setsid exited with exit code 0
19:41:52.938 [main] DEBUG org.apache.sqoop.ConnFactory - Loaded manager factory: org.apache.sqoop.manager.oracle.OraOopManagerFactory
19:41:52.947 [main] DEBUG org.apache.sqoop.ConnFactory - Loaded manager factory: com.cloudera.sqoop.manager.DefaultManagerFactory
19:41:52.948 [main] DEBUG org.apache.sqoop.ConnFactory - Trying ManagerFactory: org.apache.sqoop.manager.oracle.OraOopManagerFactory
19:41:52.993 [main] DEBUG org.apache.sqoop.manager.oracle.OraOopUtilities - Enabled OraOop debug logging.
19:41:52.994 [main] DEBUG org.apache.sqoop.manager.oracle.OraOopManagerFactory - Data Connector for Oracle and Hadoop can be called by Sqoop!
19:41:52.994 [main] DEBUG org.apache.sqoop.ConnFactory - Trying ManagerFactory: com.cloudera.sqoop.manager.DefaultManagerFactory
19:41:52.995 [main] DEBUG org.apache.sqoop.manager.DefaultManagerFactory - Trying with scheme: jdbc:mysql:
19:41:53.008 [main] INFO org.apache.sqoop.manager.MySQLManager - Preparing to use a MySQL streaming resultset.
19:41:53.019 [main] DEBUG org.apache.sqoop.ConnFactory - Instantiated ConnManager org.apache.sqoop.manager.MySQLManager@6c9f5c0d
19:41:53.023 [main] DEBUG org.apache.sqoop.manager.SqlManager - Execute getColumnInfoRawQuery : SELECT t.* FROM `testtb` AS t LIMIT 1
Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
19:41:53.029 [main] DEBUG org.apache.sqoop.manager.SqlManager - No connection paramenters specified. Using regular API for making connection.
19:41:53.400 [main] DEBUG org.apache.sqoop.manager.SqlManager - Using fetchSize for next query: -2147483648
19:41:53.400 [main] INFO org.apache.sqoop.manager.SqlManager - Executing SQL statement: SELECT t.* FROM `testtb` AS t LIMIT 1
19:41:53.422 [main] DEBUG org.apache.sqoop.manager.SqlManager - Found column id of type [4, 10, 0]
19:41:53.423 [main] DEBUG org.apache.sqoop.manager.SqlManager - Found column content of type [-1, 21845, 0]
19:41:53.423 [main] DEBUG org.apache.sqoop.manager.SqlManager - Found column country of type [12, 255, 0]
19:41:53.426 [main] DEBUG org.apache.sqoop.manager.SqlManager - Using fetchSize for next query: -2147483648
19:41:53.426 [main] INFO org.apache.sqoop.manager.SqlManager - Executing SQL statement: SELECT t.* FROM `testtb` AS t LIMIT 1
19:41:53.427 [main] DEBUG org.apache.sqoop.manager.SqlManager - Found column id of type INT
19:41:53.427 [main] DEBUG org.apache.sqoop.manager.SqlManager - Found column content of type TEXT
19:41:53.427 [main] DEBUG org.apache.sqoop.manager.SqlManager - Found column country of type VARCHAR
19:41:53.429 [main] DEBUG org.apache.sqoop.hive.TableDefWriter - Create statement: CREATE TABLE IF NOT EXISTS `testtb` ( `id` INT, `content` STRING, `country` STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\174' LINES TERMINATED BY '\012' STORED AS TEXTFILE



CREATE TABLE IF NOT EXISTS `testtb` ( `id` INT, `content` STRING, `country` STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\174' LINES TERMINATED BY '\012' STORED AS TEXTFILE

The last line is the CREATE TABLE statement we were after. (Note that TableDefWriter emits the delimiters as octal escapes: '\174' is '|' and '\012' is '\n'.) So via the Sqoop API we can generate the DDL and then post-process it, e.g. turn it into an external table or change the storage format, without having to write much code of our own.
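As a small example of such post-processing, a minimal sketch that rewrites the generated statement into an external table (assuming plain string replacement is safe for DDL of this shape; the LOCATION path is a made-up example):

// Turn the generated managed-table DDL into an external-table DDL.
String ddl = test1();  // CREATE TABLE IF NOT EXISTS `testtb` ...
String externalDdl = ddl.replaceFirst("CREATE TABLE", "CREATE EXTERNAL TABLE")
        + " LOCATION '/user/hive/warehouse/test.db/testtb'";
System.out.println(externalDdl);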
