Back to home page

OSCL-LXR

 
 

    


0001 /**
0002  * Licensed to the Apache Software Foundation (ASF) under one
0003  * or more contributor license agreements.  See the NOTICE file
0004  * distributed with this work for additional information
0005  * regarding copyright ownership.  The ASF licenses this file
0006  * to you under the Apache License, Version 2.0 (the
0007  * "License"); you may not use this file except in compliance
0008  * with the License.  You may obtain a copy of the License at
0009  *
0010  *     http://www.apache.org/licenses/LICENSE-2.0
0011  *
0012  * Unless required by applicable law or agreed to in writing, software
0013  * distributed under the License is distributed on an "AS IS" BASIS,
0014  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0015  * See the License for the specific language governing permissions and
0016  * limitations under the License.
0017  */
0018 
0019 package org.apache.hive.service.cli.session;
0020 
0021 import java.io.File;
0022 import java.io.IOException;
0023 import java.util.ArrayList;
0024 import java.util.Date;
0025 import java.util.Map;
0026 import java.util.concurrent.ConcurrentHashMap;
0027 import java.util.concurrent.Future;
0028 import java.util.concurrent.LinkedBlockingQueue;
0029 import java.util.concurrent.ThreadPoolExecutor;
0030 import java.util.concurrent.TimeUnit;
0031 
0032 import org.apache.commons.io.FileUtils;
0033 import org.apache.commons.logging.Log;
0034 import org.apache.commons.logging.LogFactory;
0035 import org.apache.hadoop.hive.conf.HiveConf;
0036 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
0037 import org.apache.hive.service.CompositeService;
0038 import org.apache.hive.service.cli.HiveSQLException;
0039 import org.apache.hive.service.cli.SessionHandle;
0040 import org.apache.hive.service.cli.operation.OperationManager;
0041 import org.apache.hive.service.cli.thrift.TProtocolVersion;
0042 import org.apache.hive.service.server.HiveServer2;
0043 import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
0044 
0045 /**
0046  * SessionManager.
0047  *
0048  */
0049 public class SessionManager extends CompositeService {
0050 
0051   private static final Log LOG = LogFactory.getLog(SessionManager.class);
0052   public static final String HIVERCFILE = ".hiverc";
0053   private HiveConf hiveConf;
0054   private final Map<SessionHandle, HiveSession> handleToSession =
0055       new ConcurrentHashMap<SessionHandle, HiveSession>();
0056   private final OperationManager operationManager = new OperationManager();
0057   private ThreadPoolExecutor backgroundOperationPool;
0058   private boolean isOperationLogEnabled;
0059   private File operationLogRootDir;
0060 
0061   private long checkInterval;
0062   private long sessionTimeout;
0063   private boolean checkOperation;
0064 
0065   private volatile boolean shutdown;
0066   // The HiveServer2 instance running this service
0067   private final HiveServer2 hiveServer2;
0068 
0069   public SessionManager(HiveServer2 hiveServer2) {
0070     super(SessionManager.class.getSimpleName());
0071     this.hiveServer2 = hiveServer2;
0072   }
0073 
0074   @Override
0075   public synchronized void init(HiveConf hiveConf) {
0076     this.hiveConf = hiveConf;
0077     //Create operation log root directory, if operation logging is enabled
0078     if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) {
0079       initOperationLogRootDir();
0080     }
0081     createBackgroundOperationPool();
0082     addService(operationManager);
0083     super.init(hiveConf);
0084   }
0085 
0086   private void createBackgroundOperationPool() {
0087     int poolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS);
0088     LOG.info("HiveServer2: Background operation thread pool size: " + poolSize);
0089     int poolQueueSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE);
0090     LOG.info("HiveServer2: Background operation thread wait queue size: " + poolQueueSize);
0091     long keepAliveTime = HiveConf.getTimeVar(
0092         hiveConf, ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME, TimeUnit.SECONDS);
0093     LOG.info(
0094         "HiveServer2: Background operation thread keepalive time: " + keepAliveTime + " seconds");
0095 
0096     // Create a thread pool with #poolSize threads
0097     // Threads terminate when they are idle for more than the keepAliveTime
0098     // A bounded blocking queue is used to queue incoming operations, if #operations > poolSize
0099     String threadPoolName = "HiveServer2-Background-Pool";
0100     backgroundOperationPool = new ThreadPoolExecutor(poolSize, poolSize,
0101         keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(poolQueueSize),
0102         new ThreadFactoryWithGarbageCleanup(threadPoolName));
0103     backgroundOperationPool.allowCoreThreadTimeOut(true);
0104 
0105     checkInterval = HiveConf.getTimeVar(
0106         hiveConf, ConfVars.HIVE_SERVER2_SESSION_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
0107     sessionTimeout = HiveConf.getTimeVar(
0108         hiveConf, ConfVars.HIVE_SERVER2_IDLE_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
0109     checkOperation = HiveConf.getBoolVar(hiveConf,
0110         ConfVars.HIVE_SERVER2_IDLE_SESSION_CHECK_OPERATION);
0111   }
0112 
0113   private void initOperationLogRootDir() {
0114     operationLogRootDir = new File(
0115         hiveConf.getVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION));
0116     isOperationLogEnabled = true;
0117 
0118     if (operationLogRootDir.exists() && !operationLogRootDir.isDirectory()) {
0119       LOG.warn("The operation log root directory exists, but it is not a directory: " +
0120           operationLogRootDir.getAbsolutePath());
0121       isOperationLogEnabled = false;
0122     }
0123 
0124     if (!operationLogRootDir.exists()) {
0125       if (!operationLogRootDir.mkdirs()) {
0126         LOG.warn("Unable to create operation log root directory: " +
0127             operationLogRootDir.getAbsolutePath());
0128         isOperationLogEnabled = false;
0129       }
0130     }
0131 
0132     if (isOperationLogEnabled) {
0133       LOG.info("Operation log root directory is created: " + operationLogRootDir.getAbsolutePath());
0134       try {
0135         FileUtils.forceDeleteOnExit(operationLogRootDir);
0136       } catch (IOException e) {
0137         LOG.warn("Failed to schedule cleanup HS2 operation logging root dir: " +
0138             operationLogRootDir.getAbsolutePath(), e);
0139       }
0140     }
0141   }
0142 
0143   @Override
0144   public synchronized void start() {
0145     super.start();
0146     if (checkInterval > 0) {
0147       startTimeoutChecker();
0148     }
0149   }
0150 
0151   private void startTimeoutChecker() {
0152     final long interval = Math.max(checkInterval, 3000L);  // minimum 3 seconds
0153     Runnable timeoutChecker = new Runnable() {
0154       @Override
0155       public void run() {
0156         for (sleepInterval(interval); !shutdown; sleepInterval(interval)) {
0157           long current = System.currentTimeMillis();
0158           for (HiveSession session : new ArrayList<HiveSession>(handleToSession.values())) {
0159             if (sessionTimeout > 0 && session.getLastAccessTime() + sessionTimeout <= current
0160                 && (!checkOperation || session.getNoOperationTime() > sessionTimeout)) {
0161               SessionHandle handle = session.getSessionHandle();
0162               LOG.warn("Session " + handle + " is Timed-out (last access : " +
0163                   new Date(session.getLastAccessTime()) + ") and will be closed");
0164               try {
0165                 closeSession(handle);
0166               } catch (HiveSQLException e) {
0167                 LOG.warn("Exception is thrown closing session " + handle, e);
0168               }
0169             } else {
0170               session.closeExpiredOperations();
0171             }
0172           }
0173         }
0174       }
0175 
0176       private void sleepInterval(long interval) {
0177         try {
0178           Thread.sleep(interval);
0179         } catch (InterruptedException e) {
0180           // ignore
0181         }
0182       }
0183     };
0184     backgroundOperationPool.execute(timeoutChecker);
0185   }
0186 
0187   @Override
0188   public synchronized void stop() {
0189     super.stop();
0190     shutdown = true;
0191     if (backgroundOperationPool != null) {
0192       backgroundOperationPool.shutdown();
0193       long timeout = hiveConf.getTimeVar(
0194           ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
0195       try {
0196         backgroundOperationPool.awaitTermination(timeout, TimeUnit.SECONDS);
0197       } catch (InterruptedException e) {
0198         LOG.warn("HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT = " + timeout +
0199             " seconds has been exceeded. RUNNING background operations will be shut down", e);
0200       }
0201       backgroundOperationPool = null;
0202     }
0203     cleanupLoggingRootDir();
0204   }
0205 
0206   private void cleanupLoggingRootDir() {
0207     if (isOperationLogEnabled) {
0208       try {
0209         FileUtils.forceDelete(operationLogRootDir);
0210       } catch (Exception e) {
0211         LOG.warn("Failed to cleanup root dir of HS2 logging: " + operationLogRootDir
0212             .getAbsolutePath(), e);
0213       }
0214     }
0215   }
0216 
0217   public SessionHandle openSession(TProtocolVersion protocol, String username, String password, String ipAddress,
0218       Map<String, String> sessionConf) throws HiveSQLException {
0219     return openSession(protocol, username, password, ipAddress, sessionConf, false, null);
0220   }
0221 
0222   /**
0223    * Opens a new session and creates a session handle.
0224    * The username passed to this method is the effective username.
0225    * If withImpersonation is true (==doAs true) we wrap all the calls in HiveSession
0226    * within a UGI.doAs, where UGI corresponds to the effective user.
0227    *
0228    * Please see {@code org.apache.hive.service.cli.thrift.ThriftCLIService.getUserName()} for
0229    * more details.
0230    *
0231    * @param protocol
0232    * @param username
0233    * @param password
0234    * @param ipAddress
0235    * @param sessionConf
0236    * @param withImpersonation
0237    * @param delegationToken
0238    * @return
0239    * @throws HiveSQLException
0240    */
0241   public SessionHandle openSession(TProtocolVersion protocol, String username, String password, String ipAddress,
0242       Map<String, String> sessionConf, boolean withImpersonation, String delegationToken)
0243           throws HiveSQLException {
0244     HiveSession session;
0245     // If doAs is set to true for HiveServer2, we will create a proxy object for the session impl.
0246     // Within the proxy object, we wrap the method call in a UserGroupInformation#doAs
0247     if (withImpersonation) {
0248       HiveSessionImplwithUGI sessionWithUGI = new HiveSessionImplwithUGI(protocol, username, password,
0249           hiveConf, ipAddress, delegationToken);
0250       session = HiveSessionProxy.getProxy(sessionWithUGI, sessionWithUGI.getSessionUgi());
0251       sessionWithUGI.setProxySession(session);
0252     } else {
0253       session = new HiveSessionImpl(protocol, username, password, hiveConf, ipAddress);
0254     }
0255     session.setSessionManager(this);
0256     session.setOperationManager(operationManager);
0257     try {
0258       session.open(sessionConf);
0259     } catch (Exception e) {
0260       try {
0261         session.close();
0262       } catch (Throwable t) {
0263         LOG.warn("Error closing session", t);
0264       }
0265       session = null;
0266       throw new HiveSQLException("Failed to open new session: " + e, e);
0267     }
0268     if (isOperationLogEnabled) {
0269       session.setOperationLogSessionDir(operationLogRootDir);
0270     }
0271     handleToSession.put(session.getSessionHandle(), session);
0272     return session.getSessionHandle();
0273   }
0274 
0275   public void closeSession(SessionHandle sessionHandle) throws HiveSQLException {
0276     HiveSession session = handleToSession.remove(sessionHandle);
0277     if (session == null) {
0278       throw new HiveSQLException("Session does not exist!");
0279     }
0280     session.close();
0281   }
0282 
0283   public HiveSession getSession(SessionHandle sessionHandle) throws HiveSQLException {
0284     HiveSession session = handleToSession.get(sessionHandle);
0285     if (session == null) {
0286       throw new HiveSQLException("Invalid SessionHandle: " + sessionHandle);
0287     }
0288     return session;
0289   }
0290 
0291   public OperationManager getOperationManager() {
0292     return operationManager;
0293   }
0294 
0295   private static ThreadLocal<String> threadLocalIpAddress = new ThreadLocal<String>() {
0296     @Override
0297     protected synchronized String initialValue() {
0298       return null;
0299     }
0300   };
0301 
0302   public static void setIpAddress(String ipAddress) {
0303     threadLocalIpAddress.set(ipAddress);
0304   }
0305 
0306   public static void clearIpAddress() {
0307     threadLocalIpAddress.remove();
0308   }
0309 
0310   public static String getIpAddress() {
0311     return threadLocalIpAddress.get();
0312   }
0313 
0314   private static ThreadLocal<String> threadLocalUserName = new ThreadLocal<String>(){
0315     @Override
0316     protected synchronized String initialValue() {
0317       return null;
0318     }
0319   };
0320 
0321   public static void setUserName(String userName) {
0322     threadLocalUserName.set(userName);
0323   }
0324 
0325   public static void clearUserName() {
0326     threadLocalUserName.remove();
0327   }
0328 
0329   public static String getUserName() {
0330     return threadLocalUserName.get();
0331   }
0332 
0333   private static ThreadLocal<String> threadLocalProxyUserName = new ThreadLocal<String>(){
0334     @Override
0335     protected synchronized String initialValue() {
0336       return null;
0337     }
0338   };
0339 
0340   public static void setProxyUserName(String userName) {
0341     LOG.debug("setting proxy user name based on query param to: " + userName);
0342     threadLocalProxyUserName.set(userName);
0343   }
0344 
0345   public static String getProxyUserName() {
0346     return threadLocalProxyUserName.get();
0347   }
0348 
0349   public static void clearProxyUserName() {
0350     threadLocalProxyUserName.remove();
0351   }
0352 
0353   public Future<?> submitBackgroundOperation(Runnable r) {
0354     return backgroundOperationPool.submit(r);
0355   }
0356 
0357   public int getOpenSessionCount() {
0358     return handleToSession.size();
0359   }
0360 }
0361