0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.hive.service.cli.operation;
0019
0020 import java.io.File;
0021 import java.io.FileNotFoundException;
0022 import java.util.EnumSet;
0023 import java.util.HashMap;
0024 import java.util.Map;
0025 import java.util.concurrent.Future;
0026 import java.util.concurrent.TimeUnit;
0027
0028 import org.apache.hadoop.hive.conf.HiveConf;
0029 import org.apache.hadoop.hive.ql.QueryState;
0030 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
0031 import org.apache.hadoop.hive.ql.session.OperationLog;
0032 import org.apache.hive.service.cli.FetchOrientation;
0033 import org.apache.hive.service.cli.HiveSQLException;
0034 import org.apache.hive.service.cli.OperationHandle;
0035 import org.apache.hive.service.cli.OperationState;
0036 import org.apache.hive.service.cli.OperationStatus;
0037 import org.apache.hive.service.cli.OperationType;
0038 import org.apache.hive.service.cli.RowSet;
0039 import org.apache.hive.service.cli.TableSchema;
0040 import org.apache.hive.service.cli.session.HiveSession;
0041 import org.apache.hive.service.rpc.thrift.TProtocolVersion;
0042 import org.slf4j.Logger;
0043 import org.slf4j.LoggerFactory;
0044
0045 public abstract class Operation {
0046 protected final HiveSession parentSession;
0047 private OperationState state = OperationState.INITIALIZED;
0048 private final OperationHandle opHandle;
0049 private HiveConf configuration;
0050 public static final Logger LOG = LoggerFactory.getLogger(Operation.class.getName());
0051 public static final FetchOrientation DEFAULT_FETCH_ORIENTATION = FetchOrientation.FETCH_NEXT;
0052 public static final long DEFAULT_FETCH_MAX_ROWS = 100;
0053 protected boolean hasResultSet;
0054 protected volatile HiveSQLException operationException;
0055 protected final boolean runAsync;
0056 protected volatile Future<?> backgroundHandle;
0057 protected OperationLog operationLog;
0058 protected boolean isOperationLogEnabled;
0059 protected Map<String, String> confOverlay = new HashMap<String, String>();
0060
0061 private long operationTimeout;
0062 private long lastAccessTime;
0063
0064 protected final QueryState queryState;
0065
0066 protected static final EnumSet<FetchOrientation> DEFAULT_FETCH_ORIENTATION_SET =
0067 EnumSet.of(
0068 FetchOrientation.FETCH_NEXT,
0069 FetchOrientation.FETCH_FIRST,
0070 FetchOrientation.FETCH_PRIOR);
0071
0072 protected Operation(HiveSession parentSession, OperationType opType) {
0073 this(parentSession, null, opType);
0074 }
0075
0076 protected Operation(HiveSession parentSession, Map<String, String> confOverlay,
0077 OperationType opType) {
0078 this(parentSession, confOverlay, opType, false);
0079 }
0080
0081 protected Operation(HiveSession parentSession,
0082 Map<String, String> confOverlay, OperationType opType, boolean runInBackground) {
0083 this.parentSession = parentSession;
0084 this.confOverlay = confOverlay;
0085 this.runAsync = runInBackground;
0086 this.opHandle = new OperationHandle(opType, parentSession.getProtocolVersion());
0087 lastAccessTime = System.currentTimeMillis();
0088 operationTimeout = HiveConf.getTimeVar(parentSession.getHiveConf(),
0089 HiveConf.ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT, TimeUnit.MILLISECONDS);
0090 queryState = new QueryState(parentSession.getHiveConf(), confOverlay, runInBackground);
0091 }
0092
0093 public Future<?> getBackgroundHandle() {
0094 return backgroundHandle;
0095 }
0096
0097 protected void setBackgroundHandle(Future<?> backgroundHandle) {
0098 this.backgroundHandle = backgroundHandle;
0099 }
0100
0101 public boolean shouldRunAsync() {
0102 return runAsync;
0103 }
0104
0105 public void setConfiguration(HiveConf configuration) {
0106 this.configuration = new HiveConf(configuration);
0107 }
0108
0109 public HiveConf getConfiguration() {
0110 return new HiveConf(configuration);
0111 }
0112
0113 public HiveSession getParentSession() {
0114 return parentSession;
0115 }
0116
0117 public OperationHandle getHandle() {
0118 return opHandle;
0119 }
0120
0121 public TProtocolVersion getProtocolVersion() {
0122 return opHandle.getProtocolVersion();
0123 }
0124
0125 public OperationType getType() {
0126 return opHandle.getOperationType();
0127 }
0128
0129 public OperationStatus getStatus() {
0130 return new OperationStatus(state, operationException);
0131 }
0132
0133 public boolean hasResultSet() {
0134 return hasResultSet;
0135 }
0136
0137 protected void setHasResultSet(boolean hasResultSet) {
0138 this.hasResultSet = hasResultSet;
0139 opHandle.setHasResultSet(hasResultSet);
0140 }
0141
0142 public OperationLog getOperationLog() {
0143 return operationLog;
0144 }
0145
0146 protected final OperationState setState(OperationState newState) throws HiveSQLException {
0147 state.validateTransition(newState);
0148 this.state = newState;
0149 this.lastAccessTime = System.currentTimeMillis();
0150 return this.state;
0151 }
0152
0153 public boolean isTimedOut(long current) {
0154 if (operationTimeout == 0) {
0155 return false;
0156 }
0157 if (operationTimeout > 0) {
0158
0159 return state.isTerminal() && lastAccessTime + operationTimeout <= current;
0160 }
0161 return lastAccessTime + -operationTimeout <= current;
0162 }
0163
0164 public long getLastAccessTime() {
0165 return lastAccessTime;
0166 }
0167
0168 public long getOperationTimeout() {
0169 return operationTimeout;
0170 }
0171
0172 public void setOperationTimeout(long operationTimeout) {
0173 this.operationTimeout = operationTimeout;
0174 }
0175
0176 protected void setOperationException(HiveSQLException operationException) {
0177 this.operationException = operationException;
0178 }
0179
0180 protected final void assertState(OperationState state) throws HiveSQLException {
0181 if (this.state != state) {
0182 throw new HiveSQLException("Expected state " + state + ", but found " + this.state);
0183 }
0184 this.lastAccessTime = System.currentTimeMillis();
0185 }
0186
0187 public boolean isRunning() {
0188 return OperationState.RUNNING.equals(state);
0189 }
0190
0191 public boolean isFinished() {
0192 return OperationState.FINISHED.equals(state);
0193 }
0194
0195 public boolean isCanceled() {
0196 return OperationState.CANCELED.equals(state);
0197 }
0198
0199 public boolean isFailed() {
0200 return OperationState.ERROR.equals(state);
0201 }
0202
0203 protected void createOperationLog() {
0204 if (parentSession.isOperationLogEnabled()) {
0205 File operationLogFile = new File(parentSession.getOperationLogSessionDir(),
0206 opHandle.getHandleIdentifier().toString());
0207 isOperationLogEnabled = true;
0208
0209
0210 try {
0211 if (operationLogFile.exists()) {
0212 LOG.warn("The operation log file should not exist, but it is already there: " +
0213 operationLogFile.getAbsolutePath());
0214 operationLogFile.delete();
0215 }
0216 if (!operationLogFile.createNewFile()) {
0217
0218
0219 if (!operationLogFile.canRead() || !operationLogFile.canWrite()) {
0220 LOG.warn("The already existed operation log file cannot be recreated, " +
0221 "and it cannot be read or written: " + operationLogFile.getAbsolutePath());
0222 isOperationLogEnabled = false;
0223 return;
0224 }
0225 }
0226 } catch (Exception e) {
0227 LOG.warn("Unable to create operation log file: " + operationLogFile.getAbsolutePath(), e);
0228 isOperationLogEnabled = false;
0229 return;
0230 }
0231
0232
0233 try {
0234 operationLog = new OperationLog(opHandle.toString(), operationLogFile, parentSession.getHiveConf());
0235 } catch (FileNotFoundException e) {
0236 LOG.warn("Unable to instantiate OperationLog object for operation: " +
0237 opHandle, e);
0238 isOperationLogEnabled = false;
0239 return;
0240 }
0241
0242
0243 OperationLog.setCurrentOperationLog(operationLog);
0244 }
0245 }
0246
0247 protected void unregisterOperationLog() {
0248 if (isOperationLogEnabled) {
0249 OperationLog.removeCurrentOperationLog();
0250 }
0251 }
0252
0253
0254
0255
0256
0257 protected void beforeRun() {
0258 createOperationLog();
0259 }
0260
0261
0262
0263
0264
0265 protected void afterRun() {
0266 unregisterOperationLog();
0267 }
0268
0269
0270
0271
0272
0273 protected abstract void runInternal() throws HiveSQLException;
0274
0275 public void run() throws HiveSQLException {
0276 beforeRun();
0277 try {
0278 runInternal();
0279 } finally {
0280 afterRun();
0281 }
0282 }
0283
0284 protected void cleanupOperationLog() {
0285 if (isOperationLogEnabled) {
0286 if (operationLog == null) {
0287 LOG.error("Operation [ " + opHandle.getHandleIdentifier() + " ] "
0288 + "logging is enabled, but its OperationLog object cannot be found.");
0289 } else {
0290 operationLog.close();
0291 }
0292 }
0293 }
0294
0295
0296 public void cancel() throws HiveSQLException {
0297 setState(OperationState.CANCELED);
0298 throw new UnsupportedOperationException("SQLOperation.cancel()");
0299 }
0300
0301 public void close() throws HiveSQLException {
0302 setState(OperationState.CLOSED);
0303 cleanupOperationLog();
0304 }
0305
0306 public abstract TableSchema getResultSetSchema() throws HiveSQLException;
0307
0308 public abstract RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException;
0309
0310 public RowSet getNextRowSet() throws HiveSQLException {
0311 return getNextRowSet(FetchOrientation.FETCH_NEXT, DEFAULT_FETCH_MAX_ROWS);
0312 }
0313
0314
0315
0316
0317
0318
0319 protected void validateDefaultFetchOrientation(FetchOrientation orientation)
0320 throws HiveSQLException {
0321 validateFetchOrientation(orientation, DEFAULT_FETCH_ORIENTATION_SET);
0322 }
0323
0324
0325
0326
0327
0328
0329
0330 protected void validateFetchOrientation(FetchOrientation orientation,
0331 EnumSet<FetchOrientation> supportedOrientations) throws HiveSQLException {
0332 if (!supportedOrientations.contains(orientation)) {
0333 throw new HiveSQLException("The fetch type " + orientation.toString() +
0334 " is not supported for this resultset", "HY106");
0335 }
0336 }
0337
0338 protected HiveSQLException toSQLException(String prefix, CommandProcessorResponse response) {
0339 HiveSQLException ex = new HiveSQLException(prefix + ": " + response.getErrorMessage(),
0340 response.getSQLState(), response.getResponseCode());
0341 if (response.getException() != null) {
0342 ex.initCause(response.getException());
0343 }
0344 return ex;
0345 }
0346
0347 protected Map<String, String> getConfOverlay() {
0348 return confOverlay;
0349 }
0350 }