0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019 package org.apache.hive.service.cli.session;
0020
0021 import java.io.File;
0022 import java.io.IOException;
0023 import java.util.ArrayList;
0024 import java.util.Date;
0025 import java.util.Map;
0026 import java.util.concurrent.ConcurrentHashMap;
0027 import java.util.concurrent.Future;
0028 import java.util.concurrent.LinkedBlockingQueue;
0029 import java.util.concurrent.ThreadPoolExecutor;
0030 import java.util.concurrent.TimeUnit;
0031
0032 import org.apache.commons.io.FileUtils;
0033 import org.apache.commons.logging.Log;
0034 import org.apache.commons.logging.LogFactory;
0035 import org.apache.hadoop.hive.conf.HiveConf;
0036 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
0037 import org.apache.hive.service.CompositeService;
0038 import org.apache.hive.service.cli.HiveSQLException;
0039 import org.apache.hive.service.cli.SessionHandle;
0040 import org.apache.hive.service.cli.operation.OperationManager;
0041 import org.apache.hive.service.cli.thrift.TProtocolVersion;
0042 import org.apache.hive.service.server.HiveServer2;
0043 import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
0044
0045
0046
0047
0048
0049 public class SessionManager extends CompositeService {
0050
0051 private static final Log LOG = LogFactory.getLog(SessionManager.class);
0052 public static final String HIVERCFILE = ".hiverc";
0053 private HiveConf hiveConf;
0054 private final Map<SessionHandle, HiveSession> handleToSession =
0055 new ConcurrentHashMap<SessionHandle, HiveSession>();
0056 private final OperationManager operationManager = new OperationManager();
0057 private ThreadPoolExecutor backgroundOperationPool;
0058 private boolean isOperationLogEnabled;
0059 private File operationLogRootDir;
0060
0061 private long checkInterval;
0062 private long sessionTimeout;
0063 private boolean checkOperation;
0064
0065 private volatile boolean shutdown;
0066
0067 private final HiveServer2 hiveServer2;
0068
0069 public SessionManager(HiveServer2 hiveServer2) {
0070 super(SessionManager.class.getSimpleName());
0071 this.hiveServer2 = hiveServer2;
0072 }
0073
0074 @Override
0075 public synchronized void init(HiveConf hiveConf) {
0076 this.hiveConf = hiveConf;
0077
0078 if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) {
0079 initOperationLogRootDir();
0080 }
0081 createBackgroundOperationPool();
0082 addService(operationManager);
0083 super.init(hiveConf);
0084 }
0085
0086 private void createBackgroundOperationPool() {
0087 int poolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS);
0088 LOG.info("HiveServer2: Background operation thread pool size: " + poolSize);
0089 int poolQueueSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE);
0090 LOG.info("HiveServer2: Background operation thread wait queue size: " + poolQueueSize);
0091 long keepAliveTime = HiveConf.getTimeVar(
0092 hiveConf, ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME, TimeUnit.SECONDS);
0093 LOG.info(
0094 "HiveServer2: Background operation thread keepalive time: " + keepAliveTime + " seconds");
0095
0096
0097
0098
0099 String threadPoolName = "HiveServer2-Background-Pool";
0100 backgroundOperationPool = new ThreadPoolExecutor(poolSize, poolSize,
0101 keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(poolQueueSize),
0102 new ThreadFactoryWithGarbageCleanup(threadPoolName));
0103 backgroundOperationPool.allowCoreThreadTimeOut(true);
0104
0105 checkInterval = HiveConf.getTimeVar(
0106 hiveConf, ConfVars.HIVE_SERVER2_SESSION_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
0107 sessionTimeout = HiveConf.getTimeVar(
0108 hiveConf, ConfVars.HIVE_SERVER2_IDLE_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
0109 checkOperation = HiveConf.getBoolVar(hiveConf,
0110 ConfVars.HIVE_SERVER2_IDLE_SESSION_CHECK_OPERATION);
0111 }
0112
0113 private void initOperationLogRootDir() {
0114 operationLogRootDir = new File(
0115 hiveConf.getVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION));
0116 isOperationLogEnabled = true;
0117
0118 if (operationLogRootDir.exists() && !operationLogRootDir.isDirectory()) {
0119 LOG.warn("The operation log root directory exists, but it is not a directory: " +
0120 operationLogRootDir.getAbsolutePath());
0121 isOperationLogEnabled = false;
0122 }
0123
0124 if (!operationLogRootDir.exists()) {
0125 if (!operationLogRootDir.mkdirs()) {
0126 LOG.warn("Unable to create operation log root directory: " +
0127 operationLogRootDir.getAbsolutePath());
0128 isOperationLogEnabled = false;
0129 }
0130 }
0131
0132 if (isOperationLogEnabled) {
0133 LOG.info("Operation log root directory is created: " + operationLogRootDir.getAbsolutePath());
0134 try {
0135 FileUtils.forceDeleteOnExit(operationLogRootDir);
0136 } catch (IOException e) {
0137 LOG.warn("Failed to schedule cleanup HS2 operation logging root dir: " +
0138 operationLogRootDir.getAbsolutePath(), e);
0139 }
0140 }
0141 }
0142
0143 @Override
0144 public synchronized void start() {
0145 super.start();
0146 if (checkInterval > 0) {
0147 startTimeoutChecker();
0148 }
0149 }
0150
0151 private void startTimeoutChecker() {
0152 final long interval = Math.max(checkInterval, 3000L);
0153 Runnable timeoutChecker = new Runnable() {
0154 @Override
0155 public void run() {
0156 for (sleepInterval(interval); !shutdown; sleepInterval(interval)) {
0157 long current = System.currentTimeMillis();
0158 for (HiveSession session : new ArrayList<HiveSession>(handleToSession.values())) {
0159 if (sessionTimeout > 0 && session.getLastAccessTime() + sessionTimeout <= current
0160 && (!checkOperation || session.getNoOperationTime() > sessionTimeout)) {
0161 SessionHandle handle = session.getSessionHandle();
0162 LOG.warn("Session " + handle + " is Timed-out (last access : " +
0163 new Date(session.getLastAccessTime()) + ") and will be closed");
0164 try {
0165 closeSession(handle);
0166 } catch (HiveSQLException e) {
0167 LOG.warn("Exception is thrown closing session " + handle, e);
0168 }
0169 } else {
0170 session.closeExpiredOperations();
0171 }
0172 }
0173 }
0174 }
0175
0176 private void sleepInterval(long interval) {
0177 try {
0178 Thread.sleep(interval);
0179 } catch (InterruptedException e) {
0180
0181 }
0182 }
0183 };
0184 backgroundOperationPool.execute(timeoutChecker);
0185 }
0186
0187 @Override
0188 public synchronized void stop() {
0189 super.stop();
0190 shutdown = true;
0191 if (backgroundOperationPool != null) {
0192 backgroundOperationPool.shutdown();
0193 long timeout = hiveConf.getTimeVar(
0194 ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
0195 try {
0196 backgroundOperationPool.awaitTermination(timeout, TimeUnit.SECONDS);
0197 } catch (InterruptedException e) {
0198 LOG.warn("HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT = " + timeout +
0199 " seconds has been exceeded. RUNNING background operations will be shut down", e);
0200 }
0201 backgroundOperationPool = null;
0202 }
0203 cleanupLoggingRootDir();
0204 }
0205
0206 private void cleanupLoggingRootDir() {
0207 if (isOperationLogEnabled) {
0208 try {
0209 FileUtils.forceDelete(operationLogRootDir);
0210 } catch (Exception e) {
0211 LOG.warn("Failed to cleanup root dir of HS2 logging: " + operationLogRootDir
0212 .getAbsolutePath(), e);
0213 }
0214 }
0215 }
0216
0217 public SessionHandle openSession(TProtocolVersion protocol, String username, String password, String ipAddress,
0218 Map<String, String> sessionConf) throws HiveSQLException {
0219 return openSession(protocol, username, password, ipAddress, sessionConf, false, null);
0220 }
0221
0222
0223
0224
0225
0226
0227
0228
0229
0230
0231
0232
0233
0234
0235
0236
0237
0238
0239
0240
0241 public SessionHandle openSession(TProtocolVersion protocol, String username, String password, String ipAddress,
0242 Map<String, String> sessionConf, boolean withImpersonation, String delegationToken)
0243 throws HiveSQLException {
0244 HiveSession session;
0245
0246
0247 if (withImpersonation) {
0248 HiveSessionImplwithUGI sessionWithUGI = new HiveSessionImplwithUGI(protocol, username, password,
0249 hiveConf, ipAddress, delegationToken);
0250 session = HiveSessionProxy.getProxy(sessionWithUGI, sessionWithUGI.getSessionUgi());
0251 sessionWithUGI.setProxySession(session);
0252 } else {
0253 session = new HiveSessionImpl(protocol, username, password, hiveConf, ipAddress);
0254 }
0255 session.setSessionManager(this);
0256 session.setOperationManager(operationManager);
0257 try {
0258 session.open(sessionConf);
0259 } catch (Exception e) {
0260 try {
0261 session.close();
0262 } catch (Throwable t) {
0263 LOG.warn("Error closing session", t);
0264 }
0265 session = null;
0266 throw new HiveSQLException("Failed to open new session: " + e, e);
0267 }
0268 if (isOperationLogEnabled) {
0269 session.setOperationLogSessionDir(operationLogRootDir);
0270 }
0271 handleToSession.put(session.getSessionHandle(), session);
0272 return session.getSessionHandle();
0273 }
0274
0275 public void closeSession(SessionHandle sessionHandle) throws HiveSQLException {
0276 HiveSession session = handleToSession.remove(sessionHandle);
0277 if (session == null) {
0278 throw new HiveSQLException("Session does not exist!");
0279 }
0280 session.close();
0281 }
0282
0283 public HiveSession getSession(SessionHandle sessionHandle) throws HiveSQLException {
0284 HiveSession session = handleToSession.get(sessionHandle);
0285 if (session == null) {
0286 throw new HiveSQLException("Invalid SessionHandle: " + sessionHandle);
0287 }
0288 return session;
0289 }
0290
0291 public OperationManager getOperationManager() {
0292 return operationManager;
0293 }
0294
0295 private static ThreadLocal<String> threadLocalIpAddress = new ThreadLocal<String>() {
0296 @Override
0297 protected synchronized String initialValue() {
0298 return null;
0299 }
0300 };
0301
0302 public static void setIpAddress(String ipAddress) {
0303 threadLocalIpAddress.set(ipAddress);
0304 }
0305
0306 public static void clearIpAddress() {
0307 threadLocalIpAddress.remove();
0308 }
0309
0310 public static String getIpAddress() {
0311 return threadLocalIpAddress.get();
0312 }
0313
0314 private static ThreadLocal<String> threadLocalUserName = new ThreadLocal<String>(){
0315 @Override
0316 protected synchronized String initialValue() {
0317 return null;
0318 }
0319 };
0320
0321 public static void setUserName(String userName) {
0322 threadLocalUserName.set(userName);
0323 }
0324
0325 public static void clearUserName() {
0326 threadLocalUserName.remove();
0327 }
0328
0329 public static String getUserName() {
0330 return threadLocalUserName.get();
0331 }
0332
0333 private static ThreadLocal<String> threadLocalProxyUserName = new ThreadLocal<String>(){
0334 @Override
0335 protected synchronized String initialValue() {
0336 return null;
0337 }
0338 };
0339
0340 public static void setProxyUserName(String userName) {
0341 LOG.debug("setting proxy user name based on query param to: " + userName);
0342 threadLocalProxyUserName.set(userName);
0343 }
0344
0345 public static String getProxyUserName() {
0346 return threadLocalProxyUserName.get();
0347 }
0348
0349 public static void clearProxyUserName() {
0350 threadLocalProxyUserName.remove();
0351 }
0352
0353 public Future<?> submitBackgroundOperation(Runnable r) {
0354 return backgroundOperationPool.submit(r);
0355 }
0356
0357 public int getOpenSessionCount() {
0358 return handleToSession.size();
0359 }
0360 }
0361