1. Locating the entry class
From the hive launcher script and ext/cli.sh we can see that the main class being executed is org.apache.hadoop.hive.cli.CliDriver.
2. The main method
3. The run method
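main simply creates a CliDriver instance and delegates to run, which then walks through the steps described in 3.1 to 3.4 below. A condensed sketch of the two methods (logging initialization and error handling trimmed, so this is not the verbatim source):
public static void main(String[] args) throws Exception {
  int ret = new CliDriver().run(args);
  System.exit(ret);
}
public int run(String[] args) throws Exception {
  OptionsProcessor oproc = new OptionsProcessor();
  if (!oproc.process_stage1(args)) {        // 3.1 parse hiveconf / define / hivevar options
    return 1;
  }
  // log4j is (re)initialized here before any core Hive classes are loaded
  CliSessionState ss = new CliSessionState(new HiveConf(SessionState.class));
  // 3.2 wire the session to stdin/stdout/stderr (see the sketch in 3.2)
  if (!oproc.process_stage2(ss)) {          // 3.3 parse -e, -f, -i, -S, --database ...
    return 2;
  }
  HiveConf conf = ss.getConf();
  // start the Hive session, then hand control to the CLI loop
  try {
    return executeDriver(ss, conf, oproc);  // 3.4 the actual CLI driver work, see step 4
  } finally {
    ss.close();
  }
}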
3.1 Parse system-level options such as hiveconf and hive.root.logger
The process_stage1 method is as follows:
public boolean process_stage1(String[] argv) {
try {
commandLine = new GnuParser().parse(options, argv);
Properties confProps = commandLine.getOptionProperties("hiveconf");
for (String propKey : confProps.stringPropertyNames()) {
// with HIVE-11304, hive.root.logger cannot have both logger name and log level.
// if we still see it, split logger and level separately for hive.root.logger
// and hive.log.level respectively
if (propKey.equalsIgnoreCase("hive.root.logger")) {
CommonCliOptions.splitAndSetLogger(propKey, confProps);
} else {
System.setProperty(propKey, confProps.getProperty(propKey));
}
}
Properties hiveVars = commandLine.getOptionProperties("define");
for (String propKey : hiveVars.stringPropertyNames()) {
hiveVariables.put(propKey, hiveVars.getProperty(propKey));
}
Properties hiveVars2 = commandLine.getOptionProperties("hivevar");
for (String propKey : hiveVars2.stringPropertyNames()) {
hiveVariables.put(propKey, hiveVars2.getProperty(propKey));
}
} catch (ParseException e) {
System.err.println(e.getMessage());
printUsage();
return false;
}
return true;
}
3.2 Set up the streams
Set up the standard input/output streams used for reading HQL and printing messages.
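Inside run this looks roughly as follows (a sketch assuming the UTF-8 PrintStream wrapping used by recent Hive versions):
// sketch: bind the session to the standard streams
CliSessionState ss = new CliSessionState(new HiveConf(SessionState.class));
ss.in = System.in;                                            // HQL is read from stdin
try {
  ss.out = new PrintStream(System.out, true, "UTF-8");        // query results
  ss.info = new PrintStream(System.err, true, "UTF-8");       // informational messages
  ss.err = new CachingPrintStream(System.err, true, "UTF-8"); // errors (cached for later retrieval)
} catch (UnsupportedEncodingException e) {
  return 3;                                                   // give up if the encoding is unavailable
}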
3.3 Parse user-supplied options such as -e and -f
The process_stage2 method is as follows:
public boolean process_stage2(CliSessionState ss) {
ss.getConf();
if (commandLine.hasOption('H')) {
printUsage();
return false;
}
ss.setIsSilent(commandLine.hasOption('S'));
ss.database = commandLine.getOptionValue("database");
ss.execString = commandLine.getOptionValue('e');
ss.fileName = commandLine.getOptionValue('f');
ss.setIsVerbose(commandLine.hasOption('v'));
String[] initFiles = commandLine.getOptionValues('i');
if (null != initFiles) {
ss.initFiles = Arrays.asList(initFiles);
}
if (ss.execString != null && ss.fileName != null) {
System.err.println("The '-e' and '-f' options cannot be specified simultaneously");
printUsage();
return false;
}
if (commandLine.hasOption("hiveconf")) {
Properties confProps = commandLine.getOptionProperties("hiveconf");
for (String propKey : confProps.stringPropertyNames()) {
ss.cmdProperties.setProperty(propKey, confProps.getProperty(propKey));
}
}
return true;
}
3.4 Run the CLI driver work
4. The executeDriver method
The code is as follows:
private int executeDriver(CliSessionState ss, HiveConf conf, OptionsProcessor oproc)
throws Exception {
CliDriver cli = new CliDriver();
cli.setHiveVariables(oproc.getHiveVariables());
// use the specified database if specified
cli.processSelectDatabase(ss);
// Execute -i init files (always in silent mode)
cli.processInitFiles(ss);
if (ss.execString != null) {
int cmdProcessStatus = cli.processLine(ss.execString);
return cmdProcessStatus;
}
try {
if (ss.fileName != null) {
return cli.processFile(ss.fileName);
}
} catch (FileNotFoundException e) {
System.err.println("Could not open input file for reading. (" + e.getMessage() + ")");
return 3;
}
if ("mr".equals(HiveConf.getVar(conf, ConfVars.HIVE_EXECUTION_ENGINE))) {
console.printInfo(HiveConf.generateMrDeprecationWarning());
}
setupConsoleReader();
String line;
int ret = 0;
String prefix = "";
String curDB = getFormattedDb(conf, ss);
String curPrompt = prompt + curDB;
String dbSpaces = spacesForString(curDB);
// 1. read the input HQL line by line
while ((line = reader.readLine(curPrompt + "> ")) != null) {
if (!prefix.equals("")) {
prefix += '\n';
}
if (line.trim().startsWith("--")) {
continue;
}
// statements are delimited by ';'
if (line.trim().endsWith(";") && !line.trim().endsWith("\\;")) {
line = prefix + line;
// process the accumulated HQL statement
ret = cli.processLine(line, true);
prefix = "";
curDB = getFormattedDb(conf, ss);
curPrompt = prompt + curDB;
dbSpaces = dbSpaces.length() == curDB.length() ? dbSpaces : spacesForString(curDB);
} else {
prefix = prefix + line;
curPrompt = prompt2 + dbSpaces;
continue;
}
}
return ret;
}
Each complete statement is passed to the processLine method for processing.
5. The processLine method
Inside, it calls the processCmd method for each statement.
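A simplified sketch of processLine (signal handling for Ctrl+C and the hive.cli.errors.ignore option are omitted, and the real code uses a quote-aware splitter rather than a plain split):
public int processLine(String line) {
  int lastRet = 0;
  String command = "";
  for (String oneCmd : line.split(";")) {
    if (oneCmd.endsWith("\\")) {
      // "\;" escapes the semicolon, so keep accumulating the statement
      command += oneCmd.substring(0, oneCmd.length() - 1) + ";";
      continue;
    }
    command += oneCmd;
    if (command.trim().isEmpty()) {
      continue;
    }
    lastRet = processCmd(command);  // each complete statement is handed to processCmd
    command = "";
    if (lastRet != 0) {
      return lastRet;               // stop at the first failing statement
    }
  }
  return lastRet;
}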
6.processCmd方法
It mainly distinguishes four cases:
- the quit or exit command
- the source command, which executes an HQL file
- a command starting with !, which is run as a shell command
- anything else, which goes through the normal parsing path
The code is as follows:
public int processCmd(String cmd) {
CliSessionState ss = (CliSessionState) SessionState.get();
ss.setLastCommand(cmd);
ss.updateThreadName();
// Flush the print stream, so it doesn't include output from the last command
ss.err.flush();
String cmd_trimmed = HiveStringUtils.removeComments(cmd).trim();
String[] tokens = tokenizeCmd(cmd_trimmed);
int ret = 0;
// 1. if the command is quit or exit, terminate the session
if (cmd_trimmed.toLowerCase().equals("quit") || cmd_trimmed.toLowerCase().equals("exit")) {
// if we have come this far - either the previous commands
// are all successful or this is command line. in either case
// this counts as a successful run
ss.close();
System.exit(0);
// 2. if the command is source, execute the HQL file
} else if (tokens[0].equalsIgnoreCase("source")) {
String cmd_1 = getFirstCmd(cmd_trimmed, tokens[0].length());
cmd_1 = new VariableSubstitution(new HiveVariableSource() {
@Override
public Map<String, String> getHiveVariable() {
return SessionState.get().getHiveVariables();
}
}).substitute(ss.getConf(), cmd_1);
File sourceFile = new File(cmd_1);
if (! sourceFile.isFile()){
console.printError("File: "+ cmd_1 + " is not a file.");
ret = 1;
} else {
try {
ret = processFile(cmd_1);
} catch (IOException e) {
console.printError("Failed processing file "+ cmd_1 +" "+ e.getLocalizedMessage(),
stringifyException(e));
ret = 1;
}
}
// 3. if the command starts with '!', run it as a shell command
} else if (cmd_trimmed.startsWith("!")) {
// for shell commands, use unstripped command
String shell_cmd = cmd.trim().substring(1);
shell_cmd = new VariableSubstitution(new HiveVariableSource() {
@Override
public Map<String, String> getHiveVariable() {
return SessionState.get().getHiveVariables();
}
}).substitute(ss.getConf(), shell_cmd);
// shell_cmd = "/bin/bash -c \'" + shell_cmd + "\'";
try {
ShellCmdExecutor executor = new ShellCmdExecutor(shell_cmd, ss.out, ss.err);
ret = executor.execute();
if (ret != 0) {
console.printError("Command failed with exit code = " + ret);
}
} catch (Exception e) {
console.printError("Exception raised from Shell command " + e.getLocalizedMessage(),
stringifyException(e));
ret = 1;
}
// 4. otherwise, hand the command off for normal parsing
} else { // local mode
try {
try (CommandProcessor proc = CommandProcessorFactory.get(tokens, (HiveConf) conf)) {
if (proc instanceof IDriver) {
// Let Driver strip comments using sql parser
ret = processLocalCmd(cmd, proc, ss);
} else {
ret = processLocalCmd(cmd_trimmed, proc, ss);
}
}
} catch (SQLException e) {
console.printError("Failed processing command " + tokens[0] + " " + e.getLocalizedMessage(),
org.apache.hadoop.util.StringUtils.stringifyException(e));
ret = 1;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
ss.resetThreadName();
return ret;
}
7. The processLocalCmd method
It calls the run method of IDriver.
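The interesting branch is the one where the CommandProcessor is an IDriver: the statement is handed to qp.run and the result rows are then fetched and printed. A trimmed sketch (timing, header printing and exception handling omitted):
// trimmed sketch of the IDriver branch inside processLocalCmd
IDriver qp = (IDriver) proc;
int ret = qp.run(cmd).getResponseCode();  // compile and execute the HQL, see step 8
if (ret != 0) {
  qp.close();
  return ret;
}
ArrayList<String> res = new ArrayList<String>();
while (qp.getResults(res)) {              // fetch result rows batch by batch
  for (String r : res) {
    ss.out.println(r);                    // print each row to the session output stream
  }
  res.clear();
}
qp.close();
return ret;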
8. The qp.run method
run is declared on the IDriver interface; the implementing class is org.apache.hadoop.hive.ql.Driver.
9. The runInternal method
It breaks down into two main steps (see the sketch after this list):
- compile the HQL statement
- execute the resulting plan
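Reduced to those two steps, runInternal looks roughly like this (locking, hooks and transaction management omitted, success path only):
// greatly simplified view of Driver.runInternal
if (!alreadyCompiled) {
  compileInternal(command, true);  // 9.1 parse, analyze and optimize the HQL into a QueryPlan
}
execute();                         // 9.2 run the generated tasks (MR/Tez/Spark jobs, fetch task)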
9.1 The compileInternal method
It delegates to the compile method.
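compileInternal mainly serializes compilation (unless parallel compilation is enabled) and then delegates, roughly:
// rough sketch of compileInternal: take the compile lock, then call compile
final ReentrantLock compileLock = tryAcquireCompileLock(isParallelEnabled, command);
if (compileLock == null) {
  return ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCode();
}
try {
  ret = compile(command, true, deferClose);  // see 9.1.1
} finally {
  compileLock.unlock();
}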
9.1.1 The compile method
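Reduced to its core, compile turns the command string into an AST, builds a semantic analyzer for it and runs the analysis (variable substitution, hooks and plan/cache handling omitted):
// core of Driver.compile
ASTNode tree = ParseUtils.parse(command, ctx);                         // 9.1.1.1 HQL text -> AST
BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);
sem.analyze(tree, ctx);                                                // 9.1.2 AST -> operator tree and tasks
sem.validate();                                                        // sanity-check the generated plan
// the analyzer output is then wrapped into a QueryPlan for execution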
9.1.1.1 Call ParseUtils.parse to generate the ASTNode
9.1.1.2 The ParseUtils.parse method
Inside ParseDriver this ultimately breaks down into four steps:
- build the lexer
- build the token rewrite stream over the HQL
- parse the statement
- obtain the final ASTNode
public ASTNode parse(String command, Context ctx, String viewFullyQualifiedName)
throws ParseException {
if (LOG.isDebugEnabled()) {
LOG.debug("Parsing command: " + command);
}
// 1. build the lexer
HiveLexerX lexer = new HiveLexerX(new ANTLRNoCaseStringStream(command));
// 2. build the token rewrite stream
TokenRewriteStream tokens = new TokenRewriteStream(lexer);
if (ctx != null) {
if (viewFullyQualifiedName == null) {
// Top level query
ctx.setTokenRewriteStream(tokens);
} else {
// It is a view
ctx.addViewTokenRewriteStream(viewFullyQualifiedName, tokens);
}
lexer.setHiveConf(ctx.getConf());
}
HiveParser parser = new HiveParser(tokens);
if (ctx != null) {
parser.setHiveConf(ctx.getConf());
}
parser.setTreeAdaptor(adaptor);
HiveParser.statement_return r = null;
try {
// 3. parse the statement
r = parser.statement();
} catch (RecognitionException e) {
e.printStackTrace();
throw new ParseException(parser.errors);
}
if (lexer.getErrors().size() == 0 && parser.errors.size() == 0) {
LOG.debug("Parse Completed");
} else if (lexer.getErrors().size() != 0) {
throw new ParseException(lexer.getErrors());
} else {
throw new ParseException(parser.errors);
}
// 4. obtain the final ASTNode
ASTNode tree = (ASTNode) r.getTree();
tree.setUnknownTokenBoundaries();
return tree;
}
9.1.2 The sem.analyze method
analyze is invoked from the compile method to process the AST.
Implementation class: org.apache.hadoop.hive.ql.parse.SemanticAnalyzer
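analyze itself is only a thin wrapper declared on BaseSemanticAnalyzer; roughly:
// sketch of BaseSemanticAnalyzer.analyze: bind the context, then run the
// subclass-specific analyzeInternal (SemanticAnalyzer / CalcitePlanner)
public void analyze(ASTNode ast, Context ctx) throws SemanticException {
  initCtx(ctx);
  init(true);
  analyzeInternal(ast);   // see 9.1.2.1
}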
9.1.2.1 The analyzeInternal method
void analyzeInternal(ASTNode ast, PlannerContextFactory pcf) throws SemanticException {
LOG.info("Starting Semantic Analysis");
// 1. Generate Resolved Parse tree from syntax tree
boolean needsTransform = needsTransform();
//change the location of position alias process here
processPositionAlias(ast);
PlannerContext plannerCtx = pcf.create();
// convert the AST into a QueryBlock (QB)
if (!genResolvedParseTree(ast, plannerCtx)) {
return;
}
if (HiveConf.getBoolVar(conf, ConfVars.HIVE_REMOVE_ORDERBY_IN_SUBQUERY)) {
for (String alias : qb.getSubqAliases()) {
removeOBInSubQuery(qb.getSubqForAlias(alias));
}
}
// Check query results cache.
// If no masking/filtering required, then we can check the cache now, before
// generating the operator tree and going through CBO.
// Otherwise we have to wait until after the masking/filtering step.
boolean isCacheEnabled = isResultsCacheEnabled();
QueryResultsCache.LookupInfo lookupInfo = null;
if (isCacheEnabled && !needsTransform && queryTypeCanUseCache()) {
lookupInfo = createLookupInfoForQuery(ast);
if (checkResultsCache(lookupInfo)) {
return;
}
}
ASTNode astForMasking;
if (isCBOExecuted() && needsTransform &&
(qb.isCTAS() || qb.isView() || qb.isMaterializedView() || qb.isMultiDestQuery())) {
// If we use CBO and we may apply masking/filtering policies, we create a copy of the ast.
// The reason is that the generation of the operator tree may modify the initial ast,
// but if we need to parse for a second time, we would like to parse the unmodified ast.
astForMasking = (ASTNode) ParseDriver.adaptor.dupTree(ast);
} else {
astForMasking = ast;
}
// 2. Gen OP Tree from resolved Parse Tree
Operator sinkOp = genOPTree(ast, plannerCtx);
boolean usesMasking = false;
if (!unparseTranslator.isEnabled() &&
(tableMask.isEnabled() && analyzeRewrite == null)) {
// Here we rewrite the * and also the masking table
ASTNode rewrittenAST = rewriteASTWithMaskAndFilter(tableMask, astForMasking, ctx.getTokenRewriteStream(),
ctx, db, tabNameToTabObject, ignoredTokens);
if (astForMasking != rewrittenAST) {
usesMasking = true;
plannerCtx = pcf.create();
ctx.setSkipTableMasking(true);
init(true);
//change the location of position alias process here
processPositionAlias(rewrittenAST);
genResolvedParseTree(rewrittenAST, plannerCtx);
if (this instanceof CalcitePlanner) {
((CalcitePlanner) this).resetCalciteConfiguration();
}
sinkOp = genOPTree(rewrittenAST, plannerCtx);
}
}
// Check query results cache
// In the case that row or column masking/filtering was required, we do not support caching.
// TODO: Enable caching for queries with masking/filtering
if (isCacheEnabled && needsTransform && !usesMasking && queryTypeCanUseCache()) {
lookupInfo = createLookupInfoForQuery(ast);
if (checkResultsCache(lookupInfo)) {
return;
}
}
// 3. Deduce Resultset Schema
if (createVwDesc != null && !this.ctx.isCboSucceeded()) {
resultSchema = convertRowSchemaToViewSchema(opParseCtx.get(sinkOp).getRowResolver());
} else {
// resultSchema will be null if
// (1) cbo is disabled;
// (2) or cbo is enabled with AST return path (whether succeeded or not,
// resultSchema will be re-initialized)
// It will only be not null if cbo is enabled with new return path and it
// succeeds.
if (resultSchema == null) {
resultSchema = convertRowSchemaToResultSetSchema(opParseCtx.get(sinkOp).getRowResolver(),
HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_RESULTSET_USE_UNIQUE_COLUMN_NAMES));
}
}
// 4. Generate Parse Context for Optimizer & Physical compiler
copyInfoToQueryProperties(queryProperties);
ParseContext pCtx = new ParseContext(queryState, opToPartPruner, opToPartList, topOps,
new HashSet<JoinOperator>(joinContext.keySet()),
new HashSet<SMBMapJoinOperator>(smbMapJoinContext.keySet()),
loadTableWork, loadFileWork, columnStatsAutoGatherContexts, ctx, idToTableNameMap, destTableId, uCtx,
listMapJoinOpsNoReducer, prunedPartitions, tabNameToTabObject, opToSamplePruner,
globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner,
viewAliasToInput, reduceSinkOperatorsAddedByEnforceBucketingSorting,
analyzeRewrite, tableDesc, createVwDesc, materializedViewUpdateDesc,
queryProperties, viewProjectToTableSchema, acidFileSinks);
// Set the semijoin hints in parse context
pCtx.setSemiJoinHints(parseSemiJoinHint(getQB().getParseInfo().getHintList()));
// Set the mapjoin hint if it needs to be disabled.
pCtx.setDisableMapJoin(disableMapJoinWithHint(getQB().getParseInfo().getHintList()));
// 5. Take care of view creation
if (createVwDesc != null) {
if (ctx.getExplainAnalyze() == AnalyzeState.RUNNING) {
return;
}
if (!ctx.isCboSucceeded()) {
saveViewDefinition();
}
// validate the create view statement at this point, the createVwDesc gets
// all the information for semanticcheck
validateCreateView();
if (createVwDesc.isMaterialized()) {
createVwDesc.setTablesUsed(getTablesUsed(pCtx));
} else {
// Since we're only creating a view (not executing it), we don't need to
// optimize or translate the plan (and in fact, those procedures can
// interfere with the view creation). So skip the rest of this method.
ctx.setResDir(null);
ctx.setResFile(null);
try {
PlanUtils.addInputsForView(pCtx);
} catch (HiveException e) {
throw new SemanticException(e);
}
// Generate lineage info for create view statements
// if LineageLogger hook is configured.
// Add the transformation that computes the lineage information.
Set<String> postExecHooks = Sets.newHashSet(Splitter.on(",").trimResults()
.omitEmptyStrings()
.split(Strings.nullToEmpty(HiveConf.getVar(conf, HiveConf.ConfVars.POSTEXECHOOKS))));
if (postExecHooks.contains("org.apache.hadoop.hive.ql.hooks.PostExecutePrinter")
|| postExecHooks.contains("org.apache.hadoop.hive.ql.hooks.LineageLogger")
|| postExecHooks.contains("org.apache.atlas.hive.hook.HiveHook")) {
ArrayList<Transform> transformations = new ArrayList<Transform>();
transformations.add(new HiveOpConverterPostProc());
transformations.add(new Generator(postExecHooks));
for (Transform t : transformations) {
pCtx = t.transform(pCtx);
}
// we just use view name as location.
queryState.getLineageState()
.mapDirToOp(new Path(createVwDesc.getViewName()), sinkOp);
}
return;
}
}
// 6. Generate table access stats if required
if (HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_TABLEKEYS)) {
TableAccessAnalyzer tableAccessAnalyzer = new TableAccessAnalyzer(pCtx);
setTableAccessInfo(tableAccessAnalyzer.analyzeTableAccess());
}
// 7. Perform Logical optimization
if (LOG.isDebugEnabled()) {
LOG.debug("Before logical optimization\n" + Operator.toString(pCtx.getTopOps().values()));
}
Optimizer optm = new Optimizer();
optm.setPctx(pCtx);
optm.initialize(conf);
// run the optimizer
pCtx = optm.optimize();
if (pCtx.getColumnAccessInfo() != null) {
// set ColumnAccessInfo for view column authorization
setColumnAccessInfo(pCtx.getColumnAccessInfo());
}
if (LOG.isDebugEnabled()) {
LOG.debug("After logical optimization\n" + Operator.toString(pCtx.getTopOps().values()));
}
// 8. Generate column access stats if required - wait until column pruning
// takes place during optimization
boolean isColumnInfoNeedForAuth = SessionState.get().isAuthorizationModeV2()
&& HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED);
if (isColumnInfoNeedForAuth
|| HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_SCANCOLS)) {
ColumnAccessAnalyzer columnAccessAnalyzer = new ColumnAccessAnalyzer(pCtx);
// view column access info is carried by this.getColumnAccessInfo().
setColumnAccessInfo(columnAccessAnalyzer.analyzeColumnAccess(this.getColumnAccessInfo()));
}
// 9. Optimize Physical op tree & Translate to target execution engine (MR, TEZ..)
if (!ctx.getExplainLogical()) {
TaskCompiler compiler = TaskCompilerFactory.getCompiler(conf, pCtx);
compiler.init(queryState, console, db);
compiler.compile(pCtx, rootTasks, inputs, outputs);
fetchTask = pCtx.getFetchTask();
}
//find all Acid FileSinkOperatorS
QueryPlanPostProcessor qp = new QueryPlanPostProcessor(rootTasks, acidFileSinks, ctx.getExecutionId());
// 10. Attach CTAS/Insert-Commit-hooks for Storage Handlers
final Optional<TezTask> optionalTezTask =
rootTasks.stream().filter(task -> task instanceof TezTask).map(task -> (TezTask) task)
.findFirst();
if (optionalTezTask.isPresent()) {
final TezTask tezTask = optionalTezTask.get();
rootTasks.stream()
.filter(task -> task.getWork() instanceof DDLWork)
.map(task -> (DDLWork) task.getWork())
.filter(ddlWork -> ddlWork.getPreInsertTableDesc() != null)
.map(ddlWork -> ddlWork.getPreInsertTableDesc())
.map(ddlPreInsertTask -> new InsertCommitHookDesc(ddlPreInsertTask.getTable(),
ddlPreInsertTask.isOverwrite()))
.forEach(insertCommitHookDesc -> tezTask.addDependentTask(
TaskFactory.get(new DDLWork(getInputs(), getOutputs(), insertCommitHookDesc), conf)));
}
LOG.info("Completed plan generation");
// 11. put accessed columns to readEntity
if (HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_SCANCOLS)) {
putAccessedColumnsToReadEntity(inputs, columnAccessInfo);
}
if (isCacheEnabled && lookupInfo != null) {
if (queryCanBeCached()) {
QueryResultsCache.QueryInfo queryInfo = createCacheQueryInfoForQuery(lookupInfo);
// Specify that the results of this query can be cached.
setCacheUsage(new CacheUsage(
CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS, queryInfo));
}
}
}
9.2 The execute method
- build the MR jobs
- launch the tasks (see the sketch after this list)
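execute runs a small scheduling loop over the tasks produced by compilation: runnable tasks are launched (possibly in parallel) through launchTask, finished ones are polled, and their child tasks become runnable in turn. A condensed sketch (hooks, error handling and job naming omitted):
// condensed sketch of the scheduling loop inside Driver.execute
DriverContext driverCxt = new DriverContext(ctx);
for (Task<? extends Serializable> tsk : plan.getRootTasks()) {
  driverCxt.addToRunnable(tsk);                 // seed the queue with the root tasks
}
while (driverCxt.isRunning()) {
  Task<? extends Serializable> task;
  while ((task = driverCxt.getRunnable(maxthreads)) != null) {
    launchTask(task, queryId, noName, jobname, jobs, driverCxt);  // 9.2.1
  }
  TaskRunner tskRun = driverCxt.pollFinished(); // wait for a running task to finish
  if (tskRun == null) {
    continue;
  }
  // on success, the finished task's children are added to the runnable queue
}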
9.2.1 The launchTask method
9.2.2 The runSequential method
9.2.3 The executeTask method
9.2.4 The execute method
The concrete implementation class is MapRedTask. It does three things (sketched below):
1. set up the classes that drive the MR job
2. build the command line that launches the MR job
3. run ExecDriver
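When the job is submitted from a separate JVM, MapRedTask.execute essentially serializes the plan, assembles a hadoop jar command line with ExecDriver as the main class, and spawns it as a child process. A rough sketch of the three steps (local-mode detection, memory settings and environment setup omitted; planPath, isSilent, hiveConfArgs, env and workDir are prepared earlier in the real method):
// rough sketch of MapRedTask.execute for the child-JVM case
// 1. the classes and binaries that drive the MR job
String hadoopExec = conf.getVar(HiveConf.ConfVars.HADOOPBIN);  // path to the hadoop launcher
String hiveJar = conf.getJar();                                // hive-exec jar
String jarCmd = hiveJar + " " + ExecDriver.class.getName();    // main class run inside the child JVM
// 2. the command line that runs the MR job (the serialized plan is passed via -plan)
String cmdLine = hadoopExec + " jar " + jarCmd
    + " -plan " + planPath.toString()
    + " " + isSilent + " " + hiveConfArgs;
// 3. spawn ExecDriver, which submits the MapReduce job and waits for it to finish
Process executor = Runtime.getRuntime().exec(cmdLine, env, new File(workDir));
return executor.waitFor();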