0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019 package org.apache.hive.service.cli.session;
0020
0021 import java.io.BufferedReader;
0022 import java.io.File;
0023 import java.io.FileInputStream;
0024 import java.io.IOException;
0025 import java.io.InputStreamReader;
0026 import java.util.HashSet;
0027 import java.util.List;
0028 import java.util.Map;
0029 import java.util.Set;
0030
0031 import org.apache.commons.io.FileUtils;
0032 import org.apache.commons.lang3.StringUtils;
0033 import org.apache.commons.logging.Log;
0034 import org.apache.commons.logging.LogFactory;
0035 import org.apache.hadoop.hive.common.cli.HiveFileProcessor;
0036 import org.apache.hadoop.hive.common.cli.IHiveFileProcessor;
0037 import org.apache.hadoop.hive.conf.HiveConf;
0038 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
0039 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
0040 import org.apache.hadoop.hive.metastore.api.MetaException;
0041 import org.apache.hadoop.hive.ql.exec.FetchFormatter;
0042 import org.apache.hadoop.hive.ql.exec.ListSinkOperator;
0043 import org.apache.hadoop.hive.ql.exec.Utilities;
0044 import org.apache.hadoop.hive.ql.history.HiveHistory;
0045 import org.apache.hadoop.hive.ql.metadata.Hive;
0046 import org.apache.hadoop.hive.ql.metadata.HiveException;
0047 import org.apache.hadoop.hive.ql.parse.VariableSubstitution;
0048 import org.apache.hadoop.hive.ql.session.SessionState;
0049 import org.apache.hadoop.hive.shims.ShimLoader;
0050 import org.apache.hive.common.util.HiveVersionInfo;
0051 import org.apache.hive.service.auth.HiveAuthFactory;
0052 import org.apache.hive.service.cli.FetchOrientation;
0053 import org.apache.hive.service.cli.FetchType;
0054 import org.apache.hive.service.cli.GetInfoType;
0055 import org.apache.hive.service.cli.GetInfoValue;
0056 import org.apache.hive.service.cli.HiveSQLException;
0057 import org.apache.hive.service.cli.OperationHandle;
0058 import org.apache.hive.service.cli.RowSet;
0059 import org.apache.hive.service.cli.SessionHandle;
0060 import org.apache.hive.service.cli.TableSchema;
0061 import org.apache.hive.service.cli.operation.ExecuteStatementOperation;
0062 import org.apache.hive.service.cli.operation.GetCatalogsOperation;
0063 import org.apache.hive.service.cli.operation.GetColumnsOperation;
0064 import org.apache.hive.service.cli.operation.GetFunctionsOperation;
0065 import org.apache.hive.service.cli.operation.GetSchemasOperation;
0066 import org.apache.hive.service.cli.operation.GetTableTypesOperation;
0067 import org.apache.hive.service.cli.operation.GetTypeInfoOperation;
0068 import org.apache.hive.service.cli.operation.MetadataOperation;
0069 import org.apache.hive.service.cli.operation.Operation;
0070 import org.apache.hive.service.cli.operation.OperationManager;
0071 import org.apache.hive.service.cli.thrift.TProtocolVersion;
0072 import org.apache.hive.service.server.ThreadWithGarbageCleanup;
0073
0074 import static org.apache.hadoop.hive.conf.SystemVariables.ENV_PREFIX;
0075 import static org.apache.hadoop.hive.conf.SystemVariables.HIVECONF_PREFIX;
0076 import static org.apache.hadoop.hive.conf.SystemVariables.HIVEVAR_PREFIX;
0077 import static org.apache.hadoop.hive.conf.SystemVariables.METACONF_PREFIX;
0078 import static org.apache.hadoop.hive.conf.SystemVariables.SYSTEM_PREFIX;
0079
0080
0081
0082
0083
0084 public class HiveSessionImpl implements HiveSession {
0085 private final SessionHandle sessionHandle;
0086 private String username;
0087 private final String password;
0088 private HiveConf hiveConf;
0089 private SessionState sessionState;
0090 private String ipAddress;
0091 private static final String FETCH_WORK_SERDE_CLASS =
0092 "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
0093 private static final Log LOG = LogFactory.getLog(HiveSessionImpl.class);
0094 private SessionManager sessionManager;
0095 private OperationManager operationManager;
0096 private final Set<OperationHandle> opHandleSet = new HashSet<OperationHandle>();
0097 private boolean isOperationLogEnabled;
0098 private File sessionLogDir;
0099 private volatile long lastAccessTime;
0100 private volatile long lastIdleTime;
0101
0102 public HiveSessionImpl(TProtocolVersion protocol, String username, String password,
0103 HiveConf serverhiveConf, String ipAddress) {
0104 this.username = username;
0105 this.password = password;
0106 this.sessionHandle = new SessionHandle(protocol);
0107 this.hiveConf = new HiveConf(serverhiveConf);
0108 this.ipAddress = ipAddress;
0109
0110 try {
0111
0112
0113 if (! hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS) &&
0114 hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE)) {
0115 ShimLoader.getHadoopShims().refreshDefaultQueue(hiveConf, username);
0116 }
0117 } catch (IOException e) {
0118 LOG.warn("Error setting scheduler queue: " + e, e);
0119 }
0120
0121 hiveConf.set(ConfVars.HIVESESSIONID.varname,
0122 sessionHandle.getHandleIdentifier().toString());
0123
0124 hiveConf.set(ListSinkOperator.OUTPUT_FORMATTER,
0125 FetchFormatter.ThriftFormatter.class.getName());
0126 hiveConf.setInt(ListSinkOperator.OUTPUT_PROTOCOL, protocol.getValue());
0127 }
0128
0129 @Override
0130
0131
0132
0133
0134
0135
0136
0137
0138
0139 public void open(Map<String, String> sessionConfMap) throws HiveSQLException {
0140 sessionState = new SessionState(hiveConf, username);
0141 sessionState.setUserIpAddress(ipAddress);
0142 sessionState.setIsHiveServerQuery(true);
0143 SessionState.start(sessionState);
0144 try {
0145 sessionState.reloadAuxJars();
0146 } catch (IOException e) {
0147 String msg = "Failed to load reloadable jar file path: " + e;
0148 LOG.error(msg, e);
0149 throw new HiveSQLException(msg, e);
0150 }
0151
0152 processGlobalInitFile();
0153 if (sessionConfMap != null) {
0154 configureSession(sessionConfMap);
0155 }
0156 lastAccessTime = System.currentTimeMillis();
0157 lastIdleTime = lastAccessTime;
0158 }
0159
0160
0161
0162
0163 private class GlobalHivercFileProcessor extends HiveFileProcessor {
0164 @Override
0165 protected BufferedReader loadFile(String fileName) throws IOException {
0166 FileInputStream initStream = null;
0167 BufferedReader bufferedReader = null;
0168 initStream = new FileInputStream(fileName);
0169 bufferedReader = new BufferedReader(new InputStreamReader(initStream));
0170 return bufferedReader;
0171 }
0172
0173 @Override
0174 protected int processCmd(String cmd) {
0175 int rc = 0;
0176 String cmd_trimed = cmd.trim();
0177 try {
0178 executeStatementInternal(cmd_trimed, null, false);
0179 } catch (HiveSQLException e) {
0180 rc = -1;
0181 LOG.warn("Failed to execute HQL command in global .hiverc file.", e);
0182 }
0183 return rc;
0184 }
0185 }
0186
0187 private void processGlobalInitFile() {
0188 IHiveFileProcessor processor = new GlobalHivercFileProcessor();
0189
0190 try {
0191 String hiverc = hiveConf.getVar(ConfVars.HIVE_SERVER2_GLOBAL_INIT_FILE_LOCATION);
0192 if (hiverc != null) {
0193 File hivercFile = new File(hiverc);
0194 if (hivercFile.isDirectory()) {
0195 hivercFile = new File(hivercFile, SessionManager.HIVERCFILE);
0196 }
0197 if (hivercFile.isFile()) {
0198 LOG.info("Running global init file: " + hivercFile);
0199 int rc = processor.processFile(hivercFile.getAbsolutePath());
0200 if (rc != 0) {
0201 LOG.error("Failed on initializing global .hiverc file");
0202 }
0203 } else {
0204 LOG.debug("Global init file " + hivercFile + " does not exist");
0205 }
0206 }
0207 } catch (IOException e) {
0208 LOG.warn("Failed on initializing global .hiverc file", e);
0209 }
0210 }
0211
0212 private void configureSession(Map<String, String> sessionConfMap) throws HiveSQLException {
0213 SessionState.setCurrentSessionState(sessionState);
0214 for (Map.Entry<String, String> entry : sessionConfMap.entrySet()) {
0215 String key = entry.getKey();
0216 if (key.startsWith("set:")) {
0217 try {
0218 setVariable(key.substring(4), entry.getValue());
0219 } catch (Exception e) {
0220 throw new HiveSQLException(e);
0221 }
0222 } else if (key.startsWith("use:")) {
0223 SessionState.get().setCurrentDatabase(entry.getValue());
0224 } else {
0225 hiveConf.verifyAndSet(key, entry.getValue());
0226 }
0227 }
0228 }
0229
0230
0231
0232 public static int setVariable(String varname, String varvalue) throws Exception {
0233 SessionState ss = SessionState.get();
0234 if (varvalue.contains("\n")){
0235 ss.err.println("Warning: Value had a \\n character in it.");
0236 }
0237 varname = varname.trim();
0238 if (varname.startsWith(ENV_PREFIX)){
0239 ss.err.println("env:* variables can not be set.");
0240 return 1;
0241 } else if (varname.startsWith(SYSTEM_PREFIX)){
0242 String propName = varname.substring(SYSTEM_PREFIX.length());
0243 System.getProperties().setProperty(propName,
0244 new VariableSubstitution().substitute(ss.getConf(),varvalue));
0245 } else if (varname.startsWith(HIVECONF_PREFIX)){
0246 String propName = varname.substring(HIVECONF_PREFIX.length());
0247 setConf(varname, propName, varvalue, true);
0248 } else if (varname.startsWith(HIVEVAR_PREFIX)) {
0249 String propName = varname.substring(HIVEVAR_PREFIX.length());
0250 ss.getHiveVariables().put(propName,
0251 new VariableSubstitution().substitute(ss.getConf(),varvalue));
0252 } else if (varname.startsWith(METACONF_PREFIX)) {
0253 String propName = varname.substring(METACONF_PREFIX.length());
0254 Hive hive = Hive.get(ss.getConf());
0255 hive.setMetaConf(propName, new VariableSubstitution().substitute(ss.getConf(), varvalue));
0256 } else {
0257 setConf(varname, varname, varvalue, true);
0258 }
0259 return 0;
0260 }
0261
0262
0263 private static void setConf(String varname, String key, String varvalue, boolean register)
0264 throws IllegalArgumentException {
0265 HiveConf conf = SessionState.get().getConf();
0266 String value = new VariableSubstitution().substitute(conf, varvalue);
0267 if (conf.getBoolVar(HiveConf.ConfVars.HIVECONFVALIDATION)) {
0268 HiveConf.ConfVars confVars = HiveConf.getConfVars(key);
0269 if (confVars != null) {
0270 if (!confVars.isType(value)) {
0271 StringBuilder message = new StringBuilder();
0272 message.append("'SET ").append(varname).append('=').append(varvalue);
0273 message.append("' FAILED because ").append(key).append(" expects ");
0274 message.append(confVars.typeString()).append(" type value.");
0275 throw new IllegalArgumentException(message.toString());
0276 }
0277 String fail = confVars.validate(value);
0278 if (fail != null) {
0279 StringBuilder message = new StringBuilder();
0280 message.append("'SET ").append(varname).append('=').append(varvalue);
0281 message.append("' FAILED in validation : ").append(fail).append('.');
0282 throw new IllegalArgumentException(message.toString());
0283 }
0284 } else if (key.startsWith("hive.")) {
0285 throw new IllegalArgumentException("hive configuration " + key + " does not exists.");
0286 }
0287 }
0288 conf.verifyAndSet(key, value);
0289 if (register) {
0290 SessionState.get().getOverriddenConfigurations().put(key, value);
0291 }
0292 }
0293
0294 @Override
0295 public void setOperationLogSessionDir(File operationLogRootDir) {
0296 if (!operationLogRootDir.exists()) {
0297 LOG.warn("The operation log root directory is removed, recreating: " +
0298 operationLogRootDir.getAbsolutePath());
0299 if (!operationLogRootDir.mkdirs()) {
0300 LOG.warn("Unable to create operation log root directory: " +
0301 operationLogRootDir.getAbsolutePath());
0302 }
0303 }
0304 if (!operationLogRootDir.canWrite()) {
0305 LOG.warn("The operation log root directory is not writable: " +
0306 operationLogRootDir.getAbsolutePath());
0307 }
0308 sessionLogDir = new File(operationLogRootDir, sessionHandle.getHandleIdentifier().toString());
0309 isOperationLogEnabled = true;
0310 if (!sessionLogDir.exists()) {
0311 if (!sessionLogDir.mkdir()) {
0312 LOG.warn("Unable to create operation log session directory: " +
0313 sessionLogDir.getAbsolutePath());
0314 isOperationLogEnabled = false;
0315 }
0316 }
0317 if (isOperationLogEnabled) {
0318 LOG.info("Operation log session directory is created: " + sessionLogDir.getAbsolutePath());
0319 }
0320 }
0321
0322 @Override
0323 public boolean isOperationLogEnabled() {
0324 return isOperationLogEnabled;
0325 }
0326
0327 @Override
0328 public File getOperationLogSessionDir() {
0329 return sessionLogDir;
0330 }
0331
0332 @Override
0333 public TProtocolVersion getProtocolVersion() {
0334 return sessionHandle.getProtocolVersion();
0335 }
0336
0337 @Override
0338 public SessionManager getSessionManager() {
0339 return sessionManager;
0340 }
0341
0342 @Override
0343 public void setSessionManager(SessionManager sessionManager) {
0344 this.sessionManager = sessionManager;
0345 }
0346
0347 private OperationManager getOperationManager() {
0348 return operationManager;
0349 }
0350
0351 @Override
0352 public void setOperationManager(OperationManager operationManager) {
0353 this.operationManager = operationManager;
0354 }
0355
0356 protected synchronized void acquire(boolean userAccess) {
0357
0358
0359 SessionState.setCurrentSessionState(sessionState);
0360 if (userAccess) {
0361 lastAccessTime = System.currentTimeMillis();
0362 }
0363 }
0364
0365
0366
0367
0368
0369
0370
0371
0372 protected synchronized void release(boolean userAccess) {
0373 SessionState.detachSession();
0374 if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
0375 ThreadWithGarbageCleanup currentThread =
0376 (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread();
0377 currentThread.cacheThreadLocalRawStore();
0378 }
0379 if (userAccess) {
0380 lastAccessTime = System.currentTimeMillis();
0381 }
0382 if (opHandleSet.isEmpty()) {
0383 lastIdleTime = System.currentTimeMillis();
0384 } else {
0385 lastIdleTime = 0;
0386 }
0387 }
0388
0389 @Override
0390 public SessionHandle getSessionHandle() {
0391 return sessionHandle;
0392 }
0393
0394 @Override
0395 public String getUsername() {
0396 return username;
0397 }
0398
0399 @Override
0400 public String getPassword() {
0401 return password;
0402 }
0403
0404 @Override
0405 public HiveConf getHiveConf() {
0406 hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHOUTPUTSERDE, FETCH_WORK_SERDE_CLASS);
0407 return hiveConf;
0408 }
0409
0410 @Override
0411 public IMetaStoreClient getMetaStoreClient() throws HiveSQLException {
0412 try {
0413 return Hive.get(getHiveConf()).getMSC();
0414 } catch (HiveException e) {
0415 throw new HiveSQLException("Failed to get metastore connection", e);
0416 } catch (MetaException e) {
0417 throw new HiveSQLException("Failed to get metastore connection", e);
0418 }
0419 }
0420
0421 @Override
0422 public GetInfoValue getInfo(GetInfoType getInfoType)
0423 throws HiveSQLException {
0424 acquire(true);
0425 try {
0426 switch (getInfoType) {
0427 case CLI_SERVER_NAME:
0428 return new GetInfoValue("Hive");
0429 case CLI_DBMS_NAME:
0430 return new GetInfoValue("Apache Hive");
0431 case CLI_DBMS_VER:
0432 return new GetInfoValue(HiveVersionInfo.getVersion());
0433 case CLI_MAX_COLUMN_NAME_LEN:
0434 return new GetInfoValue(128);
0435 case CLI_MAX_SCHEMA_NAME_LEN:
0436 return new GetInfoValue(128);
0437 case CLI_MAX_TABLE_NAME_LEN:
0438 return new GetInfoValue(128);
0439 case CLI_TXN_CAPABLE:
0440 default:
0441 throw new HiveSQLException("Unrecognized GetInfoType value: " + getInfoType.toString());
0442 }
0443 } finally {
0444 release(true);
0445 }
0446 }
0447
0448 @Override
0449 public OperationHandle executeStatement(String statement, Map<String, String> confOverlay)
0450 throws HiveSQLException {
0451 return executeStatementInternal(statement, confOverlay, false);
0452 }
0453
0454 @Override
0455 public OperationHandle executeStatementAsync(String statement, Map<String, String> confOverlay)
0456 throws HiveSQLException {
0457 return executeStatementInternal(statement, confOverlay, true);
0458 }
0459
0460 private OperationHandle executeStatementInternal(String statement, Map<String, String> confOverlay,
0461 boolean runAsync)
0462 throws HiveSQLException {
0463 acquire(true);
0464
0465 OperationManager operationManager = getOperationManager();
0466 ExecuteStatementOperation operation = operationManager
0467 .newExecuteStatementOperation(getSession(), statement, confOverlay, runAsync);
0468 OperationHandle opHandle = operation.getHandle();
0469 try {
0470 operation.run();
0471 opHandleSet.add(opHandle);
0472 return opHandle;
0473 } catch (HiveSQLException e) {
0474
0475
0476
0477 operationManager.closeOperation(opHandle);
0478 throw e;
0479 } finally {
0480 release(true);
0481 }
0482 }
0483
0484 @Override
0485 public OperationHandle getTypeInfo()
0486 throws HiveSQLException {
0487 acquire(true);
0488
0489 OperationManager operationManager = getOperationManager();
0490 GetTypeInfoOperation operation = operationManager.newGetTypeInfoOperation(getSession());
0491 OperationHandle opHandle = operation.getHandle();
0492 try {
0493 operation.run();
0494 opHandleSet.add(opHandle);
0495 return opHandle;
0496 } catch (HiveSQLException e) {
0497 operationManager.closeOperation(opHandle);
0498 throw e;
0499 } finally {
0500 release(true);
0501 }
0502 }
0503
0504 @Override
0505 public OperationHandle getCatalogs()
0506 throws HiveSQLException {
0507 acquire(true);
0508
0509 OperationManager operationManager = getOperationManager();
0510 GetCatalogsOperation operation = operationManager.newGetCatalogsOperation(getSession());
0511 OperationHandle opHandle = operation.getHandle();
0512 try {
0513 operation.run();
0514 opHandleSet.add(opHandle);
0515 return opHandle;
0516 } catch (HiveSQLException e) {
0517 operationManager.closeOperation(opHandle);
0518 throw e;
0519 } finally {
0520 release(true);
0521 }
0522 }
0523
0524 @Override
0525 public OperationHandle getSchemas(String catalogName, String schemaName)
0526 throws HiveSQLException {
0527 acquire(true);
0528
0529 OperationManager operationManager = getOperationManager();
0530 GetSchemasOperation operation =
0531 operationManager.newGetSchemasOperation(getSession(), catalogName, schemaName);
0532 OperationHandle opHandle = operation.getHandle();
0533 try {
0534 operation.run();
0535 opHandleSet.add(opHandle);
0536 return opHandle;
0537 } catch (HiveSQLException e) {
0538 operationManager.closeOperation(opHandle);
0539 throw e;
0540 } finally {
0541 release(true);
0542 }
0543 }
0544
0545 @Override
0546 public OperationHandle getTables(String catalogName, String schemaName, String tableName,
0547 List<String> tableTypes)
0548 throws HiveSQLException {
0549 acquire(true);
0550
0551 OperationManager operationManager = getOperationManager();
0552 MetadataOperation operation =
0553 operationManager.newGetTablesOperation(getSession(), catalogName, schemaName, tableName, tableTypes);
0554 OperationHandle opHandle = operation.getHandle();
0555 try {
0556 operation.run();
0557 opHandleSet.add(opHandle);
0558 return opHandle;
0559 } catch (HiveSQLException e) {
0560 operationManager.closeOperation(opHandle);
0561 throw e;
0562 } finally {
0563 release(true);
0564 }
0565 }
0566
0567 @Override
0568 public OperationHandle getTableTypes()
0569 throws HiveSQLException {
0570 acquire(true);
0571
0572 OperationManager operationManager = getOperationManager();
0573 GetTableTypesOperation operation = operationManager.newGetTableTypesOperation(getSession());
0574 OperationHandle opHandle = operation.getHandle();
0575 try {
0576 operation.run();
0577 opHandleSet.add(opHandle);
0578 return opHandle;
0579 } catch (HiveSQLException e) {
0580 operationManager.closeOperation(opHandle);
0581 throw e;
0582 } finally {
0583 release(true);
0584 }
0585 }
0586
0587 @Override
0588 public OperationHandle getColumns(String catalogName, String schemaName,
0589 String tableName, String columnName) throws HiveSQLException {
0590 acquire(true);
0591 String addedJars = Utilities.getResourceFiles(hiveConf, SessionState.ResourceType.JAR);
0592 if (StringUtils.isNotBlank(addedJars)) {
0593 IMetaStoreClient metastoreClient = getSession().getMetaStoreClient();
0594 metastoreClient.setHiveAddedJars(addedJars);
0595 }
0596 OperationManager operationManager = getOperationManager();
0597 GetColumnsOperation operation = operationManager.newGetColumnsOperation(getSession(),
0598 catalogName, schemaName, tableName, columnName);
0599 OperationHandle opHandle = operation.getHandle();
0600 try {
0601 operation.run();
0602 opHandleSet.add(opHandle);
0603 return opHandle;
0604 } catch (HiveSQLException e) {
0605 operationManager.closeOperation(opHandle);
0606 throw e;
0607 } finally {
0608 release(true);
0609 }
0610 }
0611
0612 @Override
0613 public OperationHandle getFunctions(String catalogName, String schemaName, String functionName)
0614 throws HiveSQLException {
0615 acquire(true);
0616
0617 OperationManager operationManager = getOperationManager();
0618 GetFunctionsOperation operation = operationManager
0619 .newGetFunctionsOperation(getSession(), catalogName, schemaName, functionName);
0620 OperationHandle opHandle = operation.getHandle();
0621 try {
0622 operation.run();
0623 opHandleSet.add(opHandle);
0624 return opHandle;
0625 } catch (HiveSQLException e) {
0626 operationManager.closeOperation(opHandle);
0627 throw e;
0628 } finally {
0629 release(true);
0630 }
0631 }
0632
0633 @Override
0634 public void close() throws HiveSQLException {
0635 try {
0636 acquire(true);
0637
0638 for (OperationHandle opHandle : opHandleSet) {
0639 operationManager.closeOperation(opHandle);
0640 }
0641 opHandleSet.clear();
0642
0643 cleanupSessionLogDir();
0644
0645 cleanupPipeoutFile();
0646 HiveHistory hiveHist = sessionState.getHiveHistory();
0647 if (null != hiveHist) {
0648 hiveHist.closeStream();
0649 }
0650 try {
0651 sessionState.close();
0652 } finally {
0653 sessionState = null;
0654 }
0655 } catch (IOException ioe) {
0656 throw new HiveSQLException("Failure to close", ioe);
0657 } finally {
0658 if (sessionState != null) {
0659 try {
0660 sessionState.close();
0661 } catch (Throwable t) {
0662 LOG.warn("Error closing session", t);
0663 }
0664 sessionState = null;
0665 }
0666 release(true);
0667 }
0668 }
0669
0670 private void cleanupPipeoutFile() {
0671 String lScratchDir = hiveConf.getVar(ConfVars.LOCALSCRATCHDIR);
0672 String sessionID = hiveConf.getVar(ConfVars.HIVESESSIONID);
0673
0674 File[] fileAry = new File(lScratchDir).listFiles(
0675 (dir, name) -> name.startsWith(sessionID) && name.endsWith(".pipeout"));
0676
0677 for (File file : fileAry) {
0678 try {
0679 FileUtils.forceDelete(file);
0680 } catch (Exception e) {
0681 LOG.error("Failed to cleanup pipeout file: " + file, e);
0682 }
0683 }
0684 }
0685
0686 private void cleanupSessionLogDir() {
0687 if (isOperationLogEnabled) {
0688 try {
0689 FileUtils.forceDelete(sessionLogDir);
0690 } catch (Exception e) {
0691 LOG.error("Failed to cleanup session log dir: " + sessionHandle, e);
0692 }
0693 }
0694 }
0695
0696 @Override
0697 public SessionState getSessionState() {
0698 return sessionState;
0699 }
0700
0701 @Override
0702 public String getUserName() {
0703 return username;
0704 }
0705
0706 @Override
0707 public void setUserName(String userName) {
0708 this.username = userName;
0709 }
0710
0711 @Override
0712 public long getLastAccessTime() {
0713 return lastAccessTime;
0714 }
0715
0716 @Override
0717 public void closeExpiredOperations() {
0718 OperationHandle[] handles = opHandleSet.toArray(new OperationHandle[opHandleSet.size()]);
0719 if (handles.length > 0) {
0720 List<Operation> operations = operationManager.removeExpiredOperations(handles);
0721 if (!operations.isEmpty()) {
0722 closeTimedOutOperations(operations);
0723 }
0724 }
0725 }
0726
0727 @Override
0728 public long getNoOperationTime() {
0729 return lastIdleTime > 0 ? System.currentTimeMillis() - lastIdleTime : 0;
0730 }
0731
0732 private void closeTimedOutOperations(List<Operation> operations) {
0733 acquire(false);
0734 try {
0735 for (Operation operation : operations) {
0736 opHandleSet.remove(operation.getHandle());
0737 try {
0738 operation.close();
0739 } catch (Exception e) {
0740 LOG.warn("Exception is thrown closing timed-out operation " + operation.getHandle(), e);
0741 }
0742 }
0743 } finally {
0744 release(false);
0745 }
0746 }
0747
0748 @Override
0749 public void cancelOperation(OperationHandle opHandle) throws HiveSQLException {
0750 acquire(true);
0751 try {
0752 sessionManager.getOperationManager().cancelOperation(opHandle);
0753 } finally {
0754 release(true);
0755 }
0756 }
0757
0758 @Override
0759 public void closeOperation(OperationHandle opHandle) throws HiveSQLException {
0760 acquire(true);
0761 try {
0762 operationManager.closeOperation(opHandle);
0763 opHandleSet.remove(opHandle);
0764 } finally {
0765 release(true);
0766 }
0767 }
0768
0769 @Override
0770 public TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException {
0771 acquire(true);
0772 try {
0773 return sessionManager.getOperationManager().getOperationResultSetSchema(opHandle);
0774 } finally {
0775 release(true);
0776 }
0777 }
0778
0779 @Override
0780 public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
0781 long maxRows, FetchType fetchType) throws HiveSQLException {
0782 acquire(true);
0783 try {
0784 if (fetchType == FetchType.QUERY_OUTPUT) {
0785 return operationManager.getOperationNextRowSet(opHandle, orientation, maxRows);
0786 }
0787 return operationManager.getOperationLogRowSet(opHandle, orientation, maxRows);
0788 } finally {
0789 release(true);
0790 }
0791 }
0792
0793 protected HiveSession getSession() {
0794 return this;
0795 }
0796
0797 @Override
0798 public String getIpAddress() {
0799 return ipAddress;
0800 }
0801
0802 @Override
0803 public void setIpAddress(String ipAddress) {
0804 this.ipAddress = ipAddress;
0805 }
0806
0807 @Override
0808 public String getDelegationToken(HiveAuthFactory authFactory, String owner, String renewer)
0809 throws HiveSQLException {
0810 HiveAuthFactory.verifyProxyAccess(getUsername(), owner, getIpAddress(), getHiveConf());
0811 return authFactory.getDelegationToken(owner, renewer);
0812 }
0813
0814 @Override
0815 public void cancelDelegationToken(HiveAuthFactory authFactory, String tokenStr)
0816 throws HiveSQLException {
0817 HiveAuthFactory.verifyProxyAccess(getUsername(), getUserFromToken(authFactory, tokenStr),
0818 getIpAddress(), getHiveConf());
0819 authFactory.cancelDelegationToken(tokenStr);
0820 }
0821
0822 @Override
0823 public void renewDelegationToken(HiveAuthFactory authFactory, String tokenStr)
0824 throws HiveSQLException {
0825 HiveAuthFactory.verifyProxyAccess(getUsername(), getUserFromToken(authFactory, tokenStr),
0826 getIpAddress(), getHiveConf());
0827 authFactory.renewDelegationToken(tokenStr);
0828 }
0829
0830
0831 private String getUserFromToken(HiveAuthFactory authFactory, String tokenStr) throws HiveSQLException {
0832 return authFactory.getUserFromToken(tokenStr);
0833 }
0834 }