运行hive cli命令时,实际执行的是 hadoop jar hive-cli-0.13.1.jar org.apache.hadoop.hive.cli.CliDriver xxxx 命令。hadoop jar 对应的org.apache.hadoop.util.RunJar类其实是封装了反射调用,最终调用的是org.apache.hadoop.hive.cli.CliDriver类的main方法。
CliDriver类是hive的入口类。
首先CliDriver类会通过OptionsProcessor类来解析输入的命令行参数,比如-e,-S,-h等,然后把对应的值存放到CliSessionState类的对应属性中,最后在CliDriver类中使用这些属性。
比如在executeDriver方法中,会根据CliSessionState的属性对命令进行处理:
// Excerpt from CliDriver.executeDriver: dispatches on the CliSessionState (ss) fields.
CliDriver cli = new CliDriver();
cli.setHiveVariables(oproc.getHiveVariables()); // applies the variable settings, if any were given
// use the specified database if specified
cli.processSelectDatabase(ss);
// Execute -i init files (always in silent mode)
cli.processInitFiles(ss); // covers files given with -i and loading of the .hiverc file
if (ss. execString != null ) { // -e was specified: process the given string and return its status
int cmdProcessStatus = cli.processLine(ss. execString);
return cmdProcessStatus;
}
try { // -f was specified: process the given file
if (ss. fileName != null ) {
return cli.processFile(ss.fileName );
}
} catch (FileNotFoundException e) {
System. err.println( "Could not open input file for reading. (" + e.getMessage() + ")" );
// status 3 is returned when the input file cannot be opened
return 3 ;
}
CliDriver类中方法的调用顺序主要有下面几种:
1)add xxx/set/compile/reset等命令
main --> run --> executeDriver --> processLine --> processCmd --> processLocalCmd --> 对应processor类的run方法
2)sql命令
main --> run --> executeDriver --> processLine --> processCmd --> processLocalCmd --> Driver类的run方法
3)shell命令
main --> run --> executeDriver --> processLine --> processCmd
其中CliDriver类中最重要的方法是processCmd,它定义了不同命令的不同执行方式,具体实现如下:
/**
 * Executes a single CLI command and returns its status code.
 *
 * <p>The command is dispatched, in this order, to one of the following handlers:
 * <ol>
 *   <li>{@code quit} / {@code exit}: closes the session and terminates the JVM;</li>
 *   <li>{@code source <file>}: processes the file via {@code processFile};</li>
 *   <li>{@code !<command>}: runs the command as an external process;</li>
 *   <li>{@code list <type> [<value>...]}: prints the session resources of that type;</li>
 *   <li>remote mode: sends the command to the Hive server through {@code HiveClient};</li>
 *   <li>everything else (local mode): looks up a {@code CommandProcessor} and delegates
 *       to {@code processLocalCmd}.</li>
 * </ol>
 *
 * @param cmd the command text; it is trimmed before being inspected
 * @return 0 when the command succeeded; otherwise a non-zero status (1 for local failures,
 *         the process exit code for shell commands, the server error code or -10002 for
 *         remote failures)
 */
public int processCmd(String cmd) {
  CliSessionState ss = (CliSessionState) SessionState.get();
  ss.setLastCommand(cmd);
  // Flush the print stream, so it doesn't include output from the last command
  ss.err.flush();
  String cmd_trimmed = cmd.trim();
  String[] tokens = tokenizeCmd(cmd_trimmed);
  int ret = 0;

  // equalsIgnoreCase is used for every keyword check so that matching does not depend on
  // the default locale (toLowerCase() maps 'I' to a dotless i under a Turkish locale).
  if (cmd_trimmed.equalsIgnoreCase("quit") || cmd_trimmed.equalsIgnoreCase("exit")) {
    // quit/exit: close the session and terminate the JVM directly
    ss.close();
    System.exit(0);
  } else if (tokens[0].equalsIgnoreCase("source")) {
    // source <file>: treat the argument as a script file and run it through processFile
    String cmd_1 = getFirstCmd(cmd_trimmed, tokens[0].length());
    File sourceFile = new File(cmd_1);
    if (!sourceFile.isFile()) {
      console.printError("File: " + cmd_1 + " is not a file.");
      ret = 1;
    } else {
      try {
        this.processFile(cmd_1);
      } catch (IOException e) {
        console.printError("Failed processing file " + cmd_1 + " " + e.getLocalizedMessage(),
            stringifyException(e));
        ret = 1;
      }
    }
  } else if (cmd_trimmed.startsWith("!")) {
    // !<command>: run as an external command; variable substitution is applied first
    String shell_cmd = cmd_trimmed.substring(1);
    shell_cmd = new VariableSubstitution().substitute(ss.getConf(), shell_cmd);
    // The command string is handed to Runtime.exec as is; it is not wrapped in "/bin/bash -c".
    try {
      Process executor = Runtime.getRuntime().exec(shell_cmd);
      StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, ss.out);
      StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, ss.err);
      outPrinter.start();
      errPrinter.start();
      ret = executor.waitFor();
      if (ret != 0) {
        console.printError("Command failed with exit code = " + ret);
      }
    } catch (Exception e) {
      console.printError("Exception raised from Shell command " + e.getLocalizedMessage(),
          stringifyException(e));
      ret = 1;
    }
  } else if (tokens[0].equalsIgnoreCase("list")) {
    // list <type> [<value>...]: print the matching resources via SessionState.list_resource
    SessionState.ResourceType t;
    if (tokens.length < 2 || (t = SessionState.find_resource_type(tokens[1])) == null) {
      console.printError("Usage: list ["
          + StringUtils.join(SessionState.ResourceType.values(), "|") + "] [<value> [<value>]*]");
      ret = 1;
    } else {
      List<String> filter = null;
      if (tokens.length >= 3) {
        // Only the values after "list <type>" belong to the filter. Shifting the array in
        // place and wrapping all of it would leave stale trailing tokens in the filter.
        filter = Arrays.asList(tokens).subList(2, tokens.length);
      }
      Set<String> s = ss.list_resource(t, filter);
      if (s != null && !s.isEmpty()) {
        ss.out.println(StringUtils.join(s, "\n"));
      }
    }
  } else if (ss.isRemoteMode()) {
    // remote mode -- connecting to remote hive server; the command goes through HiveClient
    HiveClient client = ss.getClient();
    PrintStream out = ss.out;
    PrintStream err = ss.err;
    try {
      client.execute(cmd_trimmed);
      List<String> results;
      // A full batch means more rows may remain, so keep fetching until a short batch arrives.
      do {
        results = client.fetchN(LINES_TO_FETCH);
        for (String line : results) {
          out.println(line);
        }
      } while (results.size() == LINES_TO_FETCH);
    } catch (HiveServerException e) {
      ret = e.getErrorCode();
      if (ret != 0) { // OK if ret == 0 -- reached the EOF
        String errMsg = e.getMessage();
        if (errMsg == null) {
          errMsg = e.toString();
        }
        err.println("[Hive Error]: " + errMsg);
      }
    } catch (TException e) {
      String errMsg = e.getMessage();
      if (errMsg == null) {
        errMsg = e.toString();
      }
      ret = -10002;
      err.println("[Thrift Error]: " + errMsg);
    } finally {
      // Always ask the client to clean up; a failure here is reported but does not change ret.
      try {
        client.clean();
      } catch (TException e) {
        String errMsg = e.getMessage();
        if (errMsg == null) {
          errMsg = e.toString();
        }
        err.println("[Thrift Error]: Hive server is not cleaned due to thrift exception: "
            + errMsg);
      }
    }
  } else {
    // local mode: everything else, e.g. add/set commands, select/insert, show tables,
    // create table/database, use <db>
    try {
      // Look up the CommandProcessor implementation for this command, then delegate to it.
      CommandProcessor proc = CommandProcessorFactory.get(tokens, (HiveConf) conf);
      ret = processLocalCmd(cmd, proc, ss);
    } catch (SQLException e) {
      console.printError("Failed processing command " + tokens[0] + " " + e.getLocalizedMessage(),
          org.apache.hadoop.util.StringUtils.stringifyException(e));
      ret = 1;
    }
  }
  return ret;
}
而processLocalCmd方法会将CommandProcessor的实例作为参数传入,并根据不同的CommandProcessor实现类调用对应类的run方法。其方法签名如下:
int processLocalCmd (String cmd, CommandProcessor proc, CliSessionState ss)
本文转自菜菜光 51CTO博客,原文链接:http://blog.51cto.com/caiguangguang/1542275 ,如需转载请自行联系原作者。