Back to home page

OSCL-LXR

 
 

    


0001 /**
0002  * Licensed to the Apache Software Foundation (ASF) under one
0003  * or more contributor license agreements.  See the NOTICE file
0004  * distributed with this work for additional information
0005  * regarding copyright ownership.  The ASF licenses this file
0006  * to you under the Apache License, Version 2.0 (the
0007  * "License"); you may not use this file except in compliance
0008  * with the License.  You may obtain a copy of the License at
0009  *
0010  *     http://www.apache.org/licenses/LICENSE-2.0
0011  *
0012  * Unless required by applicable law or agreed to in writing, software
0013  * distributed under the License is distributed on an "AS IS" BASIS,
0014  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0015  * See the License for the specific language governing permissions and
0016  * limitations under the License.
0017  */
0018 
0019 package org.apache.hive.service.cli.thrift;
0020 
0021 import javax.security.auth.login.LoginException;
0022 import java.io.IOException;
0023 import java.net.InetAddress;
0024 import java.net.UnknownHostException;
0025 import java.util.HashMap;
0026 import java.util.Map;
0027 import java.util.concurrent.TimeUnit;
0028 
0029 import org.apache.commons.logging.Log;
0030 import org.apache.commons.logging.LogFactory;
0031 import org.apache.hadoop.hive.conf.HiveConf;
0032 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
0033 import org.apache.hive.service.AbstractService;
0034 import org.apache.hive.service.ServiceException;
0035 import org.apache.hive.service.ServiceUtils;
0036 import org.apache.hive.service.auth.HiveAuthFactory;
0037 import org.apache.hive.service.auth.TSetIpAddressProcessor;
0038 import org.apache.hive.service.cli.*;
0039 import org.apache.hive.service.cli.session.SessionManager;
0040 import org.apache.hive.service.server.HiveServer2;
0041 import org.apache.thrift.TException;
0042 import org.apache.thrift.protocol.TProtocol;
0043 import org.apache.thrift.server.ServerContext;
0044 import org.apache.thrift.server.TServer;
0045 import org.apache.thrift.server.TServerEventHandler;
0046 import org.apache.thrift.transport.TTransport;
0047 
0048 /**
0049  * ThriftCLIService.
0050  *
0051  */
0052 public abstract class ThriftCLIService extends AbstractService implements TCLIService.Iface, Runnable {
0053 
0054   public static final Log LOG = LogFactory.getLog(ThriftCLIService.class.getName());
0055 
0056   protected CLIService cliService;
0057   private static final TStatus OK_STATUS = new TStatus(TStatusCode.SUCCESS_STATUS);
0058   protected static HiveAuthFactory hiveAuthFactory;
0059 
0060   protected int portNum;
0061   protected InetAddress serverIPAddress;
0062   protected String hiveHost;
0063   protected TServer server;
0064   protected org.eclipse.jetty.server.Server httpServer;
0065 
0066   private boolean isStarted = false;
0067   protected boolean isEmbedded = false;
0068 
0069   protected HiveConf hiveConf;
0070 
0071   protected int minWorkerThreads;
0072   protected int maxWorkerThreads;
0073   protected long workerKeepAliveTime;
0074 
0075   protected TServerEventHandler serverEventHandler;
0076   protected ThreadLocal<ServerContext> currentServerContext;
0077 
0078   static class ThriftCLIServerContext implements ServerContext {
0079     private SessionHandle sessionHandle = null;
0080 
0081     public void setSessionHandle(SessionHandle sessionHandle) {
0082       this.sessionHandle = sessionHandle;
0083     }
0084 
0085     public SessionHandle getSessionHandle() {
0086       return sessionHandle;
0087     }
0088   }
0089 
0090   public ThriftCLIService(CLIService service, String serviceName) {
0091     super(serviceName);
0092     this.cliService = service;
0093     currentServerContext = new ThreadLocal<ServerContext>();
0094     serverEventHandler = new TServerEventHandler() {
0095       @Override
0096       public ServerContext createContext(
0097           TProtocol input, TProtocol output) {
0098         return new ThriftCLIServerContext();
0099       }
0100 
0101       @Override
0102       public void deleteContext(ServerContext serverContext,
0103           TProtocol input, TProtocol output) {
0104         ThriftCLIServerContext context = (ThriftCLIServerContext)serverContext;
0105         SessionHandle sessionHandle = context.getSessionHandle();
0106         if (sessionHandle != null) {
0107           LOG.info("Session disconnected without closing properly, close it now");
0108           try {
0109             cliService.closeSession(sessionHandle);
0110           } catch (HiveSQLException e) {
0111             LOG.warn("Failed to close session: " + e, e);
0112           }
0113         }
0114       }
0115 
0116       @Override
0117       public void preServe() {
0118       }
0119 
0120       @Override
0121       public void processContext(ServerContext serverContext,
0122           TTransport input, TTransport output) {
0123         currentServerContext.set(serverContext);
0124       }
0125     };
0126   }
0127 
0128   @Override
0129   public synchronized void init(HiveConf hiveConf) {
0130     this.hiveConf = hiveConf;
0131     // Initialize common server configs needed in both binary & http modes
0132     String portString;
0133     hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST");
0134     if (hiveHost == null) {
0135       hiveHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
0136     }
0137     try {
0138       if (hiveHost != null && !hiveHost.isEmpty()) {
0139         serverIPAddress = InetAddress.getByName(hiveHost);
0140       } else {
0141         serverIPAddress = InetAddress.getLocalHost();
0142       }
0143     } catch (UnknownHostException e) {
0144       throw new ServiceException(e);
0145     }
0146     // HTTP mode
0147     if (HiveServer2.isHTTPTransportMode(hiveConf)) {
0148       workerKeepAliveTime =
0149           hiveConf.getTimeVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME,
0150               TimeUnit.SECONDS);
0151       portString = System.getenv("HIVE_SERVER2_THRIFT_HTTP_PORT");
0152       if (portString != null) {
0153         portNum = Integer.valueOf(portString);
0154       } else {
0155         portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT);
0156       }
0157     }
0158     // Binary mode
0159     else {
0160       workerKeepAliveTime =
0161           hiveConf.getTimeVar(ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME, TimeUnit.SECONDS);
0162       portString = System.getenv("HIVE_SERVER2_THRIFT_PORT");
0163       if (portString != null) {
0164         portNum = Integer.valueOf(portString);
0165       } else {
0166         portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT);
0167       }
0168     }
0169     minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS);
0170     maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS);
0171     super.init(hiveConf);
0172   }
0173 
0174   @Override
0175   public synchronized void start() {
0176     super.start();
0177     if (!isStarted && !isEmbedded) {
0178       new Thread(this).start();
0179       isStarted = true;
0180     }
0181   }
0182 
0183   @Override
0184   public synchronized void stop() {
0185     if (isStarted && !isEmbedded) {
0186       if(server != null) {
0187         server.stop();
0188         LOG.info("Thrift server has stopped");
0189       }
0190       if((httpServer != null) && httpServer.isStarted()) {
0191         try {
0192           httpServer.stop();
0193           LOG.info("Http server has stopped");
0194         } catch (Exception e) {
0195           LOG.error("Error stopping Http server: ", e);
0196         }
0197       }
0198       isStarted = false;
0199     }
0200     super.stop();
0201   }
0202 
0203   public int getPortNumber() {
0204     return portNum;
0205   }
0206 
0207   public InetAddress getServerIPAddress() {
0208     return serverIPAddress;
0209   }
0210 
0211   @Override
0212   public TGetDelegationTokenResp GetDelegationToken(TGetDelegationTokenReq req)
0213       throws TException {
0214     TGetDelegationTokenResp resp = new TGetDelegationTokenResp();
0215     resp.setStatus(notSupportTokenErrorStatus());
0216     return resp;
0217   }
0218 
0219   @Override
0220   public TCancelDelegationTokenResp CancelDelegationToken(TCancelDelegationTokenReq req)
0221       throws TException {
0222     TCancelDelegationTokenResp resp = new TCancelDelegationTokenResp();
0223     resp.setStatus(notSupportTokenErrorStatus());
0224     return resp;
0225   }
0226 
0227   @Override
0228   public TRenewDelegationTokenResp RenewDelegationToken(TRenewDelegationTokenReq req)
0229       throws TException {
0230     TRenewDelegationTokenResp resp = new TRenewDelegationTokenResp();
0231     resp.setStatus(notSupportTokenErrorStatus());
0232     return resp;
0233   }
0234 
0235   private TStatus notSupportTokenErrorStatus() {
0236     TStatus errorStatus = new TStatus(TStatusCode.ERROR_STATUS);
0237     errorStatus.setErrorMessage("Delegation token is not supported");
0238     return errorStatus;
0239   }
0240 
0241   @Override
0242   public TOpenSessionResp OpenSession(TOpenSessionReq req) throws TException {
0243     LOG.info("Client protocol version: " + req.getClient_protocol());
0244     TOpenSessionResp resp = new TOpenSessionResp();
0245     try {
0246       SessionHandle sessionHandle = getSessionHandle(req, resp);
0247       resp.setSessionHandle(sessionHandle.toTSessionHandle());
0248       // TODO: set real configuration map
0249       resp.setConfiguration(new HashMap<String, String>());
0250       resp.setStatus(OK_STATUS);
0251       ThriftCLIServerContext context =
0252         (ThriftCLIServerContext)currentServerContext.get();
0253       if (context != null) {
0254         context.setSessionHandle(sessionHandle);
0255       }
0256     } catch (Exception e) {
0257       LOG.warn("Error opening session: ", e);
0258       resp.setStatus(HiveSQLException.toTStatus(e));
0259     }
0260     return resp;
0261   }
0262 
0263   private String getIpAddress() {
0264     String clientIpAddress;
0265     // Http transport mode.
0266     // We set the thread local ip address, in ThriftHttpServlet.
0267     if (cliService.getHiveConf().getVar(
0268         ConfVars.HIVE_SERVER2_TRANSPORT_MODE).equalsIgnoreCase("http")) {
0269       clientIpAddress = SessionManager.getIpAddress();
0270     }
0271     else {
0272       // Kerberos
0273       if (isKerberosAuthMode()) {
0274         clientIpAddress = hiveAuthFactory.getIpAddress();
0275       }
0276       // Except kerberos, NOSASL
0277       else {
0278         clientIpAddress = TSetIpAddressProcessor.getUserIpAddress();
0279       }
0280     }
0281     LOG.debug("Client's IP Address: " + clientIpAddress);
0282     return clientIpAddress;
0283   }
0284 
0285   /**
0286    * Returns the effective username.
0287    * 1. If hive.server2.allow.user.substitution = false: the username of the connecting user
0288    * 2. If hive.server2.allow.user.substitution = true: the username of the end user,
0289    * that the connecting user is trying to proxy for.
0290    * This includes a check whether the connecting user is allowed to proxy for the end user.
0291    * @param req
0292    * @return
0293    * @throws HiveSQLException
0294    */
0295   private String getUserName(TOpenSessionReq req) throws HiveSQLException {
0296     String userName = null;
0297     // Kerberos
0298     if (isKerberosAuthMode()) {
0299       userName = hiveAuthFactory.getRemoteUser();
0300     }
0301     // Except kerberos, NOSASL
0302     if (userName == null) {
0303       userName = TSetIpAddressProcessor.getUserName();
0304     }
0305     // Http transport mode.
0306     // We set the thread local username, in ThriftHttpServlet.
0307     if (cliService.getHiveConf().getVar(
0308         ConfVars.HIVE_SERVER2_TRANSPORT_MODE).equalsIgnoreCase("http")) {
0309       userName = SessionManager.getUserName();
0310     }
0311     if (userName == null) {
0312       userName = req.getUsername();
0313     }
0314 
0315     userName = getShortName(userName);
0316     String effectiveClientUser = getProxyUser(userName, req.getConfiguration(), getIpAddress());
0317     LOG.debug("Client's username: " + effectiveClientUser);
0318     return effectiveClientUser;
0319   }
0320 
0321   private String getShortName(String userName) {
0322     String ret = null;
0323     if (userName != null) {
0324       int indexOfDomainMatch = ServiceUtils.indexOfDomainMatch(userName);
0325       ret = (indexOfDomainMatch <= 0) ? userName :
0326           userName.substring(0, indexOfDomainMatch);
0327     }
0328 
0329     return ret;
0330   }
0331 
0332   /**
0333    * Create a session handle
0334    * @param req
0335    * @param res
0336    * @return
0337    * @throws HiveSQLException
0338    * @throws LoginException
0339    * @throws IOException
0340    */
0341   SessionHandle getSessionHandle(TOpenSessionReq req, TOpenSessionResp res)
0342       throws HiveSQLException, LoginException, IOException {
0343     String userName = getUserName(req);
0344     String ipAddress = getIpAddress();
0345     TProtocolVersion protocol = getMinVersion(CLIService.SERVER_VERSION,
0346         req.getClient_protocol());
0347     res.setServerProtocolVersion(protocol);
0348     SessionHandle sessionHandle;
0349     if (cliService.getHiveConf().getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS) &&
0350         (userName != null)) {
0351       String delegationTokenStr = getDelegationToken(userName);
0352       sessionHandle = cliService.openSessionWithImpersonation(protocol, userName,
0353           req.getPassword(), ipAddress, req.getConfiguration(), delegationTokenStr);
0354     } else {
0355       sessionHandle = cliService.openSession(protocol, userName, req.getPassword(),
0356           ipAddress, req.getConfiguration());
0357     }
0358     return sessionHandle;
0359   }
0360 
0361 
0362   private String getDelegationToken(String userName)
0363       throws HiveSQLException, LoginException, IOException {
0364     if (userName == null || !cliService.getHiveConf().getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION)
0365         .equalsIgnoreCase(HiveAuthFactory.AuthTypes.KERBEROS.toString())) {
0366       return null;
0367     }
0368     try {
0369       return cliService.getDelegationTokenFromMetaStore(userName);
0370     } catch (UnsupportedOperationException e) {
0371       // The delegation token is not applicable in the given deployment mode
0372     }
0373     return null;
0374   }
0375 
0376   private TProtocolVersion getMinVersion(TProtocolVersion... versions) {
0377     TProtocolVersion[] values = TProtocolVersion.values();
0378     int current = values[values.length - 1].getValue();
0379     for (TProtocolVersion version : versions) {
0380       if (current > version.getValue()) {
0381         current = version.getValue();
0382       }
0383     }
0384     for (TProtocolVersion version : values) {
0385       if (version.getValue() == current) {
0386         return version;
0387       }
0388     }
0389     throw new IllegalArgumentException("never");
0390   }
0391 
0392   @Override
0393   public TCloseSessionResp CloseSession(TCloseSessionReq req) throws TException {
0394     TCloseSessionResp resp = new TCloseSessionResp();
0395     try {
0396       SessionHandle sessionHandle = new SessionHandle(req.getSessionHandle());
0397       cliService.closeSession(sessionHandle);
0398       resp.setStatus(OK_STATUS);
0399       ThriftCLIServerContext context =
0400         (ThriftCLIServerContext)currentServerContext.get();
0401       if (context != null) {
0402         context.setSessionHandle(null);
0403       }
0404     } catch (Exception e) {
0405       LOG.warn("Error closing session: ", e);
0406       resp.setStatus(HiveSQLException.toTStatus(e));
0407     }
0408     return resp;
0409   }
0410 
0411   @Override
0412   public TGetInfoResp GetInfo(TGetInfoReq req) throws TException {
0413     TGetInfoResp resp = new TGetInfoResp();
0414     try {
0415       GetInfoValue getInfoValue =
0416           cliService.getInfo(new SessionHandle(req.getSessionHandle()),
0417               GetInfoType.getGetInfoType(req.getInfoType()));
0418       resp.setInfoValue(getInfoValue.toTGetInfoValue());
0419       resp.setStatus(OK_STATUS);
0420     } catch (Exception e) {
0421       LOG.warn("Error getting info: ", e);
0422       resp.setStatus(HiveSQLException.toTStatus(e));
0423     }
0424     return resp;
0425   }
0426 
0427   @Override
0428   public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws TException {
0429     TExecuteStatementResp resp = new TExecuteStatementResp();
0430     try {
0431       SessionHandle sessionHandle = new SessionHandle(req.getSessionHandle());
0432       String statement = req.getStatement();
0433       Map<String, String> confOverlay = req.getConfOverlay();
0434       Boolean runAsync = req.isRunAsync();
0435       OperationHandle operationHandle = runAsync ?
0436           cliService.executeStatementAsync(sessionHandle, statement, confOverlay)
0437           : cliService.executeStatement(sessionHandle, statement, confOverlay);
0438           resp.setOperationHandle(operationHandle.toTOperationHandle());
0439           resp.setStatus(OK_STATUS);
0440     } catch (Exception e) {
0441       LOG.warn("Error executing statement: ", e);
0442       resp.setStatus(HiveSQLException.toTStatus(e));
0443     }
0444     return resp;
0445   }
0446 
0447   @Override
0448   public TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq req) throws TException {
0449     TGetTypeInfoResp resp = new TGetTypeInfoResp();
0450     try {
0451       OperationHandle operationHandle = cliService.getTypeInfo(new SessionHandle(req.getSessionHandle()));
0452       resp.setOperationHandle(operationHandle.toTOperationHandle());
0453       resp.setStatus(OK_STATUS);
0454     } catch (Exception e) {
0455       LOG.warn("Error getting type info: ", e);
0456       resp.setStatus(HiveSQLException.toTStatus(e));
0457     }
0458     return resp;
0459   }
0460 
0461   @Override
0462   public TGetCatalogsResp GetCatalogs(TGetCatalogsReq req) throws TException {
0463     TGetCatalogsResp resp = new TGetCatalogsResp();
0464     try {
0465       OperationHandle opHandle = cliService.getCatalogs(new SessionHandle(req.getSessionHandle()));
0466       resp.setOperationHandle(opHandle.toTOperationHandle());
0467       resp.setStatus(OK_STATUS);
0468     } catch (Exception e) {
0469       LOG.warn("Error getting catalogs: ", e);
0470       resp.setStatus(HiveSQLException.toTStatus(e));
0471     }
0472     return resp;
0473   }
0474 
0475   @Override
0476   public TGetSchemasResp GetSchemas(TGetSchemasReq req) throws TException {
0477     TGetSchemasResp resp = new TGetSchemasResp();
0478     try {
0479       OperationHandle opHandle = cliService.getSchemas(
0480           new SessionHandle(req.getSessionHandle()), req.getCatalogName(), req.getSchemaName());
0481       resp.setOperationHandle(opHandle.toTOperationHandle());
0482       resp.setStatus(OK_STATUS);
0483     } catch (Exception e) {
0484       LOG.warn("Error getting schemas: ", e);
0485       resp.setStatus(HiveSQLException.toTStatus(e));
0486     }
0487     return resp;
0488   }
0489 
0490   @Override
0491   public TGetTablesResp GetTables(TGetTablesReq req) throws TException {
0492     TGetTablesResp resp = new TGetTablesResp();
0493     try {
0494       OperationHandle opHandle = cliService
0495           .getTables(new SessionHandle(req.getSessionHandle()), req.getCatalogName(),
0496               req.getSchemaName(), req.getTableName(), req.getTableTypes());
0497       resp.setOperationHandle(opHandle.toTOperationHandle());
0498       resp.setStatus(OK_STATUS);
0499     } catch (Exception e) {
0500       LOG.warn("Error getting tables: ", e);
0501       resp.setStatus(HiveSQLException.toTStatus(e));
0502     }
0503     return resp;
0504   }
0505 
0506   @Override
0507   public TGetTableTypesResp GetTableTypes(TGetTableTypesReq req) throws TException {
0508     TGetTableTypesResp resp = new TGetTableTypesResp();
0509     try {
0510       OperationHandle opHandle = cliService.getTableTypes(new SessionHandle(req.getSessionHandle()));
0511       resp.setOperationHandle(opHandle.toTOperationHandle());
0512       resp.setStatus(OK_STATUS);
0513     } catch (Exception e) {
0514       LOG.warn("Error getting table types: ", e);
0515       resp.setStatus(HiveSQLException.toTStatus(e));
0516     }
0517     return resp;
0518   }
0519 
0520   @Override
0521   public TGetColumnsResp GetColumns(TGetColumnsReq req) throws TException {
0522     TGetColumnsResp resp = new TGetColumnsResp();
0523     try {
0524       OperationHandle opHandle = cliService.getColumns(
0525           new SessionHandle(req.getSessionHandle()),
0526           req.getCatalogName(),
0527           req.getSchemaName(),
0528           req.getTableName(),
0529           req.getColumnName());
0530       resp.setOperationHandle(opHandle.toTOperationHandle());
0531       resp.setStatus(OK_STATUS);
0532     } catch (Exception e) {
0533       LOG.warn("Error getting columns: ", e);
0534       resp.setStatus(HiveSQLException.toTStatus(e));
0535     }
0536     return resp;
0537   }
0538 
0539   @Override
0540   public TGetFunctionsResp GetFunctions(TGetFunctionsReq req) throws TException {
0541     TGetFunctionsResp resp = new TGetFunctionsResp();
0542     try {
0543       OperationHandle opHandle = cliService.getFunctions(
0544           new SessionHandle(req.getSessionHandle()), req.getCatalogName(),
0545           req.getSchemaName(), req.getFunctionName());
0546       resp.setOperationHandle(opHandle.toTOperationHandle());
0547       resp.setStatus(OK_STATUS);
0548     } catch (Exception e) {
0549       LOG.warn("Error getting functions: ", e);
0550       resp.setStatus(HiveSQLException.toTStatus(e));
0551     }
0552     return resp;
0553   }
0554 
0555   @Override
0556   public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq req) throws TException {
0557     TGetOperationStatusResp resp = new TGetOperationStatusResp();
0558     try {
0559       OperationStatus operationStatus = cliService.getOperationStatus(
0560           new OperationHandle(req.getOperationHandle()));
0561       resp.setOperationState(operationStatus.getState().toTOperationState());
0562       HiveSQLException opException = operationStatus.getOperationException();
0563       if (opException != null) {
0564         resp.setSqlState(opException.getSQLState());
0565         resp.setErrorCode(opException.getErrorCode());
0566         resp.setErrorMessage(opException.getMessage());
0567       }
0568       resp.setStatus(OK_STATUS);
0569     } catch (Exception e) {
0570       LOG.warn("Error getting operation status: ", e);
0571       resp.setStatus(HiveSQLException.toTStatus(e));
0572     }
0573     return resp;
0574   }
0575 
0576   @Override
0577   public TCancelOperationResp CancelOperation(TCancelOperationReq req) throws TException {
0578     TCancelOperationResp resp = new TCancelOperationResp();
0579     try {
0580       cliService.cancelOperation(new OperationHandle(req.getOperationHandle()));
0581       resp.setStatus(OK_STATUS);
0582     } catch (Exception e) {
0583       LOG.warn("Error cancelling operation: ", e);
0584       resp.setStatus(HiveSQLException.toTStatus(e));
0585     }
0586     return resp;
0587   }
0588 
0589   @Override
0590   public TCloseOperationResp CloseOperation(TCloseOperationReq req) throws TException {
0591     TCloseOperationResp resp = new TCloseOperationResp();
0592     try {
0593       cliService.closeOperation(new OperationHandle(req.getOperationHandle()));
0594       resp.setStatus(OK_STATUS);
0595     } catch (Exception e) {
0596       LOG.warn("Error closing operation: ", e);
0597       resp.setStatus(HiveSQLException.toTStatus(e));
0598     }
0599     return resp;
0600   }
0601 
0602   @Override
0603   public TGetResultSetMetadataResp GetResultSetMetadata(TGetResultSetMetadataReq req)
0604       throws TException {
0605     TGetResultSetMetadataResp resp = new TGetResultSetMetadataResp();
0606     try {
0607       TableSchema schema = cliService.getResultSetMetadata(new OperationHandle(req.getOperationHandle()));
0608       resp.setSchema(schema.toTTableSchema());
0609       resp.setStatus(OK_STATUS);
0610     } catch (Exception e) {
0611       LOG.warn("Error getting result set metadata: ", e);
0612       resp.setStatus(HiveSQLException.toTStatus(e));
0613     }
0614     return resp;
0615   }
0616 
0617   @Override
0618   public TFetchResultsResp FetchResults(TFetchResultsReq req) throws TException {
0619     TFetchResultsResp resp = new TFetchResultsResp();
0620     try {
0621       RowSet rowSet = cliService.fetchResults(
0622           new OperationHandle(req.getOperationHandle()),
0623           FetchOrientation.getFetchOrientation(req.getOrientation()),
0624           req.getMaxRows(),
0625           FetchType.getFetchType(req.getFetchType()));
0626       resp.setResults(rowSet.toTRowSet());
0627       resp.setHasMoreRows(false);
0628       resp.setStatus(OK_STATUS);
0629     } catch (Exception e) {
0630       LOG.warn("Error fetching results: ", e);
0631       resp.setStatus(HiveSQLException.toTStatus(e));
0632     }
0633     return resp;
0634   }
0635 
0636   @Override
0637   public abstract void run();
0638 
0639   /**
0640    * If the proxy user name is provided then check privileges to substitute the user.
0641    * @param realUser
0642    * @param sessionConf
0643    * @param ipAddress
0644    * @return
0645    * @throws HiveSQLException
0646    */
0647   private String getProxyUser(String realUser, Map<String, String> sessionConf,
0648       String ipAddress) throws HiveSQLException {
0649     String proxyUser = null;
0650     // Http transport mode.
0651     // We set the thread local proxy username, in ThriftHttpServlet.
0652     if (cliService.getHiveConf().getVar(
0653         ConfVars.HIVE_SERVER2_TRANSPORT_MODE).equalsIgnoreCase("http")) {
0654       proxyUser = SessionManager.getProxyUserName();
0655       LOG.debug("Proxy user from query string: " + proxyUser);
0656     }
0657 
0658     if (proxyUser == null && sessionConf != null && sessionConf.containsKey(HiveAuthFactory.HS2_PROXY_USER)) {
0659       String proxyUserFromThriftBody = sessionConf.get(HiveAuthFactory.HS2_PROXY_USER);
0660       LOG.debug("Proxy user from thrift body: " + proxyUserFromThriftBody);
0661       proxyUser = proxyUserFromThriftBody;
0662     }
0663 
0664     if (proxyUser == null) {
0665       return realUser;
0666     }
0667 
0668     // check whether substitution is allowed
0669     if (!hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ALLOW_USER_SUBSTITUTION)) {
0670       throw new HiveSQLException("Proxy user substitution is not allowed");
0671     }
0672 
0673     // If there's no authentication, then directly substitute the user
0674     if (HiveAuthFactory.AuthTypes.NONE.toString()
0675         .equalsIgnoreCase(hiveConf.getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION))) {
0676       return proxyUser;
0677     }
0678 
0679     // Verify proxy user privilege of the realUser for the proxyUser
0680     HiveAuthFactory.verifyProxyAccess(realUser, proxyUser, ipAddress, hiveConf);
0681     LOG.debug("Verified proxy user: " + proxyUser);
0682     return proxyUser;
0683   }
0684 
0685   private boolean isKerberosAuthMode() {
0686     return cliService.getHiveConf().getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION)
0687         .equalsIgnoreCase(HiveAuthFactory.AuthTypes.KERBEROS.toString());
0688   }
0689 }