/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hive.service.cli.session;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.common.cli.HiveFileProcessor;
import org.apache.hadoop.hive.common.cli.IHiveFileProcessor;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.conf.VariableSubstitution;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.history.HiveHistory;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.thrift.ThriftFormatter;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hive.common.util.HiveVersionInfo;
import org.apache.hive.service.auth.HiveAuthFactory;
import org.apache.hive.service.cli.FetchOrientation;
import org.apache.hive.service.cli.FetchType;
import org.apache.hive.service.cli.GetInfoType;
import org.apache.hive.service.cli.GetInfoValue;
import org.apache.hive.service.cli.HiveSQLException;
import org.apache.hive.service.cli.OperationHandle;
import org.apache.hive.service.cli.RowSet;
import org.apache.hive.service.cli.SessionHandle;
import org.apache.hive.service.cli.TableSchema;
import org.apache.hive.service.cli.operation.ExecuteStatementOperation;
import org.apache.hive.service.cli.operation.GetCatalogsOperation;
import org.apache.hive.service.cli.operation.GetColumnsOperation;
import org.apache.hive.service.cli.operation.GetCrossReferenceOperation;
import org.apache.hive.service.cli.operation.GetFunctionsOperation;
import org.apache.hive.service.cli.operation.GetPrimaryKeysOperation;
import org.apache.hive.service.cli.operation.GetSchemasOperation;
import org.apache.hive.service.cli.operation.GetTableTypesOperation;
import org.apache.hive.service.cli.operation.GetTypeInfoOperation;
import org.apache.hive.service.cli.operation.MetadataOperation;
import org.apache.hive.service.cli.operation.Operation;
import org.apache.hive.service.cli.operation.OperationManager;
import org.apache.hive.service.rpc.thrift.TProtocolVersion;
import org.apache.hive.service.server.ThreadWithGarbageCleanup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.hive.conf.SystemVariables.ENV_PREFIX;
import static org.apache.hadoop.hive.conf.SystemVariables.HIVECONF_PREFIX;
import static org.apache.hadoop.hive.conf.SystemVariables.HIVEVAR_PREFIX;
import static org.apache.hadoop.hive.conf.SystemVariables.METACONF_PREFIX;
import static org.apache.hadoop.hive.conf.SystemVariables.SYSTEM_PREFIX;

/**
 * HiveSession implementation, backed by a {@link SessionState} that is created
 * when the session is opened and reused across all queries in the session.
 */
public class HiveSessionImpl implements HiveSession {
  private final SessionHandle sessionHandle;
  private String username;
  private final String password;
  private HiveConf hiveConf;
  private SessionState sessionState;
  private String ipAddress;
  private static final String FETCH_WORK_SERDE_CLASS =
      "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
  private static final Logger LOG = LoggerFactory.getLogger(HiveSessionImpl.class);
  private SessionManager sessionManager;
  private OperationManager operationManager;
  private final Set<OperationHandle> opHandleSet = new HashSet<>();
  private boolean isOperationLogEnabled;
  private File sessionLogDir;
  private volatile long lastAccessTime;
  private volatile long lastIdleTime;

  public HiveSessionImpl(TProtocolVersion protocol, String username, String password,
      HiveConf serverhiveConf, String ipAddress) {
    this.username = username;
    this.password = password;
    this.sessionHandle = new SessionHandle(protocol);
    this.hiveConf = new HiveConf(serverhiveConf);
    this.ipAddress = ipAddress;

    try {
      // In non-impersonation mode, map scheduler queue to current user
      // if fair scheduler is configured.
      if (!hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS) &&
          hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE)) {
        ShimLoader.getHadoopShims().refreshDefaultQueue(hiveConf, username);
      }
    } catch (IOException e) {
      LOG.warn("Error setting scheduler queue: " + e, e);
    }
    // Set an explicit session name to control the download directory name
    hiveConf.set(ConfVars.HIVESESSIONID.varname,
        sessionHandle.getHandleIdentifier().toString());
    // Use thrift transportable formatter
    hiveConf.set(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER, ThriftFormatter.class.getName());
    hiveConf.setInt(SerDeUtils.LIST_SINK_OUTPUT_PROTOCOL, protocol.getValue());
  }
  /**
   * Opens a new HiveServer2 session for the client connection.
   * Creates a new SessionState object that will be associated with this HiveServer2 session.
   * When the server executes multiple queries in the same session,
   * this SessionState object is reused across those queries.
   * Note that if doAs is true, this call goes through a proxy object,
   * which wraps the method logic in a UserGroupInformation#doAs.
   * That is why it is important to create the SessionState here rather than in the constructor.
   */
  @Override
  public void open(Map<String, String> sessionConfMap) throws HiveSQLException {
    sessionState = new SessionState(hiveConf, username);
    sessionState.setUserIpAddress(ipAddress);
    sessionState.setIsHiveServerQuery(true);
    SessionState.start(sessionState);
    try {
      sessionState.loadAuxJars();
      sessionState.loadReloadableAuxJars();
    } catch (IOException e) {
      String msg = "Failed to load reloadable aux jars";
      LOG.error(msg, e);
      throw new HiveSQLException(msg, e);
    }
    // Process the global init file: .hiverc
    processGlobalInitFile();
    if (sessionConfMap != null) {
      configureSession(sessionConfMap);
    }
    lastAccessTime = System.currentTimeMillis();
    lastIdleTime = lastAccessTime;
  }

  /**
   * Processes the global .hiverc file on the HiveServer2 side.
   */
  private class GlobalHivercFileProcessor extends HiveFileProcessor {
    @Override
    protected BufferedReader loadFile(String fileName) throws IOException {
      FileInputStream initStream = new FileInputStream(fileName);
      return new BufferedReader(new InputStreamReader(initStream));
    }

    @Override
    protected int processCmd(String cmd) {
      int rc = 0;
      String trimmedCmd = cmd.trim();
      try {
        executeStatementInternal(trimmedCmd, null, false, 0);
      } catch (HiveSQLException e) {
        rc = -1;
        LOG.warn("Failed to execute HQL command in global .hiverc file.", e);
      }
      return rc;
    }
  }

  private void processGlobalInitFile() {
    IHiveFileProcessor processor = new GlobalHivercFileProcessor();

    try {
      String hiverc = hiveConf.getVar(ConfVars.HIVE_SERVER2_GLOBAL_INIT_FILE_LOCATION);
      if (hiverc != null) {
        File hivercFile = new File(hiverc);
        if (hivercFile.isDirectory()) {
          hivercFile = new File(hivercFile, SessionManager.HIVERCFILE);
        }
        if (hivercFile.isFile()) {
          LOG.info("Running global init file: " + hivercFile);
          int rc = processor.processFile(hivercFile.getAbsolutePath());
          if (rc != 0) {
            LOG.error("Failed to initialize from global .hiverc file");
          }
        } else {
          LOG.debug("Global init file " + hivercFile + " does not exist");
        }
      }
    } catch (IOException e) {
      LOG.warn("Failed to initialize from global .hiverc file", e);
    }
  }
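
  // Resolution of the global init file, as implemented above:
  // HIVE_SERVER2_GLOBAL_INIT_FILE_LOCATION may point either directly at a .hiverc file
  // or at a directory, in which case SessionManager.HIVERCFILE is appended. A non-zero
  // return code from the processor is logged as an error but does not fail session open.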

  private void configureSession(Map<String, String> sessionConfMap) throws HiveSQLException {
    SessionState.setCurrentSessionState(sessionState);
    for (Map.Entry<String, String> entry : sessionConfMap.entrySet()) {
      String key = entry.getKey();
      if (key.startsWith("set:")) {
        try {
          setVariable(key.substring(4), entry.getValue());
        } catch (Exception e) {
          throw new HiveSQLException(e);
        }
      } else if (key.startsWith("use:")) {
        SessionState.get().setCurrentDatabase(entry.getValue());
      } else {
        hiveConf.verifyAndSet(key, entry.getValue());
      }
    }
  }
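
  // Illustrative sketch (not part of the original file): how a client-supplied session
  // conf overlay is routed by configureSession(). The key names below are hypothetical
  // examples; only the "set:" and "use:" prefixes carry meaning in the code above.
  //
  //   Map<String, String> overlay = new HashMap<>();
  //   overlay.put("set:hiveconf:hive.exec.parallel", "true"); // -> setVariable("hiveconf:hive.exec.parallel", "true")
  //   overlay.put("set:hivevar:run_date", "2024-01-01");      // -> session-scoped Hive variable
  //   overlay.put("use:database", "analytics");               // any "use:*" key switches the current database to its value
  //   overlay.put("hive.cli.print.header", "true");           // no prefix -> hiveConf.verifyAndSet(key, value)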

  // Copied from org.apache.hadoop.hive.ql.processors.SetProcessor; the only change is
  // calling setConf(varname, propName, varvalue, true) when varname starts with HIVECONF_PREFIX.
  public static int setVariable(String varname, String varvalue) throws Exception {
    SessionState ss = SessionState.get();
    VariableSubstitution substitution = new VariableSubstitution(() -> ss.getHiveVariables());
    if (varvalue.contains("\n")) {
      ss.err.println("Warning: Value had a \\n character in it.");
    }
    varname = varname.trim();
    if (varname.startsWith(ENV_PREFIX)) {
      ss.err.println("env:* variables cannot be set.");
      return 1;
    } else if (varname.startsWith(SYSTEM_PREFIX)) {
      String propName = varname.substring(SYSTEM_PREFIX.length());
      System.getProperties().setProperty(propName, substitution.substitute(ss.getConf(), varvalue));
    } else if (varname.startsWith(HIVECONF_PREFIX)) {
      String propName = varname.substring(HIVECONF_PREFIX.length());
      setConf(varname, propName, varvalue, true);
    } else if (varname.startsWith(HIVEVAR_PREFIX)) {
      String propName = varname.substring(HIVEVAR_PREFIX.length());
      ss.getHiveVariables().put(propName, substitution.substitute(ss.getConf(), varvalue));
    } else if (varname.startsWith(METACONF_PREFIX)) {
      String propName = varname.substring(METACONF_PREFIX.length());
      Hive hive = Hive.get(ss.getConf());
      hive.setMetaConf(propName, substitution.substitute(ss.getConf(), varvalue));
    } else {
      setConf(varname, varname, varvalue, true);
    }
    return 0;
  }
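
  // Variable-name prefixes recognized by setVariable(), mirroring Hive's SET command:
  //   env:*      -> rejected (the environment is read-only); returns 1
  //   system:*   -> JVM system property, with variable substitution applied
  //   hiveconf:* -> HiveConf entry via setConf(), validated and registered as overridden
  //   hivevar:*  -> session-scoped Hive variable map
  //   metaconf:* -> metastore configuration via Hive.setMetaConf()
  //   (no prefix)-> treated as a HiveConf entry via setConf()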

  // Throws IllegalArgumentException when validation fails.
  private static void setConf(String varname, String key, String varvalue, boolean register)
      throws IllegalArgumentException {
    VariableSubstitution substitution =
        new VariableSubstitution(() -> SessionState.get().getHiveVariables());
    HiveConf conf = SessionState.get().getConf();
    String value = substitution.substitute(conf, varvalue);
    if (conf.getBoolVar(HiveConf.ConfVars.HIVECONFVALIDATION)) {
      HiveConf.ConfVars confVars = HiveConf.getConfVars(key);
      if (confVars != null) {
        if (!confVars.isType(value)) {
          StringBuilder message = new StringBuilder();
          message.append("'SET ").append(varname).append('=').append(varvalue);
          message.append("' FAILED because ").append(key).append(" expects ");
          message.append(confVars.typeString()).append(" type value.");
          throw new IllegalArgumentException(message.toString());
        }
        String fail = confVars.validate(value);
        if (fail != null) {
          StringBuilder message = new StringBuilder();
          message.append("'SET ").append(varname).append('=').append(varvalue);
          message.append("' FAILED in validation : ").append(fail).append('.');
          throw new IllegalArgumentException(message.toString());
        }
      } else if (key.startsWith("hive.")) {
        throw new IllegalArgumentException("hive configuration " + key + " does not exist.");
      }
    }
    conf.verifyAndSet(key, value);
    if (register) {
      SessionState.get().getOverriddenConfigurations().put(key, value);
    }
  }
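
  // The validation above runs only when hive.conf.validation (HIVECONFVALIDATION) is
  // enabled: known ConfVars are type-checked and range-validated before being applied;
  // unknown keys in the "hive." namespace are rejected outright, while keys outside it
  // pass straight through to verifyAndSet().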

  @Override
  public void setOperationLogSessionDir(File operationLogRootDir) {
    if (!operationLogRootDir.exists()) {
      LOG.warn("The operation log root directory was removed, recreating: " +
          operationLogRootDir.getAbsolutePath());
      if (!operationLogRootDir.mkdirs()) {
        LOG.warn("Unable to create operation log root directory: " +
            operationLogRootDir.getAbsolutePath());
      }
    }
    if (!operationLogRootDir.canWrite()) {
      LOG.warn("The operation log root directory is not writable: " +
          operationLogRootDir.getAbsolutePath());
    }
    sessionLogDir = new File(operationLogRootDir, sessionHandle.getHandleIdentifier().toString());
    isOperationLogEnabled = true;
    if (!sessionLogDir.exists()) {
      if (!sessionLogDir.mkdir()) {
        LOG.warn("Unable to create operation log session directory: " +
            sessionLogDir.getAbsolutePath());
        isOperationLogEnabled = false;
      }
    }
    if (isOperationLogEnabled) {
      LOG.info("Operation log session directory is created: " + sessionLogDir.getAbsolutePath());
    }
  }

  @Override
  public boolean isOperationLogEnabled() {
    return isOperationLogEnabled;
  }

  @Override
  public File getOperationLogSessionDir() {
    return sessionLogDir;
  }

  @Override
  public TProtocolVersion getProtocolVersion() {
    return sessionHandle.getProtocolVersion();
  }

  @Override
  public SessionManager getSessionManager() {
    return sessionManager;
  }

  @Override
  public void setSessionManager(SessionManager sessionManager) {
    this.sessionManager = sessionManager;
  }

  private OperationManager getOperationManager() {
    return operationManager;
  }

  @Override
  public void setOperationManager(OperationManager operationManager) {
    this.operationManager = operationManager;
  }

  protected synchronized void acquire(boolean userAccess) {
    // Need to make sure that this HiveServer2 session's SessionState is
    // stored in the thread local for the handler thread.
    SessionState.setCurrentSessionState(sessionState);
    if (userAccess) {
      lastAccessTime = System.currentTimeMillis();
    }
  }

  /**
   * 1. We'll remove the ThreadLocal SessionState as this thread might now serve
   * other requests.
   * 2. We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup
   * when this thread is garbage collected later.
   * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize()
   */
  protected synchronized void release(boolean userAccess) {
    SessionState.detachSession();
    if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
      ThreadWithGarbageCleanup currentThread =
          (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread();
      currentThread.cacheThreadLocalRawStore();
    }
    if (userAccess) {
      lastAccessTime = System.currentTimeMillis();
    }
    if (opHandleSet.isEmpty()) {
      lastIdleTime = System.currentTimeMillis();
    } else {
      lastIdleTime = 0;
    }
  }
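
  // Note: lastIdleTime is reset to 0 while any operation handle is outstanding, so
  // getNoOperationTime() (below) reports a non-zero duration only when the session
  // has no open operations.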

  @Override
  public SessionHandle getSessionHandle() {
    return sessionHandle;
  }

  @Override
  public String getUsername() {
    return username;
  }

  @Override
  public String getPassword() {
    return password;
  }

  @Override
  public HiveConf getHiveConf() {
    hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHOUTPUTSERDE, FETCH_WORK_SERDE_CLASS);
    return hiveConf;
  }

  @Override
  public IMetaStoreClient getMetaStoreClient() throws HiveSQLException {
    try {
      return Hive.get(getHiveConf()).getMSC();
    } catch (HiveException | MetaException e) {
      throw new HiveSQLException("Failed to get metastore connection", e);
    }
  }

  @Override
  public GetInfoValue getInfo(GetInfoType getInfoType)
      throws HiveSQLException {
    acquire(true);
    try {
      switch (getInfoType) {
      case CLI_SERVER_NAME:
        return new GetInfoValue("Hive");
      case CLI_DBMS_NAME:
        return new GetInfoValue("Apache Hive");
      case CLI_DBMS_VER:
        return new GetInfoValue(HiveVersionInfo.getVersion());
      case CLI_MAX_COLUMN_NAME_LEN:
        return new GetInfoValue(128);
      case CLI_MAX_SCHEMA_NAME_LEN:
        return new GetInfoValue(128);
      case CLI_MAX_TABLE_NAME_LEN:
        return new GetInfoValue(128);
      case CLI_TXN_CAPABLE: // not supported; falls through to the default error
      default:
        throw new HiveSQLException("Unrecognized GetInfoType value: " + getInfoType.toString());
      }
    } finally {
      release(true);
    }
  }

  @Override
  public OperationHandle executeStatement(String statement, Map<String, String> confOverlay)
      throws HiveSQLException {
    return executeStatementInternal(statement, confOverlay, false, 0);
  }

  @Override
  public OperationHandle executeStatement(String statement, Map<String, String> confOverlay,
      long queryTimeout) throws HiveSQLException {
    return executeStatementInternal(statement, confOverlay, false, queryTimeout);
  }

  @Override
  public OperationHandle executeStatementAsync(String statement, Map<String, String> confOverlay)
      throws HiveSQLException {
    return executeStatementInternal(statement, confOverlay, true, 0);
  }

  @Override
  public OperationHandle executeStatementAsync(String statement, Map<String, String> confOverlay,
      long queryTimeout) throws HiveSQLException {
    return executeStatementInternal(statement, confOverlay, true, queryTimeout);
  }

  private OperationHandle executeStatementInternal(String statement,
      Map<String, String> confOverlay, boolean runAsync, long queryTimeout) throws HiveSQLException {
    acquire(true);

    OperationManager operationManager = getOperationManager();
    ExecuteStatementOperation operation = operationManager
        .newExecuteStatementOperation(getSession(), statement, confOverlay, runAsync, queryTimeout);
    OperationHandle opHandle = operation.getHandle();
    try {
      operation.run();
      opHandleSet.add(opHandle);
      return opHandle;
    } catch (HiveSQLException e) {
      // Referring to SQLOperation.java: a HiveSQLException cannot be thrown here while the
      // async background operation has also been submitted to the thread pool successfully,
      // so it is safe to clean up the opHandle directly when a HiveSQLException is caught.
      operationManager.closeOperation(opHandle);
      throw e;
    } finally {
      release(true);
    }
  }
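
  // The acquire / run / track-handle / close-on-failure / release sequence above is the
  // template that every operation-producing method below (getTypeInfo, getCatalogs,
  // getSchemas, getTables, getTableTypes, getColumns, getFunctions, getPrimaryKeys,
  // getCrossReference) repeats with its own Operation subtype.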

  @Override
  public OperationHandle getTypeInfo()
      throws HiveSQLException {
    acquire(true);

    OperationManager operationManager = getOperationManager();
    GetTypeInfoOperation operation = operationManager.newGetTypeInfoOperation(getSession());
    OperationHandle opHandle = operation.getHandle();
    try {
      operation.run();
      opHandleSet.add(opHandle);
      return opHandle;
    } catch (HiveSQLException e) {
      operationManager.closeOperation(opHandle);
      throw e;
    } finally {
      release(true);
    }
  }

  @Override
  public OperationHandle getCatalogs()
      throws HiveSQLException {
    acquire(true);

    OperationManager operationManager = getOperationManager();
    GetCatalogsOperation operation = operationManager.newGetCatalogsOperation(getSession());
    OperationHandle opHandle = operation.getHandle();
    try {
      operation.run();
      opHandleSet.add(opHandle);
      return opHandle;
    } catch (HiveSQLException e) {
      operationManager.closeOperation(opHandle);
      throw e;
    } finally {
      release(true);
    }
  }

  @Override
  public OperationHandle getSchemas(String catalogName, String schemaName)
      throws HiveSQLException {
    acquire(true);

    OperationManager operationManager = getOperationManager();
    GetSchemasOperation operation =
        operationManager.newGetSchemasOperation(getSession(), catalogName, schemaName);
    OperationHandle opHandle = operation.getHandle();
    try {
      operation.run();
      opHandleSet.add(opHandle);
      return opHandle;
    } catch (HiveSQLException e) {
      operationManager.closeOperation(opHandle);
      throw e;
    } finally {
      release(true);
    }
  }

  @Override
  public OperationHandle getTables(String catalogName, String schemaName, String tableName,
      List<String> tableTypes)
      throws HiveSQLException {
    acquire(true);

    OperationManager operationManager = getOperationManager();
    MetadataOperation operation =
        operationManager.newGetTablesOperation(getSession(), catalogName, schemaName, tableName, tableTypes);
    OperationHandle opHandle = operation.getHandle();
    try {
      operation.run();
      opHandleSet.add(opHandle);
      return opHandle;
    } catch (HiveSQLException e) {
      operationManager.closeOperation(opHandle);
      throw e;
    } finally {
      release(true);
    }
  }

  @Override
  public OperationHandle getTableTypes()
      throws HiveSQLException {
    acquire(true);

    OperationManager operationManager = getOperationManager();
    GetTableTypesOperation operation = operationManager.newGetTableTypesOperation(getSession());
    OperationHandle opHandle = operation.getHandle();
    try {
      operation.run();
      opHandleSet.add(opHandle);
      return opHandle;
    } catch (HiveSQLException e) {
      operationManager.closeOperation(opHandle);
      throw e;
    } finally {
      release(true);
    }
  }

  @Override
  public OperationHandle getColumns(String catalogName, String schemaName,
      String tableName, String columnName) throws HiveSQLException {
    acquire(true);
    String addedJars = Utilities.getResourceFiles(hiveConf, SessionState.ResourceType.JAR);
    if (StringUtils.isNotBlank(addedJars)) {
      IMetaStoreClient metastoreClient = getSession().getMetaStoreClient();
      metastoreClient.setHiveAddedJars(addedJars);
    }
    OperationManager operationManager = getOperationManager();
    GetColumnsOperation operation = operationManager.newGetColumnsOperation(getSession(),
        catalogName, schemaName, tableName, columnName);
    OperationHandle opHandle = operation.getHandle();
    try {
      operation.run();
      opHandleSet.add(opHandle);
      return opHandle;
    } catch (HiveSQLException e) {
      operationManager.closeOperation(opHandle);
      throw e;
    } finally {
      release(true);
    }
  }

  @Override
  public OperationHandle getFunctions(String catalogName, String schemaName, String functionName)
      throws HiveSQLException {
    acquire(true);

    OperationManager operationManager = getOperationManager();
    GetFunctionsOperation operation = operationManager
        .newGetFunctionsOperation(getSession(), catalogName, schemaName, functionName);
    OperationHandle opHandle = operation.getHandle();
    try {
      operation.run();
      opHandleSet.add(opHandle);
      return opHandle;
    } catch (HiveSQLException e) {
      operationManager.closeOperation(opHandle);
      throw e;
    } finally {
      release(true);
    }
  }

  @Override
  public void close() throws HiveSQLException {
    try {
      acquire(true);
      // Iterate through the opHandles and close their operations
      for (OperationHandle opHandle : opHandleSet) {
        operationManager.closeOperation(opHandle);
      }
      opHandleSet.clear();
      // Cleanup session log directory.
      cleanupSessionLogDir();
      // Cleanup pipeout files.
      cleanupPipeoutFile();
      HiveHistory hiveHist = sessionState.getHiveHistory();
      if (null != hiveHist) {
        hiveHist.closeStream();
      }
      try {
        sessionState.close();
      } finally {
        sessionState = null;
      }
    } catch (IOException ioe) {
      throw new HiveSQLException("Failure to close", ioe);
    } finally {
      if (sessionState != null) {
        try {
          sessionState.close();
        } catch (Throwable t) {
          LOG.warn("Error closing session", t);
        }
        sessionState = null;
      }
      release(true);
    }
  }
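
  // Defensive double-close: if sessionState.close() throws before the field is nulled in
  // the inner try/finally, the outer finally retries the close (swallowing any error) so
  // a failed close cannot leak the SessionState, and release(true) always runs last.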

  private void cleanupPipeoutFile() {
    String lScratchDir = hiveConf.getVar(ConfVars.LOCALSCRATCHDIR);
    String sessionID = hiveConf.getVar(ConfVars.HIVESESSIONID);

    File[] fileAry = new File(lScratchDir).listFiles(
        (dir, name) -> name.startsWith(sessionID) && name.endsWith(".pipeout"));

    // listFiles() returns null if the scratch directory does not exist or cannot be read.
    if (fileAry == null) {
      return;
    }
    for (File file : fileAry) {
      try {
        FileUtils.forceDelete(file);
      } catch (Exception e) {
        LOG.error("Failed to cleanup pipeout file: " + file, e);
      }
    }
  }

  private void cleanupSessionLogDir() {
    if (isOperationLogEnabled) {
      try {
        FileUtils.forceDelete(sessionLogDir);
      } catch (Exception e) {
        LOG.error("Failed to cleanup session log dir: " + sessionHandle, e);
      }
    }
  }

  @Override
  public SessionState getSessionState() {
    return sessionState;
  }

  @Override
  public String getUserName() {
    return username;
  }

  @Override
  public void setUserName(String userName) {
    this.username = userName;
  }

  @Override
  public long getLastAccessTime() {
    return lastAccessTime;
  }

  @Override
  public void closeExpiredOperations() {
    OperationHandle[] handles = opHandleSet.toArray(new OperationHandle[opHandleSet.size()]);
    if (handles.length > 0) {
      List<Operation> operations = operationManager.removeExpiredOperations(handles);
      if (!operations.isEmpty()) {
        closeTimedOutOperations(operations);
      }
    }
  }

  @Override
  public long getNoOperationTime() {
    return lastIdleTime > 0 ? System.currentTimeMillis() - lastIdleTime : 0;
  }

  private void closeTimedOutOperations(List<Operation> operations) {
    acquire(false);
    try {
      for (Operation operation : operations) {
        opHandleSet.remove(operation.getHandle());
        try {
          operation.close();
        } catch (Exception e) {
          LOG.warn("Exception thrown while closing timed-out operation " + operation.getHandle(), e);
        }
      }
    } finally {
      release(false);
    }
  }

  @Override
  public void cancelOperation(OperationHandle opHandle) throws HiveSQLException {
    acquire(true);
    try {
      sessionManager.getOperationManager().cancelOperation(opHandle);
    } finally {
      release(true);
    }
  }

  @Override
  public void closeOperation(OperationHandle opHandle) throws HiveSQLException {
    acquire(true);
    try {
      operationManager.closeOperation(opHandle);
      opHandleSet.remove(opHandle);
    } finally {
      release(true);
    }
  }

  @Override
  public TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException {
    acquire(true);
    try {
      return sessionManager.getOperationManager().getOperationResultSetSchema(opHandle);
    } finally {
      release(true);
    }
  }

  @Override
  public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
      long maxRows, FetchType fetchType) throws HiveSQLException {
    acquire(true);
    try {
      if (fetchType == FetchType.QUERY_OUTPUT) {
        return operationManager.getOperationNextRowSet(opHandle, orientation, maxRows);
      }
      return operationManager.getOperationLogRowSet(opHandle, orientation, maxRows);
    } finally {
      release(true);
    }
  }

  protected HiveSession getSession() {
    return this;
  }

  @Override
  public String getIpAddress() {
    return ipAddress;
  }

  @Override
  public void setIpAddress(String ipAddress) {
    this.ipAddress = ipAddress;
  }

  @Override
  public String getDelegationToken(HiveAuthFactory authFactory, String owner, String renewer)
      throws HiveSQLException {
    HiveAuthFactory.verifyProxyAccess(getUsername(), owner, getIpAddress(), getHiveConf());
    return authFactory.getDelegationToken(owner, renewer, getIpAddress());
  }

  @Override
  public void cancelDelegationToken(HiveAuthFactory authFactory, String tokenStr)
      throws HiveSQLException {
    HiveAuthFactory.verifyProxyAccess(getUsername(), getUserFromToken(authFactory, tokenStr),
        getIpAddress(), getHiveConf());
    authFactory.cancelDelegationToken(tokenStr);
  }

  @Override
  public void renewDelegationToken(HiveAuthFactory authFactory, String tokenStr)
      throws HiveSQLException {
    HiveAuthFactory.verifyProxyAccess(getUsername(), getUserFromToken(authFactory, tokenStr),
        getIpAddress(), getHiveConf());
    authFactory.renewDelegationToken(tokenStr);
  }

  // Extract the real user from the given token string.
  private String getUserFromToken(HiveAuthFactory authFactory, String tokenStr) throws HiveSQLException {
    return authFactory.getUserFromToken(tokenStr);
  }

  @Override
  public OperationHandle getPrimaryKeys(String catalog, String schema,
      String table) throws HiveSQLException {
    acquire(true);

    OperationManager operationManager = getOperationManager();
    GetPrimaryKeysOperation operation = operationManager
        .newGetPrimaryKeysOperation(getSession(), catalog, schema, table);
    OperationHandle opHandle = operation.getHandle();
    try {
      operation.run();
      opHandleSet.add(opHandle);
      return opHandle;
    } catch (HiveSQLException e) {
      operationManager.closeOperation(opHandle);
      throw e;
    } finally {
      release(true);
    }
  }

  @Override
  public OperationHandle getCrossReference(String primaryCatalog,
      String primarySchema, String primaryTable, String foreignCatalog,
      String foreignSchema, String foreignTable) throws HiveSQLException {
    acquire(true);

    OperationManager operationManager = getOperationManager();
    GetCrossReferenceOperation operation = operationManager
        .newGetCrossReferenceOperation(getSession(), primaryCatalog,
            primarySchema, primaryTable, foreignCatalog,
            foreignSchema, foreignTable);
    OperationHandle opHandle = operation.getHandle();
    try {
      operation.run();
      opHandleSet.add(opHandle);
      return opHandle;
    } catch (HiveSQLException e) {
      operationManager.closeOperation(opHandle);
      throw e;
    } finally {
      release(true);
    }
  }
}