Back to home page

OSCL-LXR

 
 

    


0001 /**
0002  * Licensed to the Apache Software Foundation (ASF) under one
0003  * or more contributor license agreements.  See the NOTICE file
0004  * distributed with this work for additional information
0005  * regarding copyright ownership.  The ASF licenses this file
0006  * to you under the Apache License, Version 2.0 (the
0007  * "License"); you may not use this file except in compliance
0008  * with the License.  You may obtain a copy of the License at
0009  *
0010  *     http://www.apache.org/licenses/LICENSE-2.0
0011  *
0012  * Unless required by applicable law or agreed to in writing, software
0013  * distributed under the License is distributed on an "AS IS" BASIS,
0014  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0015  * See the License for the specific language governing permissions and
0016  * limitations under the License.
0017  */
0018 
0019 package org.apache.hive.service.cli.thrift;
0020 
0021 import java.util.ArrayList;
0022 import java.util.List;
0023 import java.util.concurrent.ExecutorService;
0024 import java.util.concurrent.SynchronousQueue;
0025 import java.util.concurrent.ThreadPoolExecutor;
0026 import java.util.concurrent.TimeUnit;
0027 
0028 import org.apache.hadoop.hive.conf.HiveConf;
0029 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
0030 import org.apache.hadoop.hive.shims.ShimLoader;
0031 import org.apache.hive.service.auth.HiveAuthFactory;
0032 import org.apache.hive.service.cli.CLIService;
0033 import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
0034 import org.apache.thrift.TProcessorFactory;
0035 import org.apache.thrift.protocol.TBinaryProtocol;
0036 import org.apache.thrift.server.TThreadPoolServer;
0037 import org.apache.thrift.transport.TServerSocket;
0038 import org.apache.thrift.transport.TTransportFactory;
0039 
0040 
0041 public class ThriftBinaryCLIService extends ThriftCLIService {
0042 
0043   public ThriftBinaryCLIService(CLIService cliService) {
0044     super(cliService, ThriftBinaryCLIService.class.getSimpleName());
0045   }
0046 
0047   @Override
0048   public void run() {
0049     try {
0050       // Server thread pool
0051       String threadPoolName = "HiveServer2-Handler-Pool";
0052       ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
0053           workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
0054           new ThreadFactoryWithGarbageCleanup(threadPoolName));
0055 
0056       // Thrift configs
0057       hiveAuthFactory = new HiveAuthFactory(hiveConf);
0058       TTransportFactory transportFactory = hiveAuthFactory.getAuthTransFactory();
0059       TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this);
0060       TServerSocket serverSocket = null;
0061       List<String> sslVersionBlacklist = new ArrayList<String>();
0062       for (String sslVersion : hiveConf.getVar(ConfVars.HIVE_SSL_PROTOCOL_BLACKLIST).split(",")) {
0063         sslVersionBlacklist.add(sslVersion);
0064       }
0065       if (!hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL)) {
0066         serverSocket = HiveAuthFactory.getServerSocket(hiveHost, portNum);
0067       } else {
0068         String keyStorePath = hiveConf.getVar(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH).trim();
0069         if (keyStorePath.isEmpty()) {
0070           throw new IllegalArgumentException(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname
0071               + " Not configured for SSL connection");
0072         }
0073         String keyStorePassword = ShimLoader.getHadoopShims().getPassword(hiveConf,
0074             HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname);
0075         serverSocket = HiveAuthFactory.getServerSSLSocket(hiveHost, portNum, keyStorePath,
0076             keyStorePassword, sslVersionBlacklist);
0077       }
0078 
0079       // In case HIVE_SERVER2_THRIFT_PORT or hive.server2.thrift.port is configured with 0 which
0080       // represents any free port, we should set it to the actual one
0081       portNum = serverSocket.getServerSocket().getLocalPort();
0082 
0083       // Server args
0084       int maxMessageSize = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_MAX_MESSAGE_SIZE);
0085       int requestTimeout = (int) hiveConf.getTimeVar(
0086           HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT, TimeUnit.SECONDS);
0087       int beBackoffSlotLength = (int) hiveConf.getTimeVar(
0088           HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH, TimeUnit.MILLISECONDS);
0089       TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(serverSocket)
0090           .processorFactory(processorFactory).transportFactory(transportFactory)
0091           .protocolFactory(new TBinaryProtocol.Factory())
0092           .inputProtocolFactory(new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize))
0093           .requestTimeout(requestTimeout).requestTimeoutUnit(TimeUnit.SECONDS)
0094           .beBackoffSlotLength(beBackoffSlotLength).beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS)
0095           .executorService(executorService);
0096 
0097       // TCP Server
0098       server = new TThreadPoolServer(sargs);
0099       server.setServerEventHandler(serverEventHandler);
0100       String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port "
0101           + serverSocket.getServerSocket().getLocalPort() + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads";
0102       LOG.info(msg);
0103       server.serve();
0104     } catch (Throwable t) {
0105       LOG.fatal(
0106           "Error starting HiveServer2: could not start "
0107               + ThriftBinaryCLIService.class.getSimpleName(), t);
0108       System.exit(-1);
0109     }
0110   }
0111 
0112 }