/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hive.service.cli.operation;

import java.io.IOException;
import java.io.Serializable;
import java.security.PrivilegedExceptionAction;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;

import static java.nio.charset.StandardCharsets.UTF_8;

import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Schema;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.exec.ExplainTask;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.service.cli.FetchOrientation;
import org.apache.hive.service.cli.HiveSQLException;
import org.apache.hive.service.cli.OperationState;
import org.apache.hive.service.cli.RowSet;
import org.apache.hive.service.cli.RowSetFactory;
import org.apache.hive.service.cli.TableSchema;
import org.apache.hive.service.cli.session.HiveSession;
import org.apache.hive.service.server.ThreadWithGarbageCleanup;
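/**
 * SQLOperation: runs a single HiveQL statement through the Hive {@link Driver}
 * for a HiveServer2 session, either inline or on a background thread, and
 * serves results back through the RowSet fetch API.
 *
 * <p>A rough usage sketch (illustrative only; in practice instances are
 * created through the session's OperationManager rather than directly):
 *
 * <pre>{@code
 * // runInBackground = true, queryTimeout = 0 (no timeout)
 * SQLOperation op = new SQLOperation(session, "SELECT * FROM src", overlay, true, 0L);
 * op.runInternal();
 * RowSet rows = op.getNextRowSet(FetchOrientation.FETCH_NEXT, 100);
 * }</pre>
 */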
public class SQLOperation extends ExecuteStatementOperation {

  private Driver driver = null;
  private CommandProcessorResponse response;
  private TableSchema resultSchema = null;
  private Schema mResultSchema = null;
  private AbstractSerDe serde = null;
  private boolean fetchStarted = false;

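  /**
   * Builds the operation; compilation is deferred to {@link #prepare}. Note
   * that {@code queryTimeout} is accepted but not acted upon in this class as
   * written; enforcing it is left to the caller or a later revision.
   */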
  public SQLOperation(HiveSession parentSession, String statement, Map<String, String> confOverlay,
      boolean runInBackground, long queryTimeout) {
    super(parentSession, statement, confOverlay, runInBackground);
  }

  /**
   * Compiles the statement and extracts the result schema metadata. Moves the
   * operation to RUNNING and, on failure, to ERROR.
   *
   * @param queryState the query state for the new Driver
   * @throws HiveSQLException if compilation fails
   */
  public void prepare(QueryState queryState) throws HiveSQLException {
    setState(OperationState.RUNNING);

    try {
      driver = new Driver(queryState, getParentSession().getUserName());

      // Set the operation handle information in the Driver, so that Thrift API
      // users can use the operation handle they receive to look up query
      // information in external systems such as the Yarn timeline server.
      String guid64 = Base64.encodeBase64URLSafeString(getHandle().getHandleIdentifier()
          .toTHandleIdentifier().getGuid()).trim();
      driver.setOperationId(guid64);

      // In Hive server mode we cannot retry inside the FetchTask once fetch
      // queries are issued, since execute() has already returned, so retry
      // attempts are effectively disabled here.
      driver.setTryCount(Integer.MAX_VALUE);

      response = driver.compileAndRespond(statement);
      if (0 != response.getResponseCode()) {
        throw toSQLException("Error while compiling statement", response);
      }

      mResultSchema = driver.getSchema();

      // hasResultSet should be true only if the query has a FetchTask;
      // "explain" is an exception for now (handled below).
      if (driver.getPlan().getFetchTask() != null) {
        // The schema has to be set at this point.
        if (mResultSchema == null || !mResultSchema.isSetFieldSchemas()) {
          throw new HiveSQLException("Error compiling query: Schema and FieldSchema " +
              "should be set when query plan has a FetchTask");
        }
        resultSchema = new TableSchema(mResultSchema);
        setHasResultSet(true);
      } else {
        setHasResultSet(false);
      }

      // Set hasResultSet to true if the plan has an ExplainTask, which
      // produces output without a FetchTask.
      for (Task<? extends Serializable> task : driver.getPlan().getRootTasks()) {
        if (task.getClass() == ExplainTask.class) {
          resultSchema = new TableSchema(mResultSchema);
          setHasResultSet(true);
          break;
        }
      }
    } catch (HiveSQLException e) {
      setState(OperationState.ERROR);
      throw e;
    } catch (Exception e) {
      setState(OperationState.ERROR);
      throw new HiveSQLException("Error running query: " + e.toString(), e);
    }
  }

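  /**
   * Executes the plan compiled by {@link #prepare}. On success the operation
   * moves to FINISHED; on failure to ERROR, unless it was already CANCELED by
   * another thread, in which case the state is left untouched.
   */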
  private void runQuery(HiveConf sqlOperationConf) throws HiveSQLException {
    try {
      // In Hive server mode we cannot retry inside the FetchTask once fetch
      // queries are issued, since execute() has already returned, so retry
      // attempts are effectively disabled here.
      driver.setTryCount(Integer.MAX_VALUE);
      response = driver.run();
      if (0 != response.getResponseCode()) {
        throw toSQLException("Error while processing statement", response);
      }
    } catch (HiveSQLException e) {
      // If the operation was cancelled by another thread, Driver#run returns a
      // non-zero response code. Simply return if the operation state is
      // CANCELED; otherwise propagate the error.
      if (getStatus().getState() == OperationState.CANCELED) {
        return;
      } else {
        setState(OperationState.ERROR);
        throw e;
      }
    } catch (Exception e) {
      setState(OperationState.ERROR);
      throw new HiveSQLException("Error running query: " + e.toString(), e);
    }
    setState(OperationState.FINISHED);
  }

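  /**
   * Compiles and then runs the statement, inline for synchronous operations or
   * on a background thread otherwise. For the async path, the parent thread's
   * ThreadLocal state (SessionState, Hive metastore client, UGI) is captured
   * here and re-installed inside the background thread, since ThreadLocals do
   * not cross thread boundaries on their own.
   */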
  @Override
  public void runInternal() throws HiveSQLException {
    setState(OperationState.PENDING);
    final HiveConf opConfig = getConfigForOperation();
    prepare(queryState);
    if (!shouldRunAsync()) {
      runQuery(opConfig);
    } else {
      // Pass ThreadLocals into the background thread from the foreground (handler) thread.
      final SessionState parentSessionState = SessionState.get();
      // The ThreadLocal Hive object needs to be set in the background thread,
      // so that the metastore client is associated with the right user.
      final Hive parentHive = getSessionHive();
      // The current UGI will be used by the metastore when metastore db accesses are made.
      final UserGroupInformation currentUGI = getCurrentUGI(opConfig);
      // Runnable impl to call runQuery asynchronously from a different thread.
      Runnable backgroundOperation = new Runnable() {
        @Override
        public void run() {
          PrivilegedExceptionAction<Object> doAsAction = new PrivilegedExceptionAction<Object>() {
            @Override
            public Object run() throws HiveSQLException {
              Hive.set(parentHive);
              SessionState.setCurrentSessionState(parentSessionState);
              // Register the current OperationLog in this async thread so query logs keep being saved.
              registerCurrentOperationLog();
              try {
                runQuery(opConfig);
              } catch (HiveSQLException e) {
                setOperationException(e);
                LOG.error("Error running hive query: ", e);
              } finally {
                unregisterOperationLog();
              }
              return null;
            }
          };

          try {
            currentUGI.doAs(doAsAction);
          } catch (Exception e) {
            setOperationException(new HiveSQLException(e));
            LOG.error("Error running hive query as user : " + currentUGI.getShortUserName(), e);
          } finally {
            // Cache the ThreadLocal RawStore object for this background thread
            // for an orderly cleanup when the thread is garbage collected later.
            // See ThreadWithGarbageCleanup#finalize().
            if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
              ThreadWithGarbageCleanup currentThread =
                  (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread();
              currentThread.cacheThreadLocalRawStore();
            }
          }
        }
      };
      try {
        // This submit blocks if no background threads are available to run this operation.
        Future<?> backgroundHandle =
            getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation);
        setBackgroundHandle(backgroundHandle);
      } catch (RejectedExecutionException rejected) {
        setState(OperationState.ERROR);
        throw new HiveSQLException("The background threadpool cannot accept a" +
            " new task for execution, please retry the operation", rejected);
      }
    }
  }

  /**
   * Returns the current UGI on the stack.
   *
   * @param opConfig the operation's HiveConf (currently unused here)
   * @return UserGroupInformation
   * @throws HiveSQLException if the UGI cannot be determined
   */
  private UserGroupInformation getCurrentUGI(HiveConf opConfig) throws HiveSQLException {
    try {
      return Utils.getUGI();
    } catch (Exception e) {
      throw new HiveSQLException("Unable to get current user", e);
    }
  }

  /**
   * Returns the ThreadLocal Hive object for the current thread.
   *
   * @return Hive
   * @throws HiveSQLException if the Hive object cannot be retrieved
   */
  private Hive getSessionHive() throws HiveSQLException {
    try {
      return Hive.get();
    } catch (HiveException e) {
      throw new HiveSQLException("Failed to get ThreadLocal Hive object", e);
    }
  }

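  /**
   * Moves the operation to the given terminal state and releases resources:
   * cancels the background future if one is running, closes and destroys the
   * Driver, and deletes the session's temporary output file if present.
   */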
  private void cleanup(OperationState state) throws HiveSQLException {
    setState(state);
    if (shouldRunAsync()) {
      Future<?> backgroundHandle = getBackgroundHandle();
      if (backgroundHandle != null) {
        backgroundHandle.cancel(true);
      }
    }
    if (driver != null) {
      driver.close();
      driver.destroy();
    }
    driver = null;

    // SessionState can be unset on the calling thread; guard against an NPE here.
    SessionState ss = SessionState.get();
    if (ss != null && ss.getTmpOutputFile() != null) {
      ss.getTmpOutputFile().delete();
    }
  }

  @Override
  public void cancel() throws HiveSQLException {
    cleanup(OperationState.CANCELED);
  }

  @Override
  public void close() throws HiveSQLException {
    cleanup(OperationState.CLOSED);
    cleanupOperationLog();
  }

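  /**
   * Returns the result schema, materializing it from the Driver on first use
   * if {@link #prepare} did not already capture one.
   */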
  @Override
  public TableSchema getResultSetSchema() throws HiveSQLException {
    assertState(OperationState.FINISHED);
    if (resultSchema == null) {
      resultSchema = new TableSchema(driver.getSchema());
    }
    return resultSchema;
  }

  // Scratch buffer that Driver#getResults fills on each fetch; cleared afterwards.
  private final transient List<Object> convey = new ArrayList<Object>();

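  /**
   * Fetches up to {@code maxRows} rows ({@code maxRows} is narrowed to int for
   * the Driver). FETCH_FIRST after a previous read rewinds the result set.
   */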
  @Override
  public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
    validateDefaultFetchOrientation(orientation);
    assertState(OperationState.FINISHED);

    RowSet rowSet = RowSetFactory.create(resultSchema, getProtocolVersion(), false);

    try {
      /* If the client requests fetch-from-start and this is not the first read
       * from this operation, reset the fetch position to the beginning.
       */
      if (orientation.equals(FetchOrientation.FETCH_FIRST) && fetchStarted) {
        driver.resetFetch();
      }
      fetchStarted = true;
      driver.setMaxRows((int) maxRows);
      if (driver.getResults(convey)) {
        return decode(convey, rowSet);
      }
      return rowSet;
    } catch (IOException e) {
      throw new HiveSQLException(e);
    } catch (CommandNeedRetryException e) {
      throw new HiveSQLException(e);
    } catch (Exception e) {
      throw new HiveSQLException(e);
    } finally {
      convey.clear();
    }
  }

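  /**
   * Routes decoding by fetch type: direct table fetches arrive as ready-made
   * Object[] rows, while query results arrive as serde-encoded strings.
   */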
  private RowSet decode(List<Object> rows, RowSet rowSet) throws Exception {
    if (driver.isFetchingTable()) {
      return prepareFromRow(rows, rowSet);
    }
    return decodeFromString(rows, rowSet);
  }

  // Rows fetched directly from a table are already encoded as thrift-able
  // Object[] values, so they can be added to the RowSet as-is.
  private RowSet prepareFromRow(List<Object> rows, RowSet rowSet) throws Exception {
    for (Object row : rows) {
      rowSet.addRow((Object[]) row);
    }
    return rowSet;
  }

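  /**
   * Deserializes each row string with the {@link LazySimpleSerDe} built by
   * {@link #getSerDe()}, then converts every field to its Thrift payload form
   * for the client's protocol version before adding it to the RowSet.
   */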
  private RowSet decodeFromString(List<Object> rows, RowSet rowSet)
      throws SQLException, SerDeException {
    getSerDe();
    StructObjectInspector soi = (StructObjectInspector) serde.getObjectInspector();
    List<? extends StructField> fieldRefs = soi.getAllStructFieldRefs();

    Object[] deserializedFields = new Object[fieldRefs.size()];
    Object rowObj;
    ObjectInspector fieldOI;

    int protocol = getProtocolVersion().getValue();
    for (Object rowString : rows) {
      rowObj = serde.deserialize(new BytesWritable(((String) rowString).getBytes(UTF_8)));
      for (int i = 0; i < fieldRefs.size(); i++) {
        StructField fieldRef = fieldRefs.get(i);
        fieldOI = fieldRef.getFieldObjectInspector();
        Object fieldData = soi.getStructFieldData(rowObj, fieldRef);
        deserializedFields[i] = SerDeUtils.toThriftPayload(fieldData, fieldOI, protocol);
      }
      rowSet.addRow(deserializedFields);
    }
    return rowSet;
  }

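  /**
   * Lazily builds a {@link LazySimpleSerDe} configured with the column names
   * and types from the compile-time result schema; reused across fetches.
   */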
  private AbstractSerDe getSerDe() throws SQLException {
    if (serde != null) {
      return serde;
    }
    try {
      List<FieldSchema> fieldSchemas = mResultSchema.getFieldSchemas();
      StringBuilder namesSb = new StringBuilder();
      StringBuilder typesSb = new StringBuilder();

      if (fieldSchemas != null && !fieldSchemas.isEmpty()) {
        for (int pos = 0; pos < fieldSchemas.size(); pos++) {
          if (pos != 0) {
            namesSb.append(",");
            typesSb.append(",");
          }
          namesSb.append(fieldSchemas.get(pos).getName());
          typesSb.append(fieldSchemas.get(pos).getType());
        }
      }
      String names = namesSb.toString();
      String types = typesSb.toString();

      serde = new LazySimpleSerDe();
      Properties props = new Properties();
      if (names.length() > 0) {
        LOG.debug("Column names: " + names);
        props.setProperty(serdeConstants.LIST_COLUMNS, names);
      }
      if (types.length() > 0) {
        LOG.debug("Column types: " + types);
        props.setProperty(serdeConstants.LIST_COLUMN_TYPES, types);
      }
      SerDeUtils.initializeSerDe(serde, new HiveConf(), props, null);
    } catch (Exception ex) {
      LOG.error("Could not create ResultSet", ex);
      throw new SQLException("Could not create ResultSet: " + ex.getMessage(), ex);
    }
    return serde;
  }

  /**
   * If there are query-specific settings to overlay, creates a copy of the
   * session config. There are two cases where the session config must be
   * cloned before being handed to the Hive driver:
   * 1. Async queries: if the client changes a config setting mid-flight, the
   *    change should not affect an execution already under way.
   * 2. confOverlay: statement-specific settings should apply only to this
   *    query's config, not to the whole session.
   *
   * @return new configuration
   * @throws HiveSQLException if a conf overlay entry fails validation
   */
  private HiveConf getConfigForOperation() throws HiveSQLException {
    HiveConf sqlOperationConf = getParentSession().getHiveConf();
    if (!getConfOverlay().isEmpty() || shouldRunAsync()) {
      // Clone the parent session config for this query.
      sqlOperationConf = new HiveConf(sqlOperationConf);

      // Apply the overlay of query-specific settings, if any.
      for (Map.Entry<String, String> confEntry : getConfOverlay().entrySet()) {
        try {
          sqlOperationConf.verifyAndSet(confEntry.getKey(), confEntry.getValue());
        } catch (IllegalArgumentException e) {
          throw new HiveSQLException("Error applying statement specific settings", e);
        }
      }
    }
    return sqlOperationConf;
  }
}