0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019 package org.apache.hive.service.cli.thrift;
0020
0021 import java.util.ArrayList;
0022 import java.util.List;
0023 import java.util.concurrent.ExecutorService;
0024 import java.util.concurrent.SynchronousQueue;
0025 import java.util.concurrent.ThreadPoolExecutor;
0026 import java.util.concurrent.TimeUnit;
0027
0028 import org.apache.hadoop.hive.common.auth.HiveAuthUtils;
0029 import org.apache.hadoop.hive.conf.HiveConf;
0030 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
0031 import org.apache.hadoop.hive.shims.ShimLoader;
0032 import org.apache.hive.service.auth.HiveAuthFactory;
0033 import org.apache.hive.service.cli.CLIService;
0034 import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
0035 import org.apache.thrift.TProcessorFactory;
0036 import org.apache.thrift.protocol.TBinaryProtocol;
0037 import org.apache.thrift.server.TThreadPoolServer;
0038 import org.apache.thrift.transport.TServerSocket;
0039 import org.apache.thrift.transport.TTransportFactory;
0040
0041
0042 public class ThriftBinaryCLIService extends ThriftCLIService {
0043
0044 public ThriftBinaryCLIService(CLIService cliService) {
0045 super(cliService, ThriftBinaryCLIService.class.getSimpleName());
0046 }
0047
0048 @Override
0049 public void run() {
0050 try {
0051
0052 String threadPoolName = "HiveServer2-Handler-Pool";
0053 ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
0054 workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
0055 new ThreadFactoryWithGarbageCleanup(threadPoolName));
0056
0057
0058 hiveAuthFactory = new HiveAuthFactory(hiveConf);
0059 TTransportFactory transportFactory = hiveAuthFactory.getAuthTransFactory();
0060 TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this);
0061 TServerSocket serverSocket = null;
0062 List<String> sslVersionBlacklist = new ArrayList<String>();
0063 for (String sslVersion : hiveConf.getVar(ConfVars.HIVE_SSL_PROTOCOL_BLACKLIST).split(",")) {
0064 sslVersionBlacklist.add(sslVersion);
0065 }
0066 if (!hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL)) {
0067 serverSocket = HiveAuthUtils.getServerSocket(hiveHost, portNum);
0068 } else {
0069 String keyStorePath = hiveConf.getVar(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH).trim();
0070 if (keyStorePath.isEmpty()) {
0071 throw new IllegalArgumentException(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname
0072 + " Not configured for SSL connection");
0073 }
0074 String keyStorePassword = ShimLoader.getHadoopShims().getPassword(hiveConf,
0075 HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname);
0076 serverSocket = HiveAuthUtils.getServerSSLSocket(hiveHost, portNum, keyStorePath,
0077 keyStorePassword, sslVersionBlacklist);
0078 }
0079
0080
0081
0082 portNum = serverSocket.getServerSocket().getLocalPort();
0083
0084
0085 int maxMessageSize = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_MAX_MESSAGE_SIZE);
0086 int requestTimeout = (int) hiveConf.getTimeVar(
0087 HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT, TimeUnit.SECONDS);
0088 int beBackoffSlotLength = (int) hiveConf.getTimeVar(
0089 HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH, TimeUnit.MILLISECONDS);
0090 TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(serverSocket)
0091 .processorFactory(processorFactory).transportFactory(transportFactory)
0092 .protocolFactory(new TBinaryProtocol.Factory())
0093 .inputProtocolFactory(new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize))
0094 .requestTimeout(requestTimeout).requestTimeoutUnit(TimeUnit.SECONDS)
0095 .beBackoffSlotLength(beBackoffSlotLength).beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS)
0096 .executorService(executorService);
0097
0098
0099 server = new TThreadPoolServer(sargs);
0100 server.setServerEventHandler(serverEventHandler);
0101 String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port "
0102 + portNum + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads";
0103 LOG.info(msg);
0104 server.serve();
0105 } catch (Throwable t) {
0106 LOG.error(
0107 "Error starting HiveServer2: could not start "
0108 + ThriftBinaryCLIService.class.getSimpleName(), t);
0109 System.exit(-1);
0110 }
0111 }
0112
0113 }