/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hive.service.cli.operation;

import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;

import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Schema;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.exec.ExplainTask;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.VariableSubstitution;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.service.cli.FetchOrientation;
import org.apache.hive.service.cli.HiveSQLException;
import org.apache.hive.service.cli.OperationState;
import org.apache.hive.service.cli.RowSet;
import org.apache.hive.service.cli.RowSetFactory;
import org.apache.hive.service.cli.TableSchema;
import org.apache.hive.service.cli.session.HiveSession;
import org.apache.hive.service.server.ThreadWithGarbageCleanup;

/**
 * SQLOperation: runs a HiveQL statement through the Hive {@link Driver},
 * either in the calling thread or on a background thread, and serves the
 * results through the CLI fetch API.
 */
public class SQLOperation extends ExecuteStatementOperation {

  private Driver driver = null;
  private CommandProcessorResponse response;
  private TableSchema resultSchema = null;
  private Schema mResultSchema = null;
  private SerDe serde = null;
  private boolean fetchStarted = false;

  public SQLOperation(HiveSession parentSession, String statement, Map<String,
      String> confOverlay, boolean runInBackground) {
    // TODO: call setRemoteUser in ExecuteStatementOperation or higher.
    super(parentSession, statement, confOverlay, runInBackground);
  }
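
  // An illustrative creation path (a sketch, not part of this class): the
  // session layer normally constructs this operation rather than callers
  // doing so directly. The call chain below reflects the HiveServer2 CLI
  // service and should be read as an assumption:
  //
  //   HiveSession session = ...;  // obtained from the SessionManager
  //   OperationHandle handle =
  //       session.executeStatementAsync("SELECT * FROM src", confOverlay);
  //   // the session asks its OperationManager for a new
  //   // SQLOperation(session, statement, confOverlay, true) and runs it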

  /**
   * Compiles the query and extracts result metadata.
   * @param sqlOperationConf configuration to compile the statement with
   * @throws HiveSQLException
   */
  public void prepare(HiveConf sqlOperationConf) throws HiveSQLException {
    setState(OperationState.RUNNING);

    try {
      driver = new Driver(sqlOperationConf, getParentSession().getUserName());

      // Set the operation handle information in Driver, so that thrift API users
      // can use the operation handle they receive to look up query information in
      // Yarn ATS.
      String guid64 = Base64.encodeBase64URLSafeString(getHandle().getHandleIdentifier()
          .toTHandleIdentifier().getGuid()).trim();
      driver.setOperationId(guid64);

      // In Hive server mode, we are not able to retry in the FetchTask
      // case, when calling fetch queries since execute() has returned.
      // For now, we disable the retry attempts.
      driver.setTryCount(Integer.MAX_VALUE);

      String subStatement = new VariableSubstitution().substitute(sqlOperationConf, statement);
      response = driver.compileAndRespond(subStatement);
      if (0 != response.getResponseCode()) {
        throw toSQLException("Error while compiling statement", response);
      }

      mResultSchema = driver.getSchema();

      // hasResultSet should be true only if the query has a FetchTask;
      // "explain" is an exception for now.
      if (driver.getPlan().getFetchTask() != null) {
        // Schema has to be set
        if (mResultSchema == null || !mResultSchema.isSetFieldSchemas()) {
          throw new HiveSQLException("Error compiling query: Schema and FieldSchema " +
              "should be set when query plan has a FetchTask");
        }
        resultSchema = new TableSchema(mResultSchema);
        setHasResultSet(true);
      } else {
        setHasResultSet(false);
      }
      // Set hasResultSet true if the plan has an ExplainTask.
      // TODO explain should use a FetchTask for reading
      for (Task<? extends Serializable> task : driver.getPlan().getRootTasks()) {
        if (task.getClass() == ExplainTask.class) {
          resultSchema = new TableSchema(mResultSchema);
          setHasResultSet(true);
          break;
        }
      }
    } catch (HiveSQLException e) {
      setState(OperationState.ERROR);
      throw e;
    } catch (Exception e) {
      setState(OperationState.ERROR);
      throw new HiveSQLException("Error running query: " + e.toString(), e);
    }
  }
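
  // A worked example of the operation id built in prepare(), assuming the
  // handle GUID is the usual 16-byte UUID: Base64.encodeBase64URLSafeString()
  // maps 16 bytes to a 22-character, unpadded, URL-safe string.
  //
  //   byte[] guid = new byte[16];  // normally from THandleIdentifier.getGuid()
  //   String id = Base64.encodeBase64URLSafeString(guid);
  //   // id.length() == 22, safe to use in YARN ATS lookups and URLs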

  private void runQuery(HiveConf sqlOperationConf) throws HiveSQLException {
    try {
      // In Hive server mode, we are not able to retry in the FetchTask
      // case, when calling fetch queries since execute() has returned.
      // For now, we disable the retry attempts.
      driver.setTryCount(Integer.MAX_VALUE);
      response = driver.run();
      if (0 != response.getResponseCode()) {
        throw toSQLException("Error while processing statement", response);
      }
    } catch (HiveSQLException e) {
      // If the operation was cancelled by another thread,
      // Driver#run will return a non-zero response code.
      // We simply return if the operation state is CANCELED,
      // otherwise we throw the exception.
      if (getStatus().getState() == OperationState.CANCELED) {
        return;
      } else {
        setState(OperationState.ERROR);
        throw e;
      }
    } catch (Exception e) {
      setState(OperationState.ERROR);
      throw new HiveSQLException("Error running query: " + e.toString(), e);
    }
    setState(OperationState.FINISHED);
  }
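
  // For reference, the response fields consulted above; the sketch assumes
  // toSQLException() copies them into the thrown HiveSQLException:
  //
  //   CommandProcessorResponse r = driver.run();
  //   r.getResponseCode();   // 0 on success, non-zero on compile/runtime errors
  //   r.getErrorMessage();   // human-readable error text
  //   r.getSQLState();       // five-character SQLSTATE, when available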

  @Override
  public void runInternal() throws HiveSQLException {
    setState(OperationState.PENDING);
    final HiveConf opConfig = getConfigForOperation();
    prepare(opConfig);
    if (!shouldRunAsync()) {
      runQuery(opConfig);
    } else {
      // We pass the ThreadLocals from the foreground (handler) thread to the
      // background thread.
      final SessionState parentSessionState = SessionState.get();
      // The ThreadLocal Hive object needs to be set in the background thread,
      // so that its metastore client is associated with the right user.
      final Hive parentHive = getSessionHive();
      // The current UGI will get used by the metastore when the metastore is
      // in embedded mode, so it needs to get passed to the new background thread.
      final UserGroupInformation currentUGI = getCurrentUGI(opConfig);
      // Runnable impl to call runQuery asynchronously from a different thread
      Runnable backgroundOperation = new Runnable() {
        @Override
        public void run() {
          PrivilegedExceptionAction<Object> doAsAction = new PrivilegedExceptionAction<Object>() {
            @Override
            public Object run() throws HiveSQLException {
              Hive.set(parentHive);
              SessionState.setCurrentSessionState(parentSessionState);
              // Set the current OperationLog in this async thread so it keeps saving the query log.
              registerCurrentOperationLog();
              try {
                runQuery(opConfig);
              } catch (HiveSQLException e) {
                setOperationException(e);
                LOG.error("Error running hive query: ", e);
              } finally {
                unregisterOperationLog();
              }
              return null;
            }
          };

          try {
            currentUGI.doAs(doAsAction);
          } catch (Exception e) {
            setOperationException(new HiveSQLException(e));
            LOG.error("Error running hive query as user : " + currentUGI.getShortUserName(), e);
          } finally {
            /**
             * Cache the ThreadLocal RawStore object of this background thread for an orderly
             * cleanup when the thread is garbage collected later.
             * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize()
             */
            if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
              ThreadWithGarbageCleanup currentThread =
                  (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread();
              currentThread.cacheThreadLocalRawStore();
            }
          }
        }
      };
      try {
        // This submit blocks if no background threads are available to run this operation
        Future<?> backgroundHandle =
            getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation);
        setBackgroundHandle(backgroundHandle);
      } catch (RejectedExecutionException rejected) {
        setState(OperationState.ERROR);
        throw new HiveSQLException("The background threadpool cannot accept" +
            " new tasks for execution, please retry the operation", rejected);
      }
    }
  }
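
  // runInternal() above follows a capture-and-reinstall pattern for
  // thread-local state. A minimal standalone sketch (the executor name is
  // hypothetical):
  //
  //   final SessionState parentState = SessionState.get();    // caller thread
  //   Runnable work = new Runnable() {
  //     public void run() {
  //       SessionState.setCurrentSessionState(parentState);   // worker thread
  //       // ... run the query with the caller's session context ...
  //     }
  //   };
  //   executor.submit(work);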

  /**
   * Returns the current UGI on the stack.
   * @param opConfig operation config (currently unused)
   * @return UserGroupInformation
   * @throws HiveSQLException
   */
  private UserGroupInformation getCurrentUGI(HiveConf opConfig) throws HiveSQLException {
    try {
      return Utils.getUGI();
    } catch (Exception e) {
      throw new HiveSQLException("Unable to get current user", e);
    }
  }

  /**
   * Returns the ThreadLocal Hive object for the current thread.
   * @return Hive
   * @throws HiveSQLException
   */
  private Hive getSessionHive() throws HiveSQLException {
    try {
      return Hive.get();
    } catch (HiveException e) {
      throw new HiveSQLException("Failed to get ThreadLocal Hive object", e);
    }
  }

  private void cleanup(OperationState state) throws HiveSQLException {
    setState(state);
    if (shouldRunAsync()) {
      Future<?> backgroundHandle = getBackgroundHandle();
      if (backgroundHandle != null) {
        backgroundHandle.cancel(true);
      }
    }
    if (driver != null) {
      driver.close();
      driver.destroy();
    }
    driver = null;

    // SessionState may be absent on this thread; guard against null.
    SessionState ss = SessionState.get();
    if (ss != null && ss.getTmpOutputFile() != null) {
      ss.getTmpOutputFile().delete();
    }
  }

  @Override
  public void cancel() throws HiveSQLException {
    cleanup(OperationState.CANCELED);
  }

  @Override
  public void close() throws HiveSQLException {
    cleanup(OperationState.CLOSED);
    cleanupOperationLog();
  }

  @Override
  public TableSchema getResultSetSchema() throws HiveSQLException {
    assertState(OperationState.FINISHED);
    if (resultSchema == null) {
      resultSchema = new TableSchema(driver.getSchema());
    }
    return resultSchema;
  }

  // Buffer reused across fetch calls to carry rows back from the driver.
  private final transient List<Object> convey = new ArrayList<Object>();

  @Override
  public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
    validateDefaultFetchOrientation(orientation);
    assertState(OperationState.FINISHED);

    RowSet rowSet = RowSetFactory.create(resultSchema, getProtocolVersion());

    try {
      /* If the client is requesting fetch-from-start and it's not the first time
       * reading from this operation, then reset the fetch position to the beginning.
       */
      if (orientation.equals(FetchOrientation.FETCH_FIRST) && fetchStarted) {
        driver.resetFetch();
      }
      fetchStarted = true;
      driver.setMaxRows((int) maxRows);
      if (driver.getResults(convey)) {
        return decode(convey, rowSet);
      }
      return rowSet;
    } catch (IOException e) {
      throw new HiveSQLException(e);
    } catch (CommandNeedRetryException e) {
      throw new HiveSQLException(e);
    } catch (Exception e) {
      throw new HiveSQLException(e);
    } finally {
      convey.clear();
    }
  }
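
  // A hedged caller-side view of the fetch contract above (the loop shape and
  // batch size are assumptions, as is iterating the RowSet as Object[] rows):
  //
  //   RowSet batch = op.getNextRowSet(FetchOrientation.FETCH_FIRST, 1000);
  //   while (batch.numRows() > 0) {
  //     for (Object[] row : batch) { /* consume row */ }
  //     batch = op.getNextRowSet(FetchOrientation.FETCH_NEXT, 1000);
  //   }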

  private RowSet decode(List<Object> rows, RowSet rowSet) throws Exception {
    if (driver.isFetchingTable()) {
      return prepareFromRow(rows, rowSet);
    }
    return decodeFromString(rows, rowSet);
  }

  // Rows are already encoded as thrift-able objects by the ThriftFormatter.
  private RowSet prepareFromRow(List<Object> rows, RowSet rowSet) throws Exception {
    for (Object row : rows) {
      rowSet.addRow((Object[]) row);
    }
    return rowSet;
  }

  private RowSet decodeFromString(List<Object> rows, RowSet rowSet)
      throws SQLException, SerDeException {
    getSerDe();
    StructObjectInspector soi = (StructObjectInspector) serde.getObjectInspector();
    List<? extends StructField> fieldRefs = soi.getAllStructFieldRefs();

    Object[] deserializedFields = new Object[fieldRefs.size()];
    Object rowObj;
    ObjectInspector fieldOI;

    int protocol = getProtocolVersion().getValue();
    for (Object rowString : rows) {
      rowObj = serde.deserialize(new BytesWritable(((String) rowString).getBytes(StandardCharsets.UTF_8)));
      for (int i = 0; i < fieldRefs.size(); i++) {
        StructField fieldRef = fieldRefs.get(i);
        fieldOI = fieldRef.getFieldObjectInspector();
        Object fieldData = soi.getStructFieldData(rowObj, fieldRef);
        deserializedFields[i] = SerDeUtils.toThriftPayload(fieldData, fieldOI, protocol);
      }
      rowSet.addRow(deserializedFields);
    }
    return rowSet;
  }
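
  // To make the string decoding concrete: with LazySimpleSerDe defaults each
  // row arrives as one delimited line using the \001 (Ctrl-A) field separator,
  // so a row of an (int, string) result would look like (the delimiter is the
  // assumed default):
  //
  //   "1\001alice"  --deserialize-->  fields {1, "alice"}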

  private SerDe getSerDe() throws SQLException {
    if (serde != null) {
      return serde;
    }
    try {
      List<FieldSchema> fieldSchemas = mResultSchema.getFieldSchemas();
      StringBuilder namesSb = new StringBuilder();
      StringBuilder typesSb = new StringBuilder();

      if (fieldSchemas != null && !fieldSchemas.isEmpty()) {
        for (int pos = 0; pos < fieldSchemas.size(); pos++) {
          if (pos != 0) {
            namesSb.append(",");
            typesSb.append(",");
          }
          namesSb.append(fieldSchemas.get(pos).getName());
          typesSb.append(fieldSchemas.get(pos).getType());
        }
      }
      String names = namesSb.toString();
      String types = typesSb.toString();

      serde = new LazySimpleSerDe();
      Properties props = new Properties();
      if (names.length() > 0) {
        LOG.debug("Column names: " + names);
        props.setProperty(serdeConstants.LIST_COLUMNS, names);
      }
      if (types.length() > 0) {
        LOG.debug("Column types: " + types);
        props.setProperty(serdeConstants.LIST_COLUMN_TYPES, types);
      }
      SerDeUtils.initializeSerDe(serde, new HiveConf(), props, null);

    } catch (Exception ex) {
      LOG.error("Could not create ResultSet", ex);
      throw new SQLException("Could not create ResultSet: " + ex.getMessage(), ex);
    }
    return serde;
  }
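
  // Sketch of the properties getSerDe() builds for a two-column result
  // (values illustrative):
  //
  //   columns       = "id,name"      (serdeConstants.LIST_COLUMNS)
  //   columns.types = "int,string"   (serdeConstants.LIST_COLUMN_TYPES)
  //
  // LazySimpleSerDe then parses each delimited row string against this schema.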

  /**
   * If there are query-specific settings to overlay, creates a copy of the config.
   * There are two cases where we need to clone the session config that is passed
   * to the Hive Driver:
   * 1. Async query -
   *    if the client changes a config setting, that should not affect the
   *    execution already underway;
   * 2. confOverlay -
   *    the query-specific settings should only be applied to the query config,
   *    not to the session.
   * @return new configuration
   * @throws HiveSQLException
   */
  private HiveConf getConfigForOperation() throws HiveSQLException {
    HiveConf sqlOperationConf = getParentSession().getHiveConf();
    if (!getConfOverlay().isEmpty() || shouldRunAsync()) {
      // clone the parent session config for this query
      sqlOperationConf = new HiveConf(sqlOperationConf);

      // apply overlay query specific settings, if any
      for (Map.Entry<String, String> confEntry : getConfOverlay().entrySet()) {
        try {
          sqlOperationConf.verifyAndSet(confEntry.getKey(), confEntry.getValue());
        } catch (IllegalArgumentException e) {
          throw new HiveSQLException("Error applying statement specific settings", e);
        }
      }
    }
    return sqlOperationConf;
  }
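
  // Usage sketch for the overlay path above (map contents illustrative):
  //
  //   Map<String, String> overlay = new HashMap<String, String>();
  //   overlay.put("hive.exec.parallel", "true");  // applies to this statement only
  //   session.executeStatement("SELECT ...", overlay);
  //   // the session's own HiveConf is untouched; the clone receives the
  //   // verified overlay entries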
}