0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.hive.service.cli.operation;
0019
0020 import java.io.File;
0021 import java.io.FileNotFoundException;
0022 import java.util.EnumSet;
0023 import java.util.concurrent.Future;
0024 import java.util.concurrent.TimeUnit;
0025
0026 import org.apache.commons.logging.Log;
0027 import org.apache.commons.logging.LogFactory;
0028 import org.apache.hadoop.hive.conf.HiveConf;
0029 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
0030 import org.apache.hadoop.hive.ql.session.OperationLog;
0031 import org.apache.hive.service.cli.FetchOrientation;
0032 import org.apache.hive.service.cli.HiveSQLException;
0033 import org.apache.hive.service.cli.OperationHandle;
0034 import org.apache.hive.service.cli.OperationState;
0035 import org.apache.hive.service.cli.OperationStatus;
0036 import org.apache.hive.service.cli.OperationType;
0037 import org.apache.hive.service.cli.RowSet;
0038 import org.apache.hive.service.cli.TableSchema;
0039 import org.apache.hive.service.cli.session.HiveSession;
0040 import org.apache.hive.service.cli.thrift.TProtocolVersion;
0041
0042 public abstract class Operation {
0043 protected final HiveSession parentSession;
0044 private OperationState state = OperationState.INITIALIZED;
0045 private final OperationHandle opHandle;
0046 private HiveConf configuration;
0047 public static final Log LOG = LogFactory.getLog(Operation.class.getName());
0048 public static final FetchOrientation DEFAULT_FETCH_ORIENTATION = FetchOrientation.FETCH_NEXT;
0049 public static final long DEFAULT_FETCH_MAX_ROWS = 100;
0050 protected boolean hasResultSet;
0051 protected volatile HiveSQLException operationException;
0052 protected final boolean runAsync;
0053 protected volatile Future<?> backgroundHandle;
0054 protected OperationLog operationLog;
0055 protected boolean isOperationLogEnabled;
0056
0057 private long operationTimeout;
0058 private long lastAccessTime;
0059
0060 protected static final EnumSet<FetchOrientation> DEFAULT_FETCH_ORIENTATION_SET =
0061 EnumSet.of(
0062 FetchOrientation.FETCH_NEXT,
0063 FetchOrientation.FETCH_FIRST,
0064 FetchOrientation.FETCH_PRIOR);
0065
0066 protected Operation(HiveSession parentSession, OperationType opType, boolean runInBackground) {
0067 this.parentSession = parentSession;
0068 this.runAsync = runInBackground;
0069 this.opHandle = new OperationHandle(opType, parentSession.getProtocolVersion());
0070 lastAccessTime = System.currentTimeMillis();
0071 operationTimeout = HiveConf.getTimeVar(parentSession.getHiveConf(),
0072 HiveConf.ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT, TimeUnit.MILLISECONDS);
0073 }
0074
0075 public Future<?> getBackgroundHandle() {
0076 return backgroundHandle;
0077 }
0078
0079 protected void setBackgroundHandle(Future<?> backgroundHandle) {
0080 this.backgroundHandle = backgroundHandle;
0081 }
0082
0083 public boolean shouldRunAsync() {
0084 return runAsync;
0085 }
0086
0087 public void setConfiguration(HiveConf configuration) {
0088 this.configuration = new HiveConf(configuration);
0089 }
0090
0091 public HiveConf getConfiguration() {
0092 return new HiveConf(configuration);
0093 }
0094
0095 public HiveSession getParentSession() {
0096 return parentSession;
0097 }
0098
0099 public OperationHandle getHandle() {
0100 return opHandle;
0101 }
0102
0103 public TProtocolVersion getProtocolVersion() {
0104 return opHandle.getProtocolVersion();
0105 }
0106
0107 public OperationType getType() {
0108 return opHandle.getOperationType();
0109 }
0110
0111 public OperationStatus getStatus() {
0112 return new OperationStatus(state, operationException);
0113 }
0114
0115 public boolean hasResultSet() {
0116 return hasResultSet;
0117 }
0118
0119 protected void setHasResultSet(boolean hasResultSet) {
0120 this.hasResultSet = hasResultSet;
0121 opHandle.setHasResultSet(hasResultSet);
0122 }
0123
0124 public OperationLog getOperationLog() {
0125 return operationLog;
0126 }
0127
0128 protected final OperationState setState(OperationState newState) throws HiveSQLException {
0129 state.validateTransition(newState);
0130 this.state = newState;
0131 this.lastAccessTime = System.currentTimeMillis();
0132 return this.state;
0133 }
0134
0135 public boolean isTimedOut(long current) {
0136 if (operationTimeout == 0) {
0137 return false;
0138 }
0139 if (operationTimeout > 0) {
0140
0141 return state.isTerminal() && lastAccessTime + operationTimeout <= current;
0142 }
0143 return lastAccessTime + -operationTimeout <= current;
0144 }
0145
0146 public long getLastAccessTime() {
0147 return lastAccessTime;
0148 }
0149
0150 public long getOperationTimeout() {
0151 return operationTimeout;
0152 }
0153
0154 public void setOperationTimeout(long operationTimeout) {
0155 this.operationTimeout = operationTimeout;
0156 }
0157
0158 protected void setOperationException(HiveSQLException operationException) {
0159 this.operationException = operationException;
0160 }
0161
0162 protected final void assertState(OperationState state) throws HiveSQLException {
0163 if (this.state != state) {
0164 throw new HiveSQLException("Expected state " + state + ", but found " + this.state);
0165 }
0166 this.lastAccessTime = System.currentTimeMillis();
0167 }
0168
0169 public boolean isRunning() {
0170 return OperationState.RUNNING.equals(state);
0171 }
0172
0173 public boolean isFinished() {
0174 return OperationState.FINISHED.equals(state);
0175 }
0176
0177 public boolean isCanceled() {
0178 return OperationState.CANCELED.equals(state);
0179 }
0180
0181 public boolean isFailed() {
0182 return OperationState.ERROR.equals(state);
0183 }
0184
0185 protected void createOperationLog() {
0186 if (parentSession.isOperationLogEnabled()) {
0187 File operationLogFile = new File(parentSession.getOperationLogSessionDir(),
0188 opHandle.getHandleIdentifier().toString());
0189 isOperationLogEnabled = true;
0190
0191
0192 try {
0193 if (operationLogFile.exists()) {
0194 LOG.warn("The operation log file should not exist, but it is already there: " +
0195 operationLogFile.getAbsolutePath());
0196 operationLogFile.delete();
0197 }
0198 if (!operationLogFile.createNewFile()) {
0199
0200
0201 if (!operationLogFile.canRead() || !operationLogFile.canWrite()) {
0202 LOG.warn("The already existed operation log file cannot be recreated, " +
0203 "and it cannot be read or written: " + operationLogFile.getAbsolutePath());
0204 isOperationLogEnabled = false;
0205 return;
0206 }
0207 }
0208 } catch (Exception e) {
0209 LOG.warn("Unable to create operation log file: " + operationLogFile.getAbsolutePath(), e);
0210 isOperationLogEnabled = false;
0211 return;
0212 }
0213
0214
0215 try {
0216 operationLog = new OperationLog(opHandle.toString(), operationLogFile, parentSession.getHiveConf());
0217 } catch (FileNotFoundException e) {
0218 LOG.warn("Unable to instantiate OperationLog object for operation: " +
0219 opHandle, e);
0220 isOperationLogEnabled = false;
0221 return;
0222 }
0223
0224
0225 OperationLog.setCurrentOperationLog(operationLog);
0226 }
0227 }
0228
0229 protected void unregisterOperationLog() {
0230 if (isOperationLogEnabled) {
0231 OperationLog.removeCurrentOperationLog();
0232 }
0233 }
0234
0235
0236
0237
0238
0239 protected void beforeRun() {
0240 createOperationLog();
0241 }
0242
0243
0244
0245
0246
0247 protected void afterRun() {
0248 unregisterOperationLog();
0249 }
0250
0251
0252
0253
0254
0255 protected abstract void runInternal() throws HiveSQLException;
0256
0257 public void run() throws HiveSQLException {
0258 beforeRun();
0259 try {
0260 runInternal();
0261 } finally {
0262 afterRun();
0263 }
0264 }
0265
0266 protected void cleanupOperationLog() {
0267 if (isOperationLogEnabled) {
0268 if (operationLog == null) {
0269 LOG.error("Operation [ " + opHandle.getHandleIdentifier() + " ] "
0270 + "logging is enabled, but its OperationLog object cannot be found.");
0271 } else {
0272 operationLog.close();
0273 }
0274 }
0275 }
0276
0277
0278 public void cancel() throws HiveSQLException {
0279 setState(OperationState.CANCELED);
0280 throw new UnsupportedOperationException("SQLOperation.cancel()");
0281 }
0282
0283 public void close() throws HiveSQLException {
0284 setState(OperationState.CLOSED);
0285 cleanupOperationLog();
0286 }
0287
0288 public abstract TableSchema getResultSetSchema() throws HiveSQLException;
0289
0290 public abstract RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException;
0291
0292 public RowSet getNextRowSet() throws HiveSQLException {
0293 return getNextRowSet(FetchOrientation.FETCH_NEXT, DEFAULT_FETCH_MAX_ROWS);
0294 }
0295
0296
0297
0298
0299
0300
0301 protected void validateDefaultFetchOrientation(FetchOrientation orientation)
0302 throws HiveSQLException {
0303 validateFetchOrientation(orientation, DEFAULT_FETCH_ORIENTATION_SET);
0304 }
0305
0306
0307
0308
0309
0310
0311
0312 protected void validateFetchOrientation(FetchOrientation orientation,
0313 EnumSet<FetchOrientation> supportedOrientations) throws HiveSQLException {
0314 if (!supportedOrientations.contains(orientation)) {
0315 throw new HiveSQLException("The fetch type " + orientation.toString() +
0316 " is not supported for this resultset", "HY106");
0317 }
0318 }
0319
0320 protected HiveSQLException toSQLException(String prefix, CommandProcessorResponse response) {
0321 HiveSQLException ex = new HiveSQLException(prefix + ": " + response.getErrorMessage(),
0322 response.getSQLState(), response.getResponseCode());
0323 if (response.getException() != null) {
0324 ex.initCause(response.getException());
0325 }
0326 return ex;
0327 }
0328 }