Hive execution flow analysis
Reposted from: http://blog.csdn.net/gexiaobaohelloworld/article/details/7719163
Entry point:
In the bin/hive script, after the environment checks, the cli.sh under ext is executed, which enters the main class: CliDriver.main.
CliDriver.main:
Enters cli.processLine, which splits the input on the semicolon ";" into individual statements and passes each one to processCmd.
processCmd:
Handles quit/exit, then source, then "!" commands, then list; otherwise it obtains a CommandProcessor (implementations include Driver plus various processors). The set/dfs/add/delete commands each have a dedicated processor; everything else goes through Driver.
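The sketch below is a hypothetical simplification of what CommandProcessorFactory.get does, written only to illustrate the dispatch just described; the processor class names exist in Hive's org.apache.hadoop.hive.ql.processors package, but the real factory differs in detail.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.processors.AddResourceProcessor;
import org.apache.hadoop.hive.ql.processors.CommandProcessor;
import org.apache.hadoop.hive.ql.processors.DeleteResourceProcessor;
import org.apache.hadoop.hive.ql.processors.DfsProcessor;
import org.apache.hadoop.hive.ql.processors.SetProcessor;

// Hypothetical simplification of CommandProcessorFactory.get: a few
// commands get dedicated processors; everything else is HiveQL for Driver.
public class ProcessorDispatch {
  public static CommandProcessor get(String firstToken, HiveConf conf) {
    if ("set".equalsIgnoreCase(firstToken)) {
      return new SetProcessor();            // set key=value
    } else if ("dfs".equalsIgnoreCase(firstToken)) {
      return new DfsProcessor(conf);        // dfs -ls /path ...
    } else if ("add".equalsIgnoreCase(firstToken)) {
      return new AddResourceProcessor();    // add jar/file/archive
    } else if ("delete".equalsIgnoreCase(firstToken)) {
      return new DeleteResourceProcessor(); // delete jar/file/archive
    }
    return new Driver();                    // a full HiveQL statement
  }
}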
If the processor is of the Driver type:
The command is sent to that driver's run method, which calls compile. Inside compile, a ParseDriver generates an ASTNode (this uses ANTLR: the input is tokenized by the lexer, fed through the parser, and a tree comes out). A detailed account of the compile process is at http://fromheartgo.wordpress.com/2010/04/02/hive%E7%9A%84compile%E8%BF%87%E7%A8%8B%EF%BC%881%EF%BC%89/ ;
Semantic analysis then starts from the resulting ASTNode; the results are set on a QueryPlan object, and a number of tasks are initialized and placed in the QueryPlan;
Test-only code in run reads the test.serialize.qplan setting; in test mode the query plans are written to a file. Permission checks follow.
After compile returns, Driver.run decomposes the plan into MapReduce jobs and executes them, and control returns to processCmd:
If the status is normal throughout, the MapReduce results are fetched via getResults.
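As a minimal sketch of the parse step just described, the fragment below feeds a HiveQL string to ParseDriver and dumps the resulting ANTLR tree (the query text is invented for illustration):

import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.ParseDriver;

// Minimal parse sketch: HiveQL string in, ANTLR AST out.
public class ParseDemo {
  public static void main(String[] args) throws Exception {
    ParseDriver pd = new ParseDriver();
    ASTNode tree = pd.parse("SELECT key, count(1) FROM src GROUP BY key");
    // dump() renders the tree, e.g. (TOK_QUERY (TOK_FROM ...) ...)
    System.out.println(tree.dump());
  }
}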
The whole flow is:
CliDriver.main > processLine > processCmd >> Driver.run(cmd) > compile >> BaseSemanticAnalyzer >> xxxSemanticAnalyzer (a regular SELECT goes through SemanticAnalyzer) > analyze (sem.analyze) >> SemanticAnalyzer's analyzeInternal method >> new Optimizer.optimize (generates Tasks after optimizations such as column pruning) > genMapRedTasks >> back in Driver.run(cmd) >> ret = execute() >> launchTask >> TaskRunner.run > Task.executeTask > ExecDriver.execute > run MapReduce (submitJob) >> getResults.
http://blog.csdn.net/bupt041137/article/details/6553762
That is:
HiveCLI [Java Application]
org.apache.hadoop.hive.cli.CliDriver at localhost:38723
Thread [main] (Suspended)
Driver.execute() line: 1344
Driver.runExecute() line: 1219
Driver.run(String, Map<String,Object[]>) line: 1177
Driver.run(String) line: 1159
CliDriver.processLocalCmd(String, CommandProcessor, CliSessionState) line: 258
CliDriver.processCmd(String) line: 215
CliDriver.processLine(String, boolean) line: 411
CliDriver.run(String[]) line: 679
CliDriver.main(String[]) line: 562
/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.0.x86_64/bin/java (2012-07-06 11:36:07 AM)
The entry point is the main method of the CliDriver class, which calls run. After some initialization and checks, run calls processLine, which in turn calls processCmd. processLocalCmd then invokes the Driver class's run method, which leads into runExecute.
until it reaches:
while ((line = reader.readLine(curPrompt + "> ")) != null) {
which keeps prompting for and reading SQL input.
1. cli/src/java: CliDriver.main is the entry point.
public static void main(String[] args) throws Exception {
  int ret = run(args);
  System.exit(ret);
}
2. Enter the run function:
public static int run(String[] args) throws Exception {

  OptionsProcessor oproc = new OptionsProcessor();
  if (!oproc.process_stage1(args)) {
    return 1;
  }

  // Initialize log4j early, but hold any error message until the session is up
  boolean logInitFailed = false;
  String logInitDetailMessage;
  try {
    logInitDetailMessage = LogUtils.initHiveLog4j();
  } catch (LogInitializationException e) {
    logInitFailed = true;
    logInitDetailMessage = e.getMessage();
  }

  CliSessionState ss = new CliSessionState(new HiveConf(SessionState.class));
  ss.in = System.in;
  try {
    ss.out = new PrintStream(System.out, true, "UTF-8");
    ss.err = new PrintStream(System.err, true, "UTF-8");
  } catch (UnsupportedEncodingException e) {
    return 3;
  }

  if (!oproc.process_stage2(ss)) {
    return 2;
  }

  if (!ss.getIsSilent()) {
    if (logInitFailed) {
      System.err.println(logInitDetailMessage);
    } else {
      SessionState.getConsole().printInfo(logInitDetailMessage);
    }
  }

  // Apply properties given on the command line to the conf
  HiveConf conf = ss.getConf();
  for (Map.Entry<Object, Object> item : ss.cmdProperties.entrySet()) {
    conf.set((String) item.getKey(), (String) item.getValue());
  }

  SessionState.start(ss);

  // Connect to a Hive server if a host was given (remote mode)
  if (ss.getHost() != null) {
    ss.connect();
    if (ss.isRemoteMode()) {
      prompt = "[" + ss.host + ':' + ss.port + "] " + prompt;
      char[] spaces = new char[prompt.length()];
      Arrays.fill(spaces, ' ');
      prompt2 = new String(spaces);
    }
  }

  if (!ss.isRemoteMode() && !ShimLoader.getHadoopShims().usesJobShell()) {
    // In local mode, put any auxiliary jars on the session class loader
    ClassLoader loader = conf.getClassLoader();
    String auxJars = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS);
    if (StringUtils.isNotBlank(auxJars)) {
      loader = Utilities.addToClassPath(loader, StringUtils.split(auxJars, ","));
    }
    conf.setClassLoader(loader);
    Thread.currentThread().setContextClassLoader(loader);
  }

  CliDriver cli = new CliDriver();
  cli.setHiveVariables(oproc.getHiveVariables());

  // Execute init files (.hiverc style) before the first command
  cli.processInitFiles(ss);

  // "hive -e '<query>'": run one command string and exit
  if (ss.execString != null) {
    return cli.processLine(ss.execString);
  }

  // "hive -f <file>": run the commands in a file and exit
  try {
    if (ss.fileName != null) {
      return cli.processFile(ss.fileName);
    }
  } catch (FileNotFoundException e) {
    System.err.println("Could not open input file for reading. (" + e.getMessage() + ")");
    return 3;
  }

  // Interactive mode: a jline console with tab completion and history
  ConsoleReader reader = new ConsoleReader();
  reader.setBellEnabled(false);
  reader.addCompletor(getCommandCompletor());

  String line;
  final String HISTORYFILE = ".hivehistory";
  String historyFile = System.getProperty("user.home") + File.separator + HISTORYFILE;
  reader.setHistory(new History(new File(historyFile)));
  int ret = 0;

  String prefix = "";
  String curDB = getFormattedDb(conf, ss);
  String curPrompt = prompt + curDB;
  String dbSpaces = spacesForString(curDB);

  while ((line = reader.readLine(curPrompt + "> ")) != null) {
    if (!prefix.equals("")) {
      prefix += '\n';
    }
    if (line.trim().endsWith(";") && !line.trim().endsWith("\\;")) {
      line = prefix + line;
      ret = cli.processLine(line, true);
      prefix = "";
      curDB = getFormattedDb(conf, ss);
      curPrompt = prompt + curDB;
      dbSpaces = dbSpaces.length() == curDB.length() ? dbSpaces : spacesForString(curDB);
    } else {
      prefix = prefix + line;
      curPrompt = prompt2 + dbSpaces;
      continue;
    }
  }

  ss.close();

  return ret;
}
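To make the line buffering concrete, here is a standalone sketch of just the continuation logic from the loop above, applied to a canned three-line input (the sample SQL is made up):

// Lines are buffered into "prefix" until one ends with an unescaped ';'.
public class PromptDemo {
  public static void main(String[] args) {
    String[] input = {"select key", "from src", "where key > 10;"};
    String prefix = "";
    for (String line : input) {
      if (!prefix.equals("")) {
        prefix += '\n';
      }
      if (line.trim().endsWith(";") && !line.trim().endsWith("\\;")) {
        // A full statement is ready; the CLI would call processLine here
        System.out.println("submit: " + (prefix + line).replace('\n', ' '));
        prefix = "";
      } else {
        prefix = prefix + line; // keep accumulating; the CLI shows prompt2
      }
    }
  }
}

Run as-is, this prints: submit: select key from src where key > 10;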
3. The main work is calling processLine.
processLine in turn calls processCmd.
CliDriver.processLine splits the line on ";" and strips the terminating semicolon from each command:
public int processLine(String line, boolean allowInterupting) {
  SignalHandler oldSignal = null;
  Signal interupSignal = null;

  if (allowInterupting) {
    // Remember the original handler and hook up a custom Ctrl+C handler
    // while this line is being processed
    interupSignal = new Signal("INT");
    oldSignal = Signal.handle(interupSignal, new SignalHandler() {
      private final Thread cliThread = Thread.currentThread();
      private boolean interruptRequested;

      @Override
      public void handle(Signal signal) {
        boolean initialRequest = !interruptRequested;
        interruptRequested = true;

        // A second Ctrl+C kills the JVM
        if (!initialRequest) {
          console.printInfo("Exiting the JVM");
          System.exit(127);
        }

        // The first Ctrl+C tries to stop gracefully: kill the running
        // MapReduce jobs and interrupt the CLI thread
        console.printInfo("Interrupting... Be patient, this might take some time.");
        console.printInfo("Press Ctrl+C again to kill JVM");

        HadoopJobExecHelper.killRunningJobs();
        HiveInterruptUtils.interrupt();
        this.cliThread.interrupt();
      }
    });
  }

  try {
    int lastRet = 0, ret = 0;

    String command = "";
    // Split on ';' into individual statements; a trailing "\" escapes
    // the semicolon so it stays inside the statement
    for (String oneCmd : line.split(";")) {

      if (StringUtils.endsWith(oneCmd, "\\")) {
        command += StringUtils.chop(oneCmd) + ";";
        continue;
      } else {
        command += oneCmd;
      }
      if (StringUtils.isBlank(command)) {
        continue;
      }

      ret = processCmd(command);

      // Reset the command type for the next statement
      SessionState ss = SessionState.get();
      ss.setCommandType(null);
      command = "";
      lastRet = ret;
      boolean ignoreErrors = HiveConf.getBoolVar(conf, HiveConf.ConfVars.CLIIGNOREERRORS);
      if (ret != 0 && !ignoreErrors) {
        CommandProcessorFactory.clean((HiveConf) conf);
        return ret;
      }
    }
    CommandProcessorFactory.clean((HiveConf) conf);
    return lastRet;
  } finally {
    // Restore the original Ctrl+C handler
    if (oldSignal != null && interupSignal != null) {
      Signal.handle(interupSignal, oldSignal);
    }
  }
}
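A standalone sketch of the splitting rule above: ";" ends a statement, while a trailing "\" escapes the semicolon so it stays inside the statement. The sample line is invented for illustration.

import org.apache.commons.lang.StringUtils;

// Replays the statement-splitting loop from processLine on a fixed input.
public class SplitDemo {
  public static void main(String[] args) {
    String line = "set a=1; select 'x\\;y' from t; quit";
    String command = "";
    for (String oneCmd : line.split(";")) {
      if (StringUtils.endsWith(oneCmd, "\\")) {
        command += StringUtils.chop(oneCmd) + ";"; // keep the escaped ';'
        continue;
      } else {
        command += oneCmd;
      }
      if (StringUtils.isBlank(command)) {
        continue;
      }
      System.out.println("statement: [" + command.trim() + "]");
      command = "";
    }
  }
}

This prints three statements: set a=1, then select 'x;y' from t, then quit.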
4. processCmd
CliDriver.processCmd
splits the command and inspects the first token:
(1) quit or exit: leave the CLI.
(2) source: execute the HiveQL in the named file.
(3) !: run a shell command, e.g. !ls lists the files in the current directory.
(4) list: list the registered jar/file/archive resources.
(5) anything else: obtain the matching CommandProcessor and let it handle the command.
public int processCmd(String cmd) {
  CliSessionState ss = (CliSessionState) SessionState.get();
  String cmd_trimmed = cmd.trim();
  String[] tokens = tokenizeCmd(cmd_trimmed);
  int ret = 0;

  if (cmd_trimmed.toLowerCase().equals("quit") || cmd_trimmed.toLowerCase().equals("exit")) {
    // Close the session and leave the CLI
    ss.close();
    System.exit(0);

  } else if (tokens[0].equalsIgnoreCase("source")) {
    String cmd_1 = getFirstCmd(cmd_trimmed, tokens[0].length());

    File sourceFile = new File(cmd_1);
    if (!sourceFile.isFile()) {
      console.printError("File: " + cmd_1 + " is not a file.");
      ret = 1;
    } else {
      try {
        this.processFile(cmd_1);
      } catch (IOException e) {
        console.printError("Failed processing file " + cmd_1 + " " + e.getLocalizedMessage(),
            org.apache.hadoop.util.StringUtils.stringifyException(e));
        ret = 1;
      }
    }
  } else if (cmd_trimmed.startsWith("!")) {
    // Shell command: strip the '!' and run it, streaming stdout/stderr
    String shell_cmd = cmd_trimmed.substring(1);
    shell_cmd = new VariableSubstitution().substitute(ss.getConf(), shell_cmd);

    try {
      Process executor = Runtime.getRuntime().exec(shell_cmd);
      StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, ss.out);
      StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, ss.err);

      outPrinter.start();
      errPrinter.start();

      ret = executor.waitFor();
      if (ret != 0) {
        console.printError("Command failed with exit code = " + ret);
      }
    } catch (Exception e) {
      console.printError("Exception raised from Shell command " + e.getLocalizedMessage(),
          org.apache.hadoop.util.StringUtils.stringifyException(e));
      ret = 1;
    }

  } else if (tokens[0].toLowerCase().equals("list")) {
    // List registered resources (jar/file/archive), optionally filtered
    SessionState.ResourceType t;
    if (tokens.length < 2 || (t = SessionState.find_resource_type(tokens[1])) == null) {
      console.printError("Usage: list ["
          + StringUtils.join(SessionState.ResourceType.values(), "|") + "] [<value> [<value>]*]");
      ret = 1;
    } else {
      List<String> filter = null;
      if (tokens.length >= 3) {
        System.arraycopy(tokens, 2, tokens, 0, tokens.length - 2);
        filter = Arrays.asList(tokens);
      }
      Set<String> s = ss.list_resource(t, filter);
      if (s != null && !s.isEmpty()) {
        ss.out.println(StringUtils.join(s, "\n"));
      }
    }
  } else if (ss.isRemoteMode()) {
    // Remote mode: ship the command to the Hive server and page results back
    HiveClient client = ss.getClient();
    PrintStream out = ss.out;
    PrintStream err = ss.err;

    try {
      client.execute(cmd_trimmed);
      List<String> results;
      do {
        results = client.fetchN(LINES_TO_FETCH);
        for (String line : results) {
          out.println(line);
        }
      } while (results.size() == LINES_TO_FETCH);
    } catch (HiveServerException e) {
      ret = e.getErrorCode();
      if (ret != 0) {
        String errMsg = e.getMessage();
        if (errMsg == null) {
          errMsg = e.toString();
        }
        ret = e.getErrorCode();
        err.println("[Hive Error]: " + errMsg);
      }
    } catch (TException e) {
      String errMsg = e.getMessage();
      if (errMsg == null) {
        errMsg = e.toString();
      }
      ret = -10002;
      err.println("[Thrift Error]: " + errMsg);
    } finally {
      try {
        client.clean();
      } catch (TException e) {
        String errMsg = e.getMessage();
        if (errMsg == null) {
          errMsg = e.toString();
        }
        err.println("[Thrift Error]: Hive server is not cleaned due to thrift exception: "
            + errMsg);
      }
    }
  } else {
    // Local mode: dispatch to the matching CommandProcessor
    CommandProcessor proc = CommandProcessorFactory.get(tokens[0], (HiveConf) conf);
    ret = processLocalCmd(cmd, proc, ss);
  }

  return ret;
}
- <span style="font-family:Arial, Helvetica, sans-serif;"><span style="white-space: normal;">
- </span></span>
- <span style="font-family:Arial, Helvetica, sans-serif;"><span style="white-space: normal;">
- </span></span>
5. processLocalCmd
int processLocalCmd(String cmd, CommandProcessor proc, CliSessionState ss) {
  int tryCount = 0;
  boolean needRetry;
  int ret = 0;

  do {
    try {
      needRetry = false;
      if (proc != null) {
        if (proc instanceof Driver) {
          Driver qp = (Driver) proc;
          PrintStream out = ss.out;
          long start = System.currentTimeMillis();
          if (ss.getIsVerbose()) {
            out.println(cmd);
          }

          qp.setTryCount(tryCount);
          // Compile and execute the query
          ret = qp.run(cmd).getResponseCode();
          if (ret != 0) {
            qp.close();
            return ret;
          }

          ArrayList<String> res = new ArrayList<String>();

          printHeader(qp, out);

          // Fetch and print the results batch by batch
          try {
            while (qp.getResults(res)) {
              for (String r : res) {
                out.println(r);
              }
              res.clear();
              if (out.checkError()) {
                break;
              }
            }
          } catch (IOException e) {
            console.printError("Failed with exception " + e.getClass().getName() + ":"
                + e.getMessage(), "\n"
                + org.apache.hadoop.util.StringUtils.stringifyException(e));
            ret = 1;
          }

          int cret = qp.close();
          if (ret == 0) {
            ret = cret;
          }

          long end = System.currentTimeMillis();
          if (end > start) {
            double timeTaken = (end - start) / 1000.0;
            console.printInfo("Time taken: " + timeTaken + " seconds", null);
          }

        } else {
          // Non-Driver processors (set/dfs/add/delete) receive the command
          // body without its first token
          String firstToken = tokenizeCmd(cmd.trim())[0];
          String cmd_1 = getFirstCmd(cmd.trim(), firstToken.length());

          if (ss.getIsVerbose()) {
            ss.out.println(firstToken + " " + cmd_1);
          }
          ret = proc.run(cmd_1).getResponseCode();
        }
      }
    } catch (CommandNeedRetryException e) {
      console.printInfo("Retry query with a different approach...");
      tryCount++;
      needRetry = true;
    }
  } while (needRetry);

  return ret;
}
6. The run method of the Driver class.
Driver
Driver.run(String command) // process one command
{
    int ret = compile(command); // analyze the command and generate Tasks
    ret = execute();            // run the Tasks
}
Driver.compile
Driver.compile(String command) // compile one command
{
    (1) Parser (ANTLR): HiveQL -> Abstract Syntax Tree (AST)
    ParseDriver pd = new ParseDriver();
    ASTNode tree = pd.parse(command, ctx);
    (2) SemanticAnalyzer
    BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(conf, tree);
    // Do semantic analysis and plan generation
    sem.analyze(tree, ctx);
}
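Putting steps 5 and 6 together, here is a hedged sketch of driving the same compile/execute path programmatically; the table name src and the bare session setup are assumptions, and error handling is elided:

import java.util.ArrayList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;

// Sketch: what processLocalCmd does for the Driver branch, in isolation.
public class DriverDemo {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf(SessionState.class);
    SessionState.start(new SessionState(conf)); // Driver.run needs a session
    Driver qp = new Driver(conf);
    int ret = qp.run("select * from src").getResponseCode(); // compile + execute
    if (ret != 0) {
      System.exit(ret);
    }
    ArrayList<String> res = new ArrayList<String>();
    while (qp.getResults(res)) { // fetch the results batch by batch
      for (String r : res) {
        System.out.println(r);
      }
      res.clear();
    }
    qp.close();
  }
}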
7. Where the plan is generated
Tracing to line 663 of Driver.java shows the plan path, for example:
/tmp/hive-gexing111/hive_2012-07-09_10-37-27_511_5073252372102100766/test2.py
For a plan generated by the system itself:
/tmp/hive-gexing111/hive_2012-07-09_12-45-55_479_6444298560478274273/-local-10002/plan.xml
8. Debugging show tables "ge*";
Set a breakpoint in getTables in
hive/metastore/src/java/com.aliyun.apsara.odps.metastore.ots/OTSObjectStore.java.
9. Configuration file location
hive/build/dist/conf/hive-site.xml
Set the following to use the rewritten (ODPS) version:
<property>
<name>com.aliyun.odps.mode</name>
<value>true</value>
</property>
<property>
<name>native.hive.mode</name>
<value>false</value>
</property>
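For reference, a small sketch of how code can read these two switches; HiveConf extends Hadoop's Configuration, so getBoolean is available, and the default values here are assumptions:

import org.apache.hadoop.hive.conf.HiveConf;

// Reads the two properties configured above from hive-site.xml.
public class ConfDemo {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    boolean odpsMode = conf.getBoolean("com.aliyun.odps.mode", false);
    boolean nativeHive = conf.getBoolean("native.hive.mode", true);
    System.out.println("odps mode: " + odpsMode + ", native hive: " + nativeHive);
  }
}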
Original source: http://www.cnblogs.com/cxzdy/p/4472207.html