Back to home page

OSCL-LXR

 
 

    


0001 /**
0002  * Licensed to the Apache Software Foundation (ASF) under one
0003  * or more contributor license agreements.  See the NOTICE file
0004  * distributed with this work for additional information
0005  * regarding copyright ownership.  The ASF licenses this file
0006  * to you under the Apache License, Version 2.0 (the
0007  * "License"); you may not use this file except in compliance
0008  * with the License.  You may obtain a copy of the License at
0009  *
0010  *     http://www.apache.org/licenses/LICENSE-2.0
0011  *
0012  * Unless required by applicable law or agreed to in writing, software
0013  * distributed under the License is distributed on an "AS IS" BASIS,
0014  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0015  * See the License for the specific language governing permissions and
0016  * limitations under the License.
0017  */
0018 
0019 package org.apache.hive.service.cli.thrift;
0020 
0021 import java.util.ArrayList;
0022 import java.util.List;
0023 import java.util.concurrent.ExecutorService;
0024 import java.util.concurrent.SynchronousQueue;
0025 import java.util.concurrent.ThreadPoolExecutor;
0026 import java.util.concurrent.TimeUnit;
0027 
0028 import org.apache.hadoop.hive.common.auth.HiveAuthUtils;
0029 import org.apache.hadoop.hive.conf.HiveConf;
0030 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
0031 import org.apache.hadoop.hive.shims.ShimLoader;
0032 import org.apache.hive.service.auth.HiveAuthFactory;
0033 import org.apache.hive.service.cli.CLIService;
0034 import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
0035 import org.apache.thrift.TProcessorFactory;
0036 import org.apache.thrift.protocol.TBinaryProtocol;
0037 import org.apache.thrift.server.TThreadPoolServer;
0038 import org.apache.thrift.transport.TServerSocket;
0039 import org.apache.thrift.transport.TTransportFactory;
0040 
0041 
0042 public class ThriftBinaryCLIService extends ThriftCLIService {
0043 
0044   public ThriftBinaryCLIService(CLIService cliService) {
0045     super(cliService, ThriftBinaryCLIService.class.getSimpleName());
0046   }
0047 
0048   @Override
0049   public void run() {
0050     try {
0051       // Server thread pool
0052       String threadPoolName = "HiveServer2-Handler-Pool";
0053       ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
0054           workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
0055           new ThreadFactoryWithGarbageCleanup(threadPoolName));
0056 
0057       // Thrift configs
0058       hiveAuthFactory = new HiveAuthFactory(hiveConf);
0059       TTransportFactory transportFactory = hiveAuthFactory.getAuthTransFactory();
0060       TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this);
0061       TServerSocket serverSocket = null;
0062       List<String> sslVersionBlacklist = new ArrayList<String>();
0063       for (String sslVersion : hiveConf.getVar(ConfVars.HIVE_SSL_PROTOCOL_BLACKLIST).split(",")) {
0064         sslVersionBlacklist.add(sslVersion);
0065       }
0066       if (!hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL)) {
0067         serverSocket = HiveAuthUtils.getServerSocket(hiveHost, portNum);
0068       } else {
0069         String keyStorePath = hiveConf.getVar(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH).trim();
0070         if (keyStorePath.isEmpty()) {
0071           throw new IllegalArgumentException(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname
0072               + " Not configured for SSL connection");
0073         }
0074         String keyStorePassword = ShimLoader.getHadoopShims().getPassword(hiveConf,
0075             HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname);
0076         serverSocket = HiveAuthUtils.getServerSSLSocket(hiveHost, portNum, keyStorePath,
0077             keyStorePassword, sslVersionBlacklist);
0078       }
0079 
0080       // In case HIVE_SERVER2_THRIFT_PORT or hive.server2.thrift.port is configured with 0 which
0081       // represents any free port, we should set it to the actual one
0082       portNum = serverSocket.getServerSocket().getLocalPort();
0083 
0084       // Server args
0085       int maxMessageSize = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_MAX_MESSAGE_SIZE);
0086       int requestTimeout = (int) hiveConf.getTimeVar(
0087           HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT, TimeUnit.SECONDS);
0088       int beBackoffSlotLength = (int) hiveConf.getTimeVar(
0089           HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH, TimeUnit.MILLISECONDS);
0090       TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(serverSocket)
0091           .processorFactory(processorFactory).transportFactory(transportFactory)
0092           .protocolFactory(new TBinaryProtocol.Factory())
0093           .inputProtocolFactory(new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize))
0094           .requestTimeout(requestTimeout).requestTimeoutUnit(TimeUnit.SECONDS)
0095           .beBackoffSlotLength(beBackoffSlotLength).beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS)
0096           .executorService(executorService);
0097 
0098       // TCP Server
0099       server = new TThreadPoolServer(sargs);
0100       server.setServerEventHandler(serverEventHandler);
0101       String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port "
0102           + portNum + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads";
0103       LOG.info(msg);
0104       server.serve();
0105     } catch (Throwable t) {
0106       LOG.error(
0107           "Error starting HiveServer2: could not start "
0108               + ThriftBinaryCLIService.class.getSimpleName(), t);
0109       System.exit(-1);
0110     }
0111   }
0112 
0113 }