0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019 package org.apache.hive.service.cli.thrift;
0020
0021 import java.util.ArrayList;
0022 import java.util.List;
0023 import java.util.concurrent.ExecutorService;
0024 import java.util.concurrent.SynchronousQueue;
0025 import java.util.concurrent.ThreadPoolExecutor;
0026 import java.util.concurrent.TimeUnit;
0027
0028 import org.apache.hadoop.hive.conf.HiveConf;
0029 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
0030 import org.apache.hadoop.hive.shims.ShimLoader;
0031 import org.apache.hive.service.auth.HiveAuthFactory;
0032 import org.apache.hive.service.cli.CLIService;
0033 import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
0034 import org.apache.thrift.TProcessorFactory;
0035 import org.apache.thrift.protocol.TBinaryProtocol;
0036 import org.apache.thrift.server.TThreadPoolServer;
0037 import org.apache.thrift.transport.TServerSocket;
0038 import org.apache.thrift.transport.TTransportFactory;
0039
0040
0041 public class ThriftBinaryCLIService extends ThriftCLIService {
0042
0043 public ThriftBinaryCLIService(CLIService cliService) {
0044 super(cliService, ThriftBinaryCLIService.class.getSimpleName());
0045 }
0046
0047 @Override
0048 public void run() {
0049 try {
0050
0051 String threadPoolName = "HiveServer2-Handler-Pool";
0052 ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
0053 workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
0054 new ThreadFactoryWithGarbageCleanup(threadPoolName));
0055
0056
0057 hiveAuthFactory = new HiveAuthFactory(hiveConf);
0058 TTransportFactory transportFactory = hiveAuthFactory.getAuthTransFactory();
0059 TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this);
0060 TServerSocket serverSocket = null;
0061 List<String> sslVersionBlacklist = new ArrayList<String>();
0062 for (String sslVersion : hiveConf.getVar(ConfVars.HIVE_SSL_PROTOCOL_BLACKLIST).split(",")) {
0063 sslVersionBlacklist.add(sslVersion);
0064 }
0065 if (!hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL)) {
0066 serverSocket = HiveAuthFactory.getServerSocket(hiveHost, portNum);
0067 } else {
0068 String keyStorePath = hiveConf.getVar(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH).trim();
0069 if (keyStorePath.isEmpty()) {
0070 throw new IllegalArgumentException(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname
0071 + " Not configured for SSL connection");
0072 }
0073 String keyStorePassword = ShimLoader.getHadoopShims().getPassword(hiveConf,
0074 HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname);
0075 serverSocket = HiveAuthFactory.getServerSSLSocket(hiveHost, portNum, keyStorePath,
0076 keyStorePassword, sslVersionBlacklist);
0077 }
0078
0079
0080
0081 portNum = serverSocket.getServerSocket().getLocalPort();
0082
0083
0084 int maxMessageSize = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_MAX_MESSAGE_SIZE);
0085 int requestTimeout = (int) hiveConf.getTimeVar(
0086 HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT, TimeUnit.SECONDS);
0087 int beBackoffSlotLength = (int) hiveConf.getTimeVar(
0088 HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH, TimeUnit.MILLISECONDS);
0089 TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(serverSocket)
0090 .processorFactory(processorFactory).transportFactory(transportFactory)
0091 .protocolFactory(new TBinaryProtocol.Factory())
0092 .inputProtocolFactory(new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize))
0093 .requestTimeout(requestTimeout).requestTimeoutUnit(TimeUnit.SECONDS)
0094 .beBackoffSlotLength(beBackoffSlotLength).beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS)
0095 .executorService(executorService);
0096
0097
0098 server = new TThreadPoolServer(sargs);
0099 server.setServerEventHandler(serverEventHandler);
0100 String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port "
0101 + serverSocket.getServerSocket().getLocalPort() + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads";
0102 LOG.info(msg);
0103 server.serve();
0104 } catch (Throwable t) {
0105 LOG.fatal(
0106 "Error starting HiveServer2: could not start "
0107 + ThriftBinaryCLIService.class.getSimpleName(), t);
0108 System.exit(-1);
0109 }
0110 }
0111
0112 }