0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019 package org.apache.hive.service.cli.session;
0020
0021 import java.io.BufferedReader;
0022 import java.io.File;
0023 import java.io.FileInputStream;
0024 import java.io.IOException;
0025 import java.io.InputStreamReader;
0026 import java.util.HashSet;
0027 import java.util.List;
0028 import java.util.Map;
0029 import java.util.Set;
0030
0031 import org.apache.commons.io.FileUtils;
0032 import org.apache.commons.lang3.StringUtils;
0033 import org.apache.hadoop.hive.common.cli.HiveFileProcessor;
0034 import org.apache.hadoop.hive.common.cli.IHiveFileProcessor;
0035 import org.apache.hadoop.hive.conf.HiveConf;
0036 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
0037 import org.apache.hadoop.hive.conf.VariableSubstitution;
0038 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
0039 import org.apache.hadoop.hive.metastore.api.MetaException;
0040 import org.apache.hadoop.hive.ql.exec.Utilities;
0041 import org.apache.hadoop.hive.ql.history.HiveHistory;
0042 import org.apache.hadoop.hive.ql.metadata.Hive;
0043 import org.apache.hadoop.hive.ql.metadata.HiveException;
0044 import org.apache.hadoop.hive.ql.session.SessionState;
0045 import org.apache.hadoop.hive.serde2.SerDeUtils;
0046 import org.apache.hadoop.hive.serde2.thrift.ThriftFormatter;
0047 import org.apache.hadoop.hive.shims.ShimLoader;
0048 import org.apache.hive.common.util.HiveVersionInfo;
0049 import org.apache.hive.service.auth.HiveAuthFactory;
0050 import org.apache.hive.service.cli.FetchOrientation;
0051 import org.apache.hive.service.cli.FetchType;
0052 import org.apache.hive.service.cli.GetInfoType;
0053 import org.apache.hive.service.cli.GetInfoValue;
0054 import org.apache.hive.service.cli.HiveSQLException;
0055 import org.apache.hive.service.cli.OperationHandle;
0056 import org.apache.hive.service.cli.RowSet;
0057 import org.apache.hive.service.cli.SessionHandle;
0058 import org.apache.hive.service.cli.TableSchema;
0059 import org.apache.hive.service.cli.operation.ExecuteStatementOperation;
0060 import org.apache.hive.service.cli.operation.GetCatalogsOperation;
0061 import org.apache.hive.service.cli.operation.GetColumnsOperation;
0062 import org.apache.hive.service.cli.operation.GetCrossReferenceOperation;
0063 import org.apache.hive.service.cli.operation.GetFunctionsOperation;
0064 import org.apache.hive.service.cli.operation.GetPrimaryKeysOperation;
0065 import org.apache.hive.service.cli.operation.GetSchemasOperation;
0066 import org.apache.hive.service.cli.operation.GetTableTypesOperation;
0067 import org.apache.hive.service.cli.operation.GetTypeInfoOperation;
0068 import org.apache.hive.service.cli.operation.MetadataOperation;
0069 import org.apache.hive.service.cli.operation.Operation;
0070 import org.apache.hive.service.cli.operation.OperationManager;
0071 import org.apache.hive.service.rpc.thrift.TProtocolVersion;
0072 import org.apache.hive.service.server.ThreadWithGarbageCleanup;
0073 import org.slf4j.Logger;
0074 import org.slf4j.LoggerFactory;
0075
0076 import static org.apache.hadoop.hive.conf.SystemVariables.ENV_PREFIX;
0077 import static org.apache.hadoop.hive.conf.SystemVariables.HIVECONF_PREFIX;
0078 import static org.apache.hadoop.hive.conf.SystemVariables.HIVEVAR_PREFIX;
0079 import static org.apache.hadoop.hive.conf.SystemVariables.METACONF_PREFIX;
0080 import static org.apache.hadoop.hive.conf.SystemVariables.SYSTEM_PREFIX;
0081
0082
0083
0084
0085
0086 public class HiveSessionImpl implements HiveSession {
0087 private final SessionHandle sessionHandle;
0088 private String username;
0089 private final String password;
0090 private HiveConf hiveConf;
0091 private SessionState sessionState;
0092 private String ipAddress;
0093 private static final String FETCH_WORK_SERDE_CLASS =
0094 "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
0095 private static final Logger LOG = LoggerFactory.getLogger(HiveSessionImpl.class);
0096 private SessionManager sessionManager;
0097 private OperationManager operationManager;
0098 private final Set<OperationHandle> opHandleSet = new HashSet<OperationHandle>();
0099 private boolean isOperationLogEnabled;
0100 private File sessionLogDir;
0101 private volatile long lastAccessTime;
0102 private volatile long lastIdleTime;
0103
0104 public HiveSessionImpl(TProtocolVersion protocol, String username, String password,
0105 HiveConf serverhiveConf, String ipAddress) {
0106 this.username = username;
0107 this.password = password;
0108 this.sessionHandle = new SessionHandle(protocol);
0109 this.hiveConf = new HiveConf(serverhiveConf);
0110 this.ipAddress = ipAddress;
0111
0112 try {
0113
0114
0115 if (! hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS) &&
0116 hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE)) {
0117 ShimLoader.getHadoopShims().refreshDefaultQueue(hiveConf, username);
0118 }
0119 } catch (IOException e) {
0120 LOG.warn("Error setting scheduler queue: " + e, e);
0121 }
0122
0123 hiveConf.set(ConfVars.HIVESESSIONID.varname,
0124 sessionHandle.getHandleIdentifier().toString());
0125
0126 hiveConf.set(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER, ThriftFormatter.class.getName());
0127 hiveConf.setInt(SerDeUtils.LIST_SINK_OUTPUT_PROTOCOL, protocol.getValue());
0128 }
0129
0130 @Override
0131
0132
0133
0134
0135
0136
0137
0138
0139
0140 public void open(Map<String, String> sessionConfMap) throws HiveSQLException {
0141 sessionState = new SessionState(hiveConf, username);
0142 sessionState.setUserIpAddress(ipAddress);
0143 sessionState.setIsHiveServerQuery(true);
0144 SessionState.start(sessionState);
0145 try {
0146 sessionState.loadAuxJars();
0147 sessionState.loadReloadableAuxJars();
0148 } catch (IOException e) {
0149 String msg = "Failed to load reloadable jar file path: " + e;
0150 LOG.error(msg, e);
0151 throw new HiveSQLException(msg, e);
0152 }
0153
0154 processGlobalInitFile();
0155 if (sessionConfMap != null) {
0156 configureSession(sessionConfMap);
0157 }
0158 lastAccessTime = System.currentTimeMillis();
0159 lastIdleTime = lastAccessTime;
0160 }
0161
0162
0163
0164
0165 private class GlobalHivercFileProcessor extends HiveFileProcessor {
0166 @Override
0167 protected BufferedReader loadFile(String fileName) throws IOException {
0168 FileInputStream initStream = null;
0169 BufferedReader bufferedReader = null;
0170 initStream = new FileInputStream(fileName);
0171 bufferedReader = new BufferedReader(new InputStreamReader(initStream));
0172 return bufferedReader;
0173 }
0174
0175 @Override
0176 protected int processCmd(String cmd) {
0177 int rc = 0;
0178 String cmd_trimed = cmd.trim();
0179 try {
0180 executeStatementInternal(cmd_trimed, null, false, 0);
0181 } catch (HiveSQLException e) {
0182 rc = -1;
0183 LOG.warn("Failed to execute HQL command in global .hiverc file.", e);
0184 }
0185 return rc;
0186 }
0187 }
0188
0189 private void processGlobalInitFile() {
0190 IHiveFileProcessor processor = new GlobalHivercFileProcessor();
0191
0192 try {
0193 String hiverc = hiveConf.getVar(ConfVars.HIVE_SERVER2_GLOBAL_INIT_FILE_LOCATION);
0194 if (hiverc != null) {
0195 File hivercFile = new File(hiverc);
0196 if (hivercFile.isDirectory()) {
0197 hivercFile = new File(hivercFile, SessionManager.HIVERCFILE);
0198 }
0199 if (hivercFile.isFile()) {
0200 LOG.info("Running global init file: " + hivercFile);
0201 int rc = processor.processFile(hivercFile.getAbsolutePath());
0202 if (rc != 0) {
0203 LOG.error("Failed on initializing global .hiverc file");
0204 }
0205 } else {
0206 LOG.debug("Global init file " + hivercFile + " does not exist");
0207 }
0208 }
0209 } catch (IOException e) {
0210 LOG.warn("Failed on initializing global .hiverc file", e);
0211 }
0212 }
0213
0214 private void configureSession(Map<String, String> sessionConfMap) throws HiveSQLException {
0215 SessionState.setCurrentSessionState(sessionState);
0216 for (Map.Entry<String, String> entry : sessionConfMap.entrySet()) {
0217 String key = entry.getKey();
0218 if (key.startsWith("set:")) {
0219 try {
0220 setVariable(key.substring(4), entry.getValue());
0221 } catch (Exception e) {
0222 throw new HiveSQLException(e);
0223 }
0224 } else if (key.startsWith("use:")) {
0225 SessionState.get().setCurrentDatabase(entry.getValue());
0226 } else {
0227 hiveConf.verifyAndSet(key, entry.getValue());
0228 }
0229 }
0230 }
0231
0232
0233
0234 public static int setVariable(String varname, String varvalue) throws Exception {
0235 SessionState ss = SessionState.get();
0236 VariableSubstitution substitution = new VariableSubstitution(() -> ss.getHiveVariables());
0237 if (varvalue.contains("\n")){
0238 ss.err.println("Warning: Value had a \\n character in it.");
0239 }
0240 varname = varname.trim();
0241 if (varname.startsWith(ENV_PREFIX)){
0242 ss.err.println("env:* variables can not be set.");
0243 return 1;
0244 } else if (varname.startsWith(SYSTEM_PREFIX)){
0245 String propName = varname.substring(SYSTEM_PREFIX.length());
0246 System.getProperties().setProperty(propName, substitution.substitute(ss.getConf(),varvalue));
0247 } else if (varname.startsWith(HIVECONF_PREFIX)){
0248 String propName = varname.substring(HIVECONF_PREFIX.length());
0249 setConf(varname, propName, varvalue, true);
0250 } else if (varname.startsWith(HIVEVAR_PREFIX)) {
0251 String propName = varname.substring(HIVEVAR_PREFIX.length());
0252 ss.getHiveVariables().put(propName, substitution.substitute(ss.getConf(),varvalue));
0253 } else if (varname.startsWith(METACONF_PREFIX)) {
0254 String propName = varname.substring(METACONF_PREFIX.length());
0255 Hive hive = Hive.get(ss.getConf());
0256 hive.setMetaConf(propName, substitution.substitute(ss.getConf(), varvalue));
0257 } else {
0258 setConf(varname, varname, varvalue, true);
0259 }
0260 return 0;
0261 }
0262
0263
0264 private static void setConf(String varname, String key, String varvalue, boolean register)
0265 throws IllegalArgumentException {
0266 VariableSubstitution substitution =
0267 new VariableSubstitution(() -> SessionState.get().getHiveVariables());
0268 HiveConf conf = SessionState.get().getConf();
0269 String value = substitution.substitute(conf, varvalue);
0270 if (conf.getBoolVar(HiveConf.ConfVars.HIVECONFVALIDATION)) {
0271 HiveConf.ConfVars confVars = HiveConf.getConfVars(key);
0272 if (confVars != null) {
0273 if (!confVars.isType(value)) {
0274 StringBuilder message = new StringBuilder();
0275 message.append("'SET ").append(varname).append('=').append(varvalue);
0276 message.append("' FAILED because ").append(key).append(" expects ");
0277 message.append(confVars.typeString()).append(" type value.");
0278 throw new IllegalArgumentException(message.toString());
0279 }
0280 String fail = confVars.validate(value);
0281 if (fail != null) {
0282 StringBuilder message = new StringBuilder();
0283 message.append("'SET ").append(varname).append('=').append(varvalue);
0284 message.append("' FAILED in validation : ").append(fail).append('.');
0285 throw new IllegalArgumentException(message.toString());
0286 }
0287 } else if (key.startsWith("hive.")) {
0288 throw new IllegalArgumentException("hive configuration " + key + " does not exists.");
0289 }
0290 }
0291 conf.verifyAndSet(key, value);
0292 if (register) {
0293 SessionState.get().getOverriddenConfigurations().put(key, value);
0294 }
0295 }
0296
0297 @Override
0298 public void setOperationLogSessionDir(File operationLogRootDir) {
0299 if (!operationLogRootDir.exists()) {
0300 LOG.warn("The operation log root directory is removed, recreating: " +
0301 operationLogRootDir.getAbsolutePath());
0302 if (!operationLogRootDir.mkdirs()) {
0303 LOG.warn("Unable to create operation log root directory: " +
0304 operationLogRootDir.getAbsolutePath());
0305 }
0306 }
0307 if (!operationLogRootDir.canWrite()) {
0308 LOG.warn("The operation log root directory is not writable: " +
0309 operationLogRootDir.getAbsolutePath());
0310 }
0311 sessionLogDir = new File(operationLogRootDir, sessionHandle.getHandleIdentifier().toString());
0312 isOperationLogEnabled = true;
0313 if (!sessionLogDir.exists()) {
0314 if (!sessionLogDir.mkdir()) {
0315 LOG.warn("Unable to create operation log session directory: " +
0316 sessionLogDir.getAbsolutePath());
0317 isOperationLogEnabled = false;
0318 }
0319 }
0320 if (isOperationLogEnabled) {
0321 LOG.info("Operation log session directory is created: " + sessionLogDir.getAbsolutePath());
0322 }
0323 }
0324
0325 @Override
0326 public boolean isOperationLogEnabled() {
0327 return isOperationLogEnabled;
0328 }
0329
0330 @Override
0331 public File getOperationLogSessionDir() {
0332 return sessionLogDir;
0333 }
0334
0335 @Override
0336 public TProtocolVersion getProtocolVersion() {
0337 return sessionHandle.getProtocolVersion();
0338 }
0339
0340 @Override
0341 public SessionManager getSessionManager() {
0342 return sessionManager;
0343 }
0344
0345 @Override
0346 public void setSessionManager(SessionManager sessionManager) {
0347 this.sessionManager = sessionManager;
0348 }
0349
0350 private OperationManager getOperationManager() {
0351 return operationManager;
0352 }
0353
0354 @Override
0355 public void setOperationManager(OperationManager operationManager) {
0356 this.operationManager = operationManager;
0357 }
0358
0359 protected synchronized void acquire(boolean userAccess) {
0360
0361
0362 SessionState.setCurrentSessionState(sessionState);
0363 if (userAccess) {
0364 lastAccessTime = System.currentTimeMillis();
0365 }
0366 }
0367
0368
0369
0370
0371
0372
0373
0374
0375 protected synchronized void release(boolean userAccess) {
0376 SessionState.detachSession();
0377 if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
0378 ThreadWithGarbageCleanup currentThread =
0379 (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread();
0380 currentThread.cacheThreadLocalRawStore();
0381 }
0382 if (userAccess) {
0383 lastAccessTime = System.currentTimeMillis();
0384 }
0385 if (opHandleSet.isEmpty()) {
0386 lastIdleTime = System.currentTimeMillis();
0387 } else {
0388 lastIdleTime = 0;
0389 }
0390 }
0391
0392 @Override
0393 public SessionHandle getSessionHandle() {
0394 return sessionHandle;
0395 }
0396
0397 @Override
0398 public String getUsername() {
0399 return username;
0400 }
0401
0402 @Override
0403 public String getPassword() {
0404 return password;
0405 }
0406
0407 @Override
0408 public HiveConf getHiveConf() {
0409 hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHOUTPUTSERDE, FETCH_WORK_SERDE_CLASS);
0410 return hiveConf;
0411 }
0412
0413 @Override
0414 public IMetaStoreClient getMetaStoreClient() throws HiveSQLException {
0415 try {
0416 return Hive.get(getHiveConf()).getMSC();
0417 } catch (HiveException e) {
0418 throw new HiveSQLException("Failed to get metastore connection", e);
0419 } catch (MetaException e) {
0420 throw new HiveSQLException("Failed to get metastore connection", e);
0421 }
0422 }
0423
0424 @Override
0425 public GetInfoValue getInfo(GetInfoType getInfoType)
0426 throws HiveSQLException {
0427 acquire(true);
0428 try {
0429 switch (getInfoType) {
0430 case CLI_SERVER_NAME:
0431 return new GetInfoValue("Hive");
0432 case CLI_DBMS_NAME:
0433 return new GetInfoValue("Apache Hive");
0434 case CLI_DBMS_VER:
0435 return new GetInfoValue(HiveVersionInfo.getVersion());
0436 case CLI_MAX_COLUMN_NAME_LEN:
0437 return new GetInfoValue(128);
0438 case CLI_MAX_SCHEMA_NAME_LEN:
0439 return new GetInfoValue(128);
0440 case CLI_MAX_TABLE_NAME_LEN:
0441 return new GetInfoValue(128);
0442 case CLI_TXN_CAPABLE:
0443 default:
0444 throw new HiveSQLException("Unrecognized GetInfoType value: " + getInfoType.toString());
0445 }
0446 } finally {
0447 release(true);
0448 }
0449 }
0450
0451 @Override
0452 public OperationHandle executeStatement(String statement, Map<String, String> confOverlay)
0453 throws HiveSQLException {
0454 return executeStatementInternal(statement, confOverlay, false, 0);
0455 }
0456
0457 @Override
0458 public OperationHandle executeStatement(String statement, Map<String, String> confOverlay,
0459 long queryTimeout) throws HiveSQLException {
0460 return executeStatementInternal(statement, confOverlay, false, queryTimeout);
0461 }
0462
0463 @Override
0464 public OperationHandle executeStatementAsync(String statement, Map<String, String> confOverlay)
0465 throws HiveSQLException {
0466 return executeStatementInternal(statement, confOverlay, true, 0);
0467 }
0468
0469 @Override
0470 public OperationHandle executeStatementAsync(String statement, Map<String, String> confOverlay,
0471 long queryTimeout) throws HiveSQLException {
0472 return executeStatementInternal(statement, confOverlay, true, queryTimeout);
0473 }
0474
0475 private OperationHandle executeStatementInternal(String statement,
0476 Map<String, String> confOverlay, boolean runAsync, long queryTimeout) throws HiveSQLException {
0477 acquire(true);
0478
0479 OperationManager operationManager = getOperationManager();
0480 ExecuteStatementOperation operation = operationManager
0481 .newExecuteStatementOperation(getSession(), statement, confOverlay, runAsync, queryTimeout);
0482 OperationHandle opHandle = operation.getHandle();
0483 try {
0484 operation.run();
0485 opHandleSet.add(opHandle);
0486 return opHandle;
0487 } catch (HiveSQLException e) {
0488
0489
0490
0491 operationManager.closeOperation(opHandle);
0492 throw e;
0493 } finally {
0494 release(true);
0495 }
0496 }
0497
0498 @Override
0499 public OperationHandle getTypeInfo()
0500 throws HiveSQLException {
0501 acquire(true);
0502
0503 OperationManager operationManager = getOperationManager();
0504 GetTypeInfoOperation operation = operationManager.newGetTypeInfoOperation(getSession());
0505 OperationHandle opHandle = operation.getHandle();
0506 try {
0507 operation.run();
0508 opHandleSet.add(opHandle);
0509 return opHandle;
0510 } catch (HiveSQLException e) {
0511 operationManager.closeOperation(opHandle);
0512 throw e;
0513 } finally {
0514 release(true);
0515 }
0516 }
0517
0518 @Override
0519 public OperationHandle getCatalogs()
0520 throws HiveSQLException {
0521 acquire(true);
0522
0523 OperationManager operationManager = getOperationManager();
0524 GetCatalogsOperation operation = operationManager.newGetCatalogsOperation(getSession());
0525 OperationHandle opHandle = operation.getHandle();
0526 try {
0527 operation.run();
0528 opHandleSet.add(opHandle);
0529 return opHandle;
0530 } catch (HiveSQLException e) {
0531 operationManager.closeOperation(opHandle);
0532 throw e;
0533 } finally {
0534 release(true);
0535 }
0536 }
0537
0538 @Override
0539 public OperationHandle getSchemas(String catalogName, String schemaName)
0540 throws HiveSQLException {
0541 acquire(true);
0542
0543 OperationManager operationManager = getOperationManager();
0544 GetSchemasOperation operation =
0545 operationManager.newGetSchemasOperation(getSession(), catalogName, schemaName);
0546 OperationHandle opHandle = operation.getHandle();
0547 try {
0548 operation.run();
0549 opHandleSet.add(opHandle);
0550 return opHandle;
0551 } catch (HiveSQLException e) {
0552 operationManager.closeOperation(opHandle);
0553 throw e;
0554 } finally {
0555 release(true);
0556 }
0557 }
0558
0559 @Override
0560 public OperationHandle getTables(String catalogName, String schemaName, String tableName,
0561 List<String> tableTypes)
0562 throws HiveSQLException {
0563 acquire(true);
0564
0565 OperationManager operationManager = getOperationManager();
0566 MetadataOperation operation =
0567 operationManager.newGetTablesOperation(getSession(), catalogName, schemaName, tableName, tableTypes);
0568 OperationHandle opHandle = operation.getHandle();
0569 try {
0570 operation.run();
0571 opHandleSet.add(opHandle);
0572 return opHandle;
0573 } catch (HiveSQLException e) {
0574 operationManager.closeOperation(opHandle);
0575 throw e;
0576 } finally {
0577 release(true);
0578 }
0579 }
0580
0581 @Override
0582 public OperationHandle getTableTypes()
0583 throws HiveSQLException {
0584 acquire(true);
0585
0586 OperationManager operationManager = getOperationManager();
0587 GetTableTypesOperation operation = operationManager.newGetTableTypesOperation(getSession());
0588 OperationHandle opHandle = operation.getHandle();
0589 try {
0590 operation.run();
0591 opHandleSet.add(opHandle);
0592 return opHandle;
0593 } catch (HiveSQLException e) {
0594 operationManager.closeOperation(opHandle);
0595 throw e;
0596 } finally {
0597 release(true);
0598 }
0599 }
0600
0601 @Override
0602 public OperationHandle getColumns(String catalogName, String schemaName,
0603 String tableName, String columnName) throws HiveSQLException {
0604 acquire(true);
0605 String addedJars = Utilities.getResourceFiles(hiveConf, SessionState.ResourceType.JAR);
0606 if (StringUtils.isNotBlank(addedJars)) {
0607 IMetaStoreClient metastoreClient = getSession().getMetaStoreClient();
0608 metastoreClient.setHiveAddedJars(addedJars);
0609 }
0610 OperationManager operationManager = getOperationManager();
0611 GetColumnsOperation operation = operationManager.newGetColumnsOperation(getSession(),
0612 catalogName, schemaName, tableName, columnName);
0613 OperationHandle opHandle = operation.getHandle();
0614 try {
0615 operation.run();
0616 opHandleSet.add(opHandle);
0617 return opHandle;
0618 } catch (HiveSQLException e) {
0619 operationManager.closeOperation(opHandle);
0620 throw e;
0621 } finally {
0622 release(true);
0623 }
0624 }
0625
0626 @Override
0627 public OperationHandle getFunctions(String catalogName, String schemaName, String functionName)
0628 throws HiveSQLException {
0629 acquire(true);
0630
0631 OperationManager operationManager = getOperationManager();
0632 GetFunctionsOperation operation = operationManager
0633 .newGetFunctionsOperation(getSession(), catalogName, schemaName, functionName);
0634 OperationHandle opHandle = operation.getHandle();
0635 try {
0636 operation.run();
0637 opHandleSet.add(opHandle);
0638 return opHandle;
0639 } catch (HiveSQLException e) {
0640 operationManager.closeOperation(opHandle);
0641 throw e;
0642 } finally {
0643 release(true);
0644 }
0645 }
0646
0647 @Override
0648 public void close() throws HiveSQLException {
0649 try {
0650 acquire(true);
0651
0652 for (OperationHandle opHandle : opHandleSet) {
0653 operationManager.closeOperation(opHandle);
0654 }
0655 opHandleSet.clear();
0656
0657 cleanupSessionLogDir();
0658
0659 cleanupPipeoutFile();
0660 HiveHistory hiveHist = sessionState.getHiveHistory();
0661 if (null != hiveHist) {
0662 hiveHist.closeStream();
0663 }
0664 try {
0665 sessionState.close();
0666 } finally {
0667 sessionState = null;
0668 }
0669 } catch (IOException ioe) {
0670 throw new HiveSQLException("Failure to close", ioe);
0671 } finally {
0672 if (sessionState != null) {
0673 try {
0674 sessionState.close();
0675 } catch (Throwable t) {
0676 LOG.warn("Error closing session", t);
0677 }
0678 sessionState = null;
0679 }
0680 release(true);
0681 }
0682 }
0683
0684 private void cleanupPipeoutFile() {
0685 String lScratchDir = hiveConf.getVar(ConfVars.LOCALSCRATCHDIR);
0686 String sessionID = hiveConf.getVar(ConfVars.HIVESESSIONID);
0687
0688 File[] fileAry = new File(lScratchDir).listFiles(
0689 (dir, name) -> name.startsWith(sessionID) && name.endsWith(".pipeout"));
0690
0691 for (File file : fileAry) {
0692 try {
0693 FileUtils.forceDelete(file);
0694 } catch (Exception e) {
0695 LOG.error("Failed to cleanup pipeout file: " + file, e);
0696 }
0697 }
0698 }
0699
0700 private void cleanupSessionLogDir() {
0701 if (isOperationLogEnabled) {
0702 try {
0703 FileUtils.forceDelete(sessionLogDir);
0704 } catch (Exception e) {
0705 LOG.error("Failed to cleanup session log dir: " + sessionHandle, e);
0706 }
0707 }
0708 }
0709
0710 @Override
0711 public SessionState getSessionState() {
0712 return sessionState;
0713 }
0714
0715 @Override
0716 public String getUserName() {
0717 return username;
0718 }
0719
0720 @Override
0721 public void setUserName(String userName) {
0722 this.username = userName;
0723 }
0724
0725 @Override
0726 public long getLastAccessTime() {
0727 return lastAccessTime;
0728 }
0729
0730 @Override
0731 public void closeExpiredOperations() {
0732 OperationHandle[] handles = opHandleSet.toArray(new OperationHandle[opHandleSet.size()]);
0733 if (handles.length > 0) {
0734 List<Operation> operations = operationManager.removeExpiredOperations(handles);
0735 if (!operations.isEmpty()) {
0736 closeTimedOutOperations(operations);
0737 }
0738 }
0739 }
0740
0741 @Override
0742 public long getNoOperationTime() {
0743 return lastIdleTime > 0 ? System.currentTimeMillis() - lastIdleTime : 0;
0744 }
0745
0746 private void closeTimedOutOperations(List<Operation> operations) {
0747 acquire(false);
0748 try {
0749 for (Operation operation : operations) {
0750 opHandleSet.remove(operation.getHandle());
0751 try {
0752 operation.close();
0753 } catch (Exception e) {
0754 LOG.warn("Exception is thrown closing timed-out operation " + operation.getHandle(), e);
0755 }
0756 }
0757 } finally {
0758 release(false);
0759 }
0760 }
0761
0762 @Override
0763 public void cancelOperation(OperationHandle opHandle) throws HiveSQLException {
0764 acquire(true);
0765 try {
0766 sessionManager.getOperationManager().cancelOperation(opHandle);
0767 } finally {
0768 release(true);
0769 }
0770 }
0771
0772 @Override
0773 public void closeOperation(OperationHandle opHandle) throws HiveSQLException {
0774 acquire(true);
0775 try {
0776 operationManager.closeOperation(opHandle);
0777 opHandleSet.remove(opHandle);
0778 } finally {
0779 release(true);
0780 }
0781 }
0782
0783 @Override
0784 public TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException {
0785 acquire(true);
0786 try {
0787 return sessionManager.getOperationManager().getOperationResultSetSchema(opHandle);
0788 } finally {
0789 release(true);
0790 }
0791 }
0792
0793 @Override
0794 public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
0795 long maxRows, FetchType fetchType) throws HiveSQLException {
0796 acquire(true);
0797 try {
0798 if (fetchType == FetchType.QUERY_OUTPUT) {
0799 return operationManager.getOperationNextRowSet(opHandle, orientation, maxRows);
0800 }
0801 return operationManager.getOperationLogRowSet(opHandle, orientation, maxRows);
0802 } finally {
0803 release(true);
0804 }
0805 }
0806
0807 protected HiveSession getSession() {
0808 return this;
0809 }
0810
0811 @Override
0812 public String getIpAddress() {
0813 return ipAddress;
0814 }
0815
0816 @Override
0817 public void setIpAddress(String ipAddress) {
0818 this.ipAddress = ipAddress;
0819 }
0820
0821 @Override
0822 public String getDelegationToken(HiveAuthFactory authFactory, String owner, String renewer)
0823 throws HiveSQLException {
0824 HiveAuthFactory.verifyProxyAccess(getUsername(), owner, getIpAddress(), getHiveConf());
0825 return authFactory.getDelegationToken(owner, renewer, getIpAddress());
0826 }
0827
0828 @Override
0829 public void cancelDelegationToken(HiveAuthFactory authFactory, String tokenStr)
0830 throws HiveSQLException {
0831 HiveAuthFactory.verifyProxyAccess(getUsername(), getUserFromToken(authFactory, tokenStr),
0832 getIpAddress(), getHiveConf());
0833 authFactory.cancelDelegationToken(tokenStr);
0834 }
0835
0836 @Override
0837 public void renewDelegationToken(HiveAuthFactory authFactory, String tokenStr)
0838 throws HiveSQLException {
0839 HiveAuthFactory.verifyProxyAccess(getUsername(), getUserFromToken(authFactory, tokenStr),
0840 getIpAddress(), getHiveConf());
0841 authFactory.renewDelegationToken(tokenStr);
0842 }
0843
0844
0845 private String getUserFromToken(HiveAuthFactory authFactory, String tokenStr) throws HiveSQLException {
0846 return authFactory.getUserFromToken(tokenStr);
0847 }
0848
0849 @Override
0850 public OperationHandle getPrimaryKeys(String catalog, String schema,
0851 String table) throws HiveSQLException {
0852 acquire(true);
0853
0854 OperationManager operationManager = getOperationManager();
0855 GetPrimaryKeysOperation operation = operationManager
0856 .newGetPrimaryKeysOperation(getSession(), catalog, schema, table);
0857 OperationHandle opHandle = operation.getHandle();
0858 try {
0859 operation.run();
0860 opHandleSet.add(opHandle);
0861 return opHandle;
0862 } catch (HiveSQLException e) {
0863 operationManager.closeOperation(opHandle);
0864 throw e;
0865 } finally {
0866 release(true);
0867 }
0868 }
0869
0870 @Override
0871 public OperationHandle getCrossReference(String primaryCatalog,
0872 String primarySchema, String primaryTable, String foreignCatalog,
0873 String foreignSchema, String foreignTable) throws HiveSQLException {
0874 acquire(true);
0875
0876 OperationManager operationManager = getOperationManager();
0877 GetCrossReferenceOperation operation = operationManager
0878 .newGetCrossReferenceOperation(getSession(), primaryCatalog,
0879 primarySchema, primaryTable, foreignCatalog,
0880 foreignSchema, foreignTable);
0881 OperationHandle opHandle = operation.getHandle();
0882 try {
0883 operation.run();
0884 opHandleSet.add(opHandle);
0885 return opHandle;
0886 } catch (HiveSQLException e) {
0887 operationManager.closeOperation(opHandle);
0888 throw e;
0889 } finally {
0890 release(true);
0891 }
0892 }
0893 }