Back to home page

OSCL-LXR

 
 

    


0001 /**
0002  * Licensed to the Apache Software Foundation (ASF) under one
0003  * or more contributor license agreements.  See the NOTICE file
0004  * distributed with this work for additional information
0005  * regarding copyright ownership.  The ASF licenses this file
0006  * to you under the Apache License, Version 2.0 (the
0007  * "License"); you may not use this file except in compliance
0008  * with the License.  You may obtain a copy of the License at
0009  *
0010  *     http://www.apache.org/licenses/LICENSE-2.0
0011  *
0012  * Unless required by applicable law or agreed to in writing, software
0013  * distributed under the License is distributed on an "AS IS" BASIS,
0014  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0015  * See the License for the specific language governing permissions and
0016  * limitations under the License.
0017  */
0018 
0019 package org.apache.hive.service.cli.thrift;
0020 
0021 import javax.security.auth.login.LoginException;
0022 import java.io.IOException;
0023 import java.net.InetAddress;
0024 import java.net.UnknownHostException;
0025 import java.util.HashMap;
0026 import java.util.Map;
0027 import java.util.concurrent.TimeUnit;
0028 
0029 import org.apache.hadoop.hive.conf.HiveConf;
0030 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
0031 import org.apache.hive.service.AbstractService;
0032 import org.apache.hive.service.ServiceException;
0033 import org.apache.hive.service.ServiceUtils;
0034 import org.apache.hive.service.auth.HiveAuthFactory;
0035 import org.apache.hive.service.auth.TSetIpAddressProcessor;
0036 import org.apache.hive.service.cli.*;
0037 import org.apache.hive.service.cli.session.SessionManager;
0038 import org.apache.hive.service.rpc.thrift.*;
0039 import org.apache.hive.service.server.HiveServer2;
0040 import org.apache.thrift.TException;
0041 import org.apache.thrift.protocol.TProtocol;
0042 import org.apache.thrift.server.ServerContext;
0043 import org.apache.thrift.server.TServer;
0044 import org.apache.thrift.server.TServerEventHandler;
0045 import org.apache.thrift.transport.TTransport;
0046 import org.slf4j.Logger;
0047 import org.slf4j.LoggerFactory;
0048 
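/**
 * Abstract base for the Thrift front end of {@link CLIService}.
 *
 * Implements every {@code TCLIService.Iface} call by converting the Thrift request
 * into a call on the wrapped {@code CLIService} and converting the result (or the
 * failure) back into the matching Thrift response. Subclasses provide {@link #run()},
 * which is executed on the thread created by {@link #start()}.
 */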
0053 public abstract class ThriftCLIService extends AbstractService implements TCLIService.Iface, Runnable {
0054 
  // Logger shared by this class; public, so code outside this class can reference it too.
  public static final Logger LOG = LoggerFactory.getLogger(ThriftCLIService.class.getName());

  // Back-end service that every Thrift call in this class delegates to; set in the constructor.
  protected CLIService cliService;
  // Single success status instance placed on every successful response.
  private static final TStatus OK_STATUS = new TStatus(TStatusCode.SUCCESS_STATUS);
  // Read in Kerberos mode to obtain the remote user and client IP.
  // Not assigned anywhere in this class - presumably set by subclasses; TODO confirm.
  protected static HiveAuthFactory hiveAuthFactory;

  // Port chosen in init(): environment variable first, configuration otherwise.
  protected int portNum;
  // Address resolved in init() from hiveHost, or the local host when hiveHost is unset/empty.
  protected InetAddress serverIPAddress;
  // Bind host read in init(): environment variable first, configuration otherwise.
  protected String hiveHost;
  // Thrift server stopped by stop(); not assigned in this class.
  protected TServer server;
  // Jetty server stopped by stop() when it is started; not assigned in this class.
  protected org.eclipse.jetty.server.Server httpServer;

  // True once start() has launched the server thread; reset by stop().
  private boolean isStarted = false;
  // When true, start() launches no thread and stop() stops no server.
  protected boolean isEmbedded = false;

  // Configuration handed to init().
  protected HiveConf hiveConf;

  // Worker thread pool settings read from the configuration in init().
  protected int minWorkerThreads;
  protected int maxWorkerThreads;
  // Keep-alive time in seconds; read from the HTTP or binary setting depending on transport mode.
  protected long workerKeepAliveTime;

  // Event handler built in the constructor; closes sessions left open on deleted contexts.
  protected TServerEventHandler serverEventHandler;
  // ServerContext recorded for the current thread by the event handler's processContext().
  protected ThreadLocal<ServerContext> currentServerContext;
0078 
0079   static class ThriftCLIServerContext implements ServerContext {
0080     private SessionHandle sessionHandle = null;
0081 
0082     public void setSessionHandle(SessionHandle sessionHandle) {
0083       this.sessionHandle = sessionHandle;
0084     }
0085 
0086     public SessionHandle getSessionHandle() {
0087       return sessionHandle;
0088     }
0089   }
0090 
0091   public ThriftCLIService(CLIService service, String serviceName) {
0092     super(serviceName);
0093     this.cliService = service;
0094     currentServerContext = new ThreadLocal<ServerContext>();
0095     serverEventHandler = new TServerEventHandler() {
0096       @Override
0097       public ServerContext createContext(
0098           TProtocol input, TProtocol output) {
0099         return new ThriftCLIServerContext();
0100       }
0101 
0102       @Override
0103       public void deleteContext(ServerContext serverContext,
0104           TProtocol input, TProtocol output) {
0105         ThriftCLIServerContext context = (ThriftCLIServerContext)serverContext;
0106         SessionHandle sessionHandle = context.getSessionHandle();
0107         if (sessionHandle != null) {
0108           LOG.info("Session disconnected without closing properly, close it now");
0109           try {
0110             cliService.closeSession(sessionHandle);
0111           } catch (HiveSQLException e) {
0112             LOG.warn("Failed to close session: " + e, e);
0113           }
0114         }
0115       }
0116 
0117       @Override
0118       public void preServe() {
0119       }
0120 
0121       @Override
0122       public void processContext(ServerContext serverContext,
0123           TTransport input, TTransport output) {
0124         currentServerContext.set(serverContext);
0125       }
0126     };
0127   }
0128 
0129   @Override
0130   public synchronized void init(HiveConf hiveConf) {
0131     this.hiveConf = hiveConf;
0132     // Initialize common server configs needed in both binary & http modes
0133     String portString;
0134     hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST");
0135     if (hiveHost == null) {
0136       hiveHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
0137     }
0138     try {
0139       if (hiveHost != null && !hiveHost.isEmpty()) {
0140         serverIPAddress = InetAddress.getByName(hiveHost);
0141       } else {
0142         serverIPAddress = InetAddress.getLocalHost();
0143       }
0144     } catch (UnknownHostException e) {
0145       throw new ServiceException(e);
0146     }
0147     // HTTP mode
0148     if (HiveServer2.isHTTPTransportMode(hiveConf)) {
0149       workerKeepAliveTime =
0150           hiveConf.getTimeVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME,
0151               TimeUnit.SECONDS);
0152       portString = System.getenv("HIVE_SERVER2_THRIFT_HTTP_PORT");
0153       if (portString != null) {
0154         portNum = Integer.valueOf(portString);
0155       } else {
0156         portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT);
0157       }
0158     }
0159     // Binary mode
0160     else {
0161       workerKeepAliveTime =
0162           hiveConf.getTimeVar(ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME, TimeUnit.SECONDS);
0163       portString = System.getenv("HIVE_SERVER2_THRIFT_PORT");
0164       if (portString != null) {
0165         portNum = Integer.valueOf(portString);
0166       } else {
0167         portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT);
0168       }
0169     }
0170     minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS);
0171     maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS);
0172     super.init(hiveConf);
0173   }
0174 
0175   @Override
0176   public synchronized void start() {
0177     super.start();
0178     if (!isStarted && !isEmbedded) {
0179       new Thread(this).start();
0180       isStarted = true;
0181     }
0182   }
0183 
0184   @Override
0185   public synchronized void stop() {
0186     if (isStarted && !isEmbedded) {
0187       if(server != null) {
0188         server.stop();
0189         LOG.info("Thrift server has stopped");
0190       }
0191       if((httpServer != null) && httpServer.isStarted()) {
0192         try {
0193           httpServer.stop();
0194           LOG.info("Http server has stopped");
0195         } catch (Exception e) {
0196           LOG.error("Error stopping Http server: ", e);
0197         }
0198       }
0199       isStarted = false;
0200     }
0201     super.stop();
0202   }
0203 
0204   public int getPortNumber() {
0205     return portNum;
0206   }
0207 
0208   public InetAddress getServerIPAddress() {
0209     return serverIPAddress;
0210   }
0211 
0212   @Override
0213   public TGetDelegationTokenResp GetDelegationToken(TGetDelegationTokenReq req)
0214       throws TException {
0215     TGetDelegationTokenResp resp = new TGetDelegationTokenResp();
0216     resp.setStatus(notSupportTokenErrorStatus());
0217     return resp;
0218   }
0219 
0220   @Override
0221   public TCancelDelegationTokenResp CancelDelegationToken(TCancelDelegationTokenReq req)
0222       throws TException {
0223     TCancelDelegationTokenResp resp = new TCancelDelegationTokenResp();
0224     resp.setStatus(notSupportTokenErrorStatus());
0225     return resp;
0226   }
0227 
0228   @Override
0229   public TRenewDelegationTokenResp RenewDelegationToken(TRenewDelegationTokenReq req)
0230       throws TException {
0231     TRenewDelegationTokenResp resp = new TRenewDelegationTokenResp();
0232     resp.setStatus(notSupportTokenErrorStatus());
0233     return resp;
0234   }
0235 
0236   private TStatus notSupportTokenErrorStatus() {
0237     TStatus errorStatus = new TStatus(TStatusCode.ERROR_STATUS);
0238     errorStatus.setErrorMessage("Delegation token is not supported");
0239     return errorStatus;
0240   }
0241 
0242   @Override
0243   public TOpenSessionResp OpenSession(TOpenSessionReq req) throws TException {
0244     LOG.info("Client protocol version: " + req.getClient_protocol());
0245     TOpenSessionResp resp = new TOpenSessionResp();
0246     try {
0247       SessionHandle sessionHandle = getSessionHandle(req, resp);
0248       resp.setSessionHandle(sessionHandle.toTSessionHandle());
0249       // TODO: set real configuration map
0250       resp.setConfiguration(new HashMap<String, String>());
0251       resp.setStatus(OK_STATUS);
0252       ThriftCLIServerContext context =
0253         (ThriftCLIServerContext)currentServerContext.get();
0254       if (context != null) {
0255         context.setSessionHandle(sessionHandle);
0256       }
0257     } catch (Exception e) {
0258       LOG.warn("Error opening session: ", e);
0259       resp.setStatus(HiveSQLException.toTStatus(e));
0260     }
0261     return resp;
0262   }
0263 
0264   private String getIpAddress() {
0265     String clientIpAddress;
0266     // Http transport mode.
0267     // We set the thread local ip address, in ThriftHttpServlet.
0268     if (cliService.getHiveConf().getVar(
0269         ConfVars.HIVE_SERVER2_TRANSPORT_MODE).equalsIgnoreCase("http")) {
0270       clientIpAddress = SessionManager.getIpAddress();
0271     }
0272     else {
0273       // Kerberos
0274       if (isKerberosAuthMode()) {
0275         clientIpAddress = hiveAuthFactory.getIpAddress();
0276       }
0277       // Except kerberos, NOSASL
0278       else {
0279         clientIpAddress = TSetIpAddressProcessor.getUserIpAddress();
0280       }
0281     }
0282     LOG.debug("Client's IP Address: " + clientIpAddress);
0283     return clientIpAddress;
0284   }
0285 
0286   /**
0287    * Returns the effective username.
0288    * 1. If hive.server2.allow.user.substitution = false: the username of the connecting user
0289    * 2. If hive.server2.allow.user.substitution = true: the username of the end user,
0290    * that the connecting user is trying to proxy for.
0291    * This includes a check whether the connecting user is allowed to proxy for the end user.
0292    * @param req
0293    * @return
0294    * @throws HiveSQLException
0295    */
0296   private String getUserName(TOpenSessionReq req) throws HiveSQLException {
0297     String userName = null;
0298     // Kerberos
0299     if (isKerberosAuthMode()) {
0300       userName = hiveAuthFactory.getRemoteUser();
0301     }
0302     // Except kerberos, NOSASL
0303     if (userName == null) {
0304       userName = TSetIpAddressProcessor.getUserName();
0305     }
0306     // Http transport mode.
0307     // We set the thread local username, in ThriftHttpServlet.
0308     if (cliService.getHiveConf().getVar(
0309         ConfVars.HIVE_SERVER2_TRANSPORT_MODE).equalsIgnoreCase("http")) {
0310       userName = SessionManager.getUserName();
0311     }
0312     if (userName == null) {
0313       userName = req.getUsername();
0314     }
0315 
0316     userName = getShortName(userName);
0317     String effectiveClientUser = getProxyUser(userName, req.getConfiguration(), getIpAddress());
0318     LOG.debug("Client's username: " + effectiveClientUser);
0319     return effectiveClientUser;
0320   }
0321 
0322   private String getShortName(String userName) {
0323     String ret = null;
0324     if (userName != null) {
0325       int indexOfDomainMatch = ServiceUtils.indexOfDomainMatch(userName);
0326       ret = (indexOfDomainMatch <= 0) ? userName :
0327           userName.substring(0, indexOfDomainMatch);
0328     }
0329 
0330     return ret;
0331   }
0332 
0333   /**
0334    * Create a session handle
0335    * @param req
0336    * @param res
0337    * @return
0338    * @throws HiveSQLException
0339    * @throws LoginException
0340    * @throws IOException
0341    */
0342   SessionHandle getSessionHandle(TOpenSessionReq req, TOpenSessionResp res)
0343       throws HiveSQLException, LoginException, IOException {
0344     String userName = getUserName(req);
0345     String ipAddress = getIpAddress();
0346     TProtocolVersion protocol = getMinVersion(CLIService.SERVER_VERSION,
0347         req.getClient_protocol());
0348     res.setServerProtocolVersion(protocol);
0349     SessionHandle sessionHandle;
0350     if (cliService.getHiveConf().getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS) &&
0351         (userName != null)) {
0352       String delegationTokenStr = getDelegationToken(userName);
0353       sessionHandle = cliService.openSessionWithImpersonation(protocol, userName,
0354           req.getPassword(), ipAddress, req.getConfiguration(), delegationTokenStr);
0355     } else {
0356       sessionHandle = cliService.openSession(protocol, userName, req.getPassword(),
0357           ipAddress, req.getConfiguration());
0358     }
0359     return sessionHandle;
0360   }
0361 
0362 
0363   private String getDelegationToken(String userName)
0364       throws HiveSQLException, LoginException, IOException {
0365     if (userName == null || !cliService.getHiveConf().getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION)
0366         .equalsIgnoreCase(HiveAuthFactory.AuthTypes.KERBEROS.toString())) {
0367       return null;
0368     }
0369     try {
0370       return cliService.getDelegationTokenFromMetaStore(userName);
0371     } catch (UnsupportedOperationException e) {
0372       // The delegation token is not applicable in the given deployment mode
0373     }
0374     return null;
0375   }
0376 
0377   private TProtocolVersion getMinVersion(TProtocolVersion... versions) {
0378     TProtocolVersion[] values = TProtocolVersion.values();
0379     int current = values[values.length - 1].getValue();
0380     for (TProtocolVersion version : versions) {
0381       if (current > version.getValue()) {
0382         current = version.getValue();
0383       }
0384     }
0385     for (TProtocolVersion version : values) {
0386       if (version.getValue() == current) {
0387         return version;
0388       }
0389     }
0390     throw new IllegalArgumentException("never");
0391   }
0392 
0393   @Override
0394   public TCloseSessionResp CloseSession(TCloseSessionReq req) throws TException {
0395     TCloseSessionResp resp = new TCloseSessionResp();
0396     try {
0397       SessionHandle sessionHandle = new SessionHandle(req.getSessionHandle());
0398       cliService.closeSession(sessionHandle);
0399       resp.setStatus(OK_STATUS);
0400       ThriftCLIServerContext context =
0401         (ThriftCLIServerContext)currentServerContext.get();
0402       if (context != null) {
0403         context.setSessionHandle(null);
0404       }
0405     } catch (Exception e) {
0406       LOG.warn("Error closing session: ", e);
0407       resp.setStatus(HiveSQLException.toTStatus(e));
0408     }
0409     return resp;
0410   }
0411 
0412   @Override
0413   public TGetInfoResp GetInfo(TGetInfoReq req) throws TException {
0414     TGetInfoResp resp = new TGetInfoResp();
0415     try {
0416       GetInfoValue getInfoValue =
0417           cliService.getInfo(new SessionHandle(req.getSessionHandle()),
0418               GetInfoType.getGetInfoType(req.getInfoType()));
0419       resp.setInfoValue(getInfoValue.toTGetInfoValue());
0420       resp.setStatus(OK_STATUS);
0421     } catch (Exception e) {
0422       LOG.warn("Error getting info: ", e);
0423       resp.setStatus(HiveSQLException.toTStatus(e));
0424     }
0425     return resp;
0426   }
0427 
0428   @Override
0429   public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws TException {
0430     TExecuteStatementResp resp = new TExecuteStatementResp();
0431     try {
0432       SessionHandle sessionHandle = new SessionHandle(req.getSessionHandle());
0433       String statement = req.getStatement();
0434       Map<String, String> confOverlay = req.getConfOverlay();
0435       Boolean runAsync = req.isRunAsync();
0436       long queryTimeout = req.getQueryTimeout();
0437       OperationHandle operationHandle = runAsync ?
0438           cliService.executeStatementAsync(sessionHandle, statement, confOverlay, queryTimeout)
0439           : cliService.executeStatement(sessionHandle, statement, confOverlay, queryTimeout);
0440           resp.setOperationHandle(operationHandle.toTOperationHandle());
0441           resp.setStatus(OK_STATUS);
0442     } catch (Exception e) {
0443       LOG.warn("Error executing statement: ", e);
0444       resp.setStatus(HiveSQLException.toTStatus(e));
0445     }
0446     return resp;
0447   }
0448 
0449   @Override
0450   public TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq req) throws TException {
0451     TGetTypeInfoResp resp = new TGetTypeInfoResp();
0452     try {
0453       OperationHandle operationHandle = cliService.getTypeInfo(new SessionHandle(req.getSessionHandle()));
0454       resp.setOperationHandle(operationHandle.toTOperationHandle());
0455       resp.setStatus(OK_STATUS);
0456     } catch (Exception e) {
0457       LOG.warn("Error getting type info: ", e);
0458       resp.setStatus(HiveSQLException.toTStatus(e));
0459     }
0460     return resp;
0461   }
0462 
0463   @Override
0464   public TGetCatalogsResp GetCatalogs(TGetCatalogsReq req) throws TException {
0465     TGetCatalogsResp resp = new TGetCatalogsResp();
0466     try {
0467       OperationHandle opHandle = cliService.getCatalogs(new SessionHandle(req.getSessionHandle()));
0468       resp.setOperationHandle(opHandle.toTOperationHandle());
0469       resp.setStatus(OK_STATUS);
0470     } catch (Exception e) {
0471       LOG.warn("Error getting catalogs: ", e);
0472       resp.setStatus(HiveSQLException.toTStatus(e));
0473     }
0474     return resp;
0475   }
0476 
0477   @Override
0478   public TGetSchemasResp GetSchemas(TGetSchemasReq req) throws TException {
0479     TGetSchemasResp resp = new TGetSchemasResp();
0480     try {
0481       OperationHandle opHandle = cliService.getSchemas(
0482           new SessionHandle(req.getSessionHandle()), req.getCatalogName(), req.getSchemaName());
0483       resp.setOperationHandle(opHandle.toTOperationHandle());
0484       resp.setStatus(OK_STATUS);
0485     } catch (Exception e) {
0486       LOG.warn("Error getting schemas: ", e);
0487       resp.setStatus(HiveSQLException.toTStatus(e));
0488     }
0489     return resp;
0490   }
0491 
0492   @Override
0493   public TGetTablesResp GetTables(TGetTablesReq req) throws TException {
0494     TGetTablesResp resp = new TGetTablesResp();
0495     try {
0496       OperationHandle opHandle = cliService
0497           .getTables(new SessionHandle(req.getSessionHandle()), req.getCatalogName(),
0498               req.getSchemaName(), req.getTableName(), req.getTableTypes());
0499       resp.setOperationHandle(opHandle.toTOperationHandle());
0500       resp.setStatus(OK_STATUS);
0501     } catch (Exception e) {
0502       LOG.warn("Error getting tables: ", e);
0503       resp.setStatus(HiveSQLException.toTStatus(e));
0504     }
0505     return resp;
0506   }
0507 
0508   @Override
0509   public TGetTableTypesResp GetTableTypes(TGetTableTypesReq req) throws TException {
0510     TGetTableTypesResp resp = new TGetTableTypesResp();
0511     try {
0512       OperationHandle opHandle = cliService.getTableTypes(new SessionHandle(req.getSessionHandle()));
0513       resp.setOperationHandle(opHandle.toTOperationHandle());
0514       resp.setStatus(OK_STATUS);
0515     } catch (Exception e) {
0516       LOG.warn("Error getting table types: ", e);
0517       resp.setStatus(HiveSQLException.toTStatus(e));
0518     }
0519     return resp;
0520   }
0521 
0522   @Override
0523   public TGetColumnsResp GetColumns(TGetColumnsReq req) throws TException {
0524     TGetColumnsResp resp = new TGetColumnsResp();
0525     try {
0526       OperationHandle opHandle = cliService.getColumns(
0527           new SessionHandle(req.getSessionHandle()),
0528           req.getCatalogName(),
0529           req.getSchemaName(),
0530           req.getTableName(),
0531           req.getColumnName());
0532       resp.setOperationHandle(opHandle.toTOperationHandle());
0533       resp.setStatus(OK_STATUS);
0534     } catch (Exception e) {
0535       LOG.warn("Error getting columns: ", e);
0536       resp.setStatus(HiveSQLException.toTStatus(e));
0537     }
0538     return resp;
0539   }
0540 
0541   @Override
0542   public TGetFunctionsResp GetFunctions(TGetFunctionsReq req) throws TException {
0543     TGetFunctionsResp resp = new TGetFunctionsResp();
0544     try {
0545       OperationHandle opHandle = cliService.getFunctions(
0546           new SessionHandle(req.getSessionHandle()), req.getCatalogName(),
0547           req.getSchemaName(), req.getFunctionName());
0548       resp.setOperationHandle(opHandle.toTOperationHandle());
0549       resp.setStatus(OK_STATUS);
0550     } catch (Exception e) {
0551       LOG.warn("Error getting functions: ", e);
0552       resp.setStatus(HiveSQLException.toTStatus(e));
0553     }
0554     return resp;
0555   }
0556 
0557   @Override
0558   public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq req) throws TException {
0559     TGetOperationStatusResp resp = new TGetOperationStatusResp();
0560     try {
0561       OperationStatus operationStatus = cliService.getOperationStatus(
0562           new OperationHandle(req.getOperationHandle()));
0563       resp.setOperationState(operationStatus.getState().toTOperationState());
0564       HiveSQLException opException = operationStatus.getOperationException();
0565       if (opException != null) {
0566         resp.setSqlState(opException.getSQLState());
0567         resp.setErrorCode(opException.getErrorCode());
0568         resp.setErrorMessage(opException.getMessage());
0569       }
0570       resp.setStatus(OK_STATUS);
0571     } catch (Exception e) {
0572       LOG.warn("Error getting operation status: ", e);
0573       resp.setStatus(HiveSQLException.toTStatus(e));
0574     }
0575     return resp;
0576   }
0577 
0578   @Override
0579   public TCancelOperationResp CancelOperation(TCancelOperationReq req) throws TException {
0580     TCancelOperationResp resp = new TCancelOperationResp();
0581     try {
0582       cliService.cancelOperation(new OperationHandle(req.getOperationHandle()));
0583       resp.setStatus(OK_STATUS);
0584     } catch (Exception e) {
0585       LOG.warn("Error cancelling operation: ", e);
0586       resp.setStatus(HiveSQLException.toTStatus(e));
0587     }
0588     return resp;
0589   }
0590 
0591   @Override
0592   public TCloseOperationResp CloseOperation(TCloseOperationReq req) throws TException {
0593     TCloseOperationResp resp = new TCloseOperationResp();
0594     try {
0595       cliService.closeOperation(new OperationHandle(req.getOperationHandle()));
0596       resp.setStatus(OK_STATUS);
0597     } catch (Exception e) {
0598       LOG.warn("Error closing operation: ", e);
0599       resp.setStatus(HiveSQLException.toTStatus(e));
0600     }
0601     return resp;
0602   }
0603 
0604   @Override
0605   public TGetResultSetMetadataResp GetResultSetMetadata(TGetResultSetMetadataReq req)
0606       throws TException {
0607     TGetResultSetMetadataResp resp = new TGetResultSetMetadataResp();
0608     try {
0609       TableSchema schema = cliService.getResultSetMetadata(new OperationHandle(req.getOperationHandle()));
0610       resp.setSchema(schema.toTTableSchema());
0611       resp.setStatus(OK_STATUS);
0612     } catch (Exception e) {
0613       LOG.warn("Error getting result set metadata: ", e);
0614       resp.setStatus(HiveSQLException.toTStatus(e));
0615     }
0616     return resp;
0617   }
0618 
0619   @Override
0620   public TFetchResultsResp FetchResults(TFetchResultsReq req) throws TException {
0621     TFetchResultsResp resp = new TFetchResultsResp();
0622     try {
0623       RowSet rowSet = cliService.fetchResults(
0624           new OperationHandle(req.getOperationHandle()),
0625           FetchOrientation.getFetchOrientation(req.getOrientation()),
0626           req.getMaxRows(),
0627           FetchType.getFetchType(req.getFetchType()));
0628       resp.setResults(rowSet.toTRowSet());
0629       resp.setHasMoreRows(false);
0630       resp.setStatus(OK_STATUS);
0631     } catch (Exception e) {
0632       LOG.warn("Error fetching results: ", e);
0633       resp.setStatus(HiveSQLException.toTStatus(e));
0634     }
0635     return resp;
0636   }
0637 
0638   @Override
0639   public TGetPrimaryKeysResp GetPrimaryKeys(TGetPrimaryKeysReq req)
0640       throws TException {
0641     TGetPrimaryKeysResp resp = new TGetPrimaryKeysResp();
0642     try {
0643       OperationHandle opHandle = cliService.getPrimaryKeys(
0644       new SessionHandle(req.getSessionHandle()), req.getCatalogName(),
0645       req.getSchemaName(), req.getTableName());
0646       resp.setOperationHandle(opHandle.toTOperationHandle());
0647       resp.setStatus(OK_STATUS);
0648     } catch (Exception e) {
0649      LOG.warn("Error getting functions: ", e);
0650      resp.setStatus(HiveSQLException.toTStatus(e));
0651     }
0652     return resp;
0653   }
0654 
0655   @Override
0656   public TGetCrossReferenceResp GetCrossReference(TGetCrossReferenceReq req)
0657       throws TException {
0658     TGetCrossReferenceResp resp = new TGetCrossReferenceResp();
0659     try {
0660       OperationHandle opHandle = cliService.getCrossReference(
0661         new SessionHandle(req.getSessionHandle()), req.getParentCatalogName(),
0662           req.getParentSchemaName(), req.getParentTableName(),
0663           req.getForeignCatalogName(), req.getForeignSchemaName(), req.getForeignTableName());
0664           resp.setOperationHandle(opHandle.toTOperationHandle());
0665           resp.setStatus(OK_STATUS);
0666     } catch (Exception e) {
0667       LOG.warn("Error getting functions: ", e);
0668       resp.setStatus(HiveSQLException.toTStatus(e));
0669     }
0670     return resp;
0671   }
0672 
  /**
   * Body of the server thread: start() runs this method on a newly created
   * thread unless the service is already started or embedded. Implemented by
   * subclasses.
   */
  @Override
  public abstract void run();
0675 
0676   /**
0677    * If the proxy user name is provided then check privileges to substitute the user.
0678    * @param realUser
0679    * @param sessionConf
0680    * @param ipAddress
0681    * @return
0682    * @throws HiveSQLException
0683    */
0684   private String getProxyUser(String realUser, Map<String, String> sessionConf,
0685       String ipAddress) throws HiveSQLException {
0686     String proxyUser = null;
0687     // Http transport mode.
0688     // We set the thread local proxy username, in ThriftHttpServlet.
0689     if (cliService.getHiveConf().getVar(
0690         ConfVars.HIVE_SERVER2_TRANSPORT_MODE).equalsIgnoreCase("http")) {
0691       proxyUser = SessionManager.getProxyUserName();
0692       LOG.debug("Proxy user from query string: " + proxyUser);
0693     }
0694 
0695     if (proxyUser == null && sessionConf != null && sessionConf.containsKey(HiveAuthFactory.HS2_PROXY_USER)) {
0696       String proxyUserFromThriftBody = sessionConf.get(HiveAuthFactory.HS2_PROXY_USER);
0697       LOG.debug("Proxy user from thrift body: " + proxyUserFromThriftBody);
0698       proxyUser = proxyUserFromThriftBody;
0699     }
0700 
0701     if (proxyUser == null) {
0702       return realUser;
0703     }
0704 
0705     // check whether substitution is allowed
0706     if (!hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ALLOW_USER_SUBSTITUTION)) {
0707       throw new HiveSQLException("Proxy user substitution is not allowed");
0708     }
0709 
0710     // If there's no authentication, then directly substitute the user
0711     if (HiveAuthFactory.AuthTypes.NONE.toString()
0712         .equalsIgnoreCase(hiveConf.getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION))) {
0713       return proxyUser;
0714     }
0715 
0716     // Verify proxy user privilege of the realUser for the proxyUser
0717     HiveAuthFactory.verifyProxyAccess(realUser, proxyUser, ipAddress, hiveConf);
0718     LOG.debug("Verified proxy user: " + proxyUser);
0719     return proxyUser;
0720   }
0721 
0722   private boolean isKerberosAuthMode() {
0723     return cliService.getHiveConf().getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION)
0724         .equalsIgnoreCase(HiveAuthFactory.AuthTypes.KERBEROS.toString());
0725   }
0726 }