/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hive.service.cli.operation;

import java.io.File;
import java.io.FileNotFoundException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.OperationLog;
import org.apache.hive.service.cli.FetchOrientation;
import org.apache.hive.service.cli.HiveSQLException;
import org.apache.hive.service.cli.OperationHandle;
import org.apache.hive.service.cli.OperationState;
import org.apache.hive.service.cli.OperationStatus;
import org.apache.hive.service.cli.OperationType;
import org.apache.hive.service.cli.RowSet;
import org.apache.hive.service.cli.TableSchema;
import org.apache.hive.service.cli.session.HiveSession;
import org.apache.hive.service.rpc.thrift.TProtocolVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

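/**
 * Base class for HiveServer2 operations. An Operation tracks its handle,
 * state machine, idle timeout, and optional per-operation log, and defines
 * the beforeRun()/runInternal()/afterRun() lifecycle that run() drives and
 * that concrete subclasses implement.
 */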
public abstract class Operation {
  protected final HiveSession parentSession;
  private OperationState state = OperationState.INITIALIZED;
  private final OperationHandle opHandle;
  private HiveConf configuration;
  public static final Logger LOG = LoggerFactory.getLogger(Operation.class.getName());
  public static final FetchOrientation DEFAULT_FETCH_ORIENTATION = FetchOrientation.FETCH_NEXT;
  public static final long DEFAULT_FETCH_MAX_ROWS = 100;
  protected boolean hasResultSet;
  protected volatile HiveSQLException operationException;
  protected final boolean runAsync;
  protected volatile Future<?> backgroundHandle;
  protected OperationLog operationLog;
  protected boolean isOperationLogEnabled;
  protected Map<String, String> confOverlay = new HashMap<String, String>();

  private long operationTimeout;
  private long lastAccessTime;

  protected final QueryState queryState;

  protected static final EnumSet<FetchOrientation> DEFAULT_FETCH_ORIENTATION_SET =
      EnumSet.of(
          FetchOrientation.FETCH_NEXT,
          FetchOrientation.FETCH_FIRST,
          FetchOrientation.FETCH_PRIOR);

  protected Operation(HiveSession parentSession, OperationType opType) {
    this(parentSession, null, opType);
  }

  protected Operation(HiveSession parentSession, Map<String, String> confOverlay,
      OperationType opType) {
    this(parentSession, confOverlay, opType, false);
  }

  protected Operation(HiveSession parentSession,
      Map<String, String> confOverlay, OperationType opType, boolean runInBackground) {
    this.parentSession = parentSession;
    this.confOverlay = confOverlay;
    this.runAsync = runInBackground;
    this.opHandle = new OperationHandle(opType, parentSession.getProtocolVersion());
    lastAccessTime = System.currentTimeMillis();
    operationTimeout = HiveConf.getTimeVar(parentSession.getHiveConf(),
        HiveConf.ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT, TimeUnit.MILLISECONDS);
    queryState = new QueryState(parentSession.getHiveConf(), confOverlay, runInBackground);
  }

  public Future<?> getBackgroundHandle() {
    return backgroundHandle;
  }

  protected void setBackgroundHandle(Future<?> backgroundHandle) {
    this.backgroundHandle = backgroundHandle;
  }

  public boolean shouldRunAsync() {
    return runAsync;
  }

  public void setConfiguration(HiveConf configuration) {
    this.configuration = new HiveConf(configuration);
  }

  public HiveConf getConfiguration() {
    return new HiveConf(configuration);
  }

  public HiveSession getParentSession() {
    return parentSession;
  }

  public OperationHandle getHandle() {
    return opHandle;
  }

  public TProtocolVersion getProtocolVersion() {
    return opHandle.getProtocolVersion();
  }

  public OperationType getType() {
    return opHandle.getOperationType();
  }

  public OperationStatus getStatus() {
    return new OperationStatus(state, operationException);
  }

  public boolean hasResultSet() {
    return hasResultSet;
  }

  protected void setHasResultSet(boolean hasResultSet) {
    this.hasResultSet = hasResultSet;
    opHandle.setHasResultSet(hasResultSet);
  }

  public OperationLog getOperationLog() {
    return operationLog;
  }

  protected final OperationState setState(OperationState newState) throws HiveSQLException {
    state.validateTransition(newState);
    this.state = newState;
    this.lastAccessTime = System.currentTimeMillis();
    return this.state;
  }

  public boolean isTimedOut(long current) {
    if (operationTimeout == 0) {
      // a timeout of zero disables idle-operation expiration
      return false;
    }
    if (operationTimeout > 0) {
      // positive timeout: expire only once the operation is in a terminal state
      return state.isTerminal() && lastAccessTime + operationTimeout <= current;
    }
    // negative timeout: expire after the interval regardless of state
    return lastAccessTime - operationTimeout <= current;
  }

  public long getLastAccessTime() {
    return lastAccessTime;
  }

  public long getOperationTimeout() {
    return operationTimeout;
  }

  public void setOperationTimeout(long operationTimeout) {
    this.operationTimeout = operationTimeout;
  }

  protected void setOperationException(HiveSQLException operationException) {
    this.operationException = operationException;
  }

  protected final void assertState(OperationState state) throws HiveSQLException {
    if (this.state != state) {
      throw new HiveSQLException("Expected state " + state + ", but found " + this.state);
    }
    this.lastAccessTime = System.currentTimeMillis();
  }

  public boolean isRunning() {
    return OperationState.RUNNING.equals(state);
  }

  public boolean isFinished() {
    return OperationState.FINISHED.equals(state);
  }

  public boolean isCanceled() {
    return OperationState.CANCELED.equals(state);
  }

  public boolean isFailed() {
    return OperationState.ERROR.equals(state);
  }

  protected void createOperationLog() {
    if (parentSession.isOperationLogEnabled()) {
      File operationLogFile = new File(parentSession.getOperationLogSessionDir(),
          opHandle.getHandleIdentifier().toString());
      isOperationLogEnabled = true;

      // create the log file
      try {
        if (operationLogFile.exists()) {
          LOG.warn("The operation log file should not exist, but it is already there: " +
              operationLogFile.getAbsolutePath());
          operationLogFile.delete();
        }
        if (!operationLogFile.createNewFile()) {
          // the log file already exists and could not be deleted.
          // If it can be read and written, keep its contents and use it.
          if (!operationLogFile.canRead() || !operationLogFile.canWrite()) {
            LOG.warn("The existing operation log file cannot be recreated, " +
                "and it cannot be read or written: " + operationLogFile.getAbsolutePath());
            isOperationLogEnabled = false;
            return;
          }
        }
      } catch (Exception e) {
        LOG.warn("Unable to create operation log file: " + operationLogFile.getAbsolutePath(), e);
        isOperationLogEnabled = false;
        return;
      }

      // create an OperationLog object backed by the log file above
      try {
        operationLog = new OperationLog(opHandle.toString(), operationLogFile, parentSession.getHiveConf());
      } catch (FileNotFoundException e) {
        LOG.warn("Unable to instantiate OperationLog object for operation: " +
            opHandle, e);
        isOperationLogEnabled = false;
        return;
      }

      // register this OperationLog with the current thread
      OperationLog.setCurrentOperationLog(operationLog);
    }
  }

  protected void unregisterOperationLog() {
    if (isOperationLogEnabled) {
      OperationLog.removeCurrentOperationLog();
    }
  }

  /**
   * Invoked before runInternal() to set up any preconditions or configuration.
   */
  protected void beforeRun() {
    createOperationLog();
  }

  /**
   * Invoked after runInternal(), even if runInternal() throws an exception.
   * Cleans up the resources that were set up in beforeRun().
   */
  protected void afterRun() {
    unregisterOperationLog();
  }

  /**
   * Implemented by subclasses of Operation to execute their specific behavior.
   * @throws HiveSQLException
   */
  protected abstract void runInternal() throws HiveSQLException;

  public void run() throws HiveSQLException {
    beforeRun();
    try {
      runInternal();
    } finally {
      afterRun();
    }
  }

  protected void cleanupOperationLog() {
    if (isOperationLogEnabled) {
      if (operationLog == null) {
        LOG.error("Operation [ " + opHandle.getHandleIdentifier() + " ] "
          + "logging is enabled, but its OperationLog object cannot be found.");
      } else {
        operationLog.close();
      }
    }
  }

  // TODO: make this abstract and implement in subclasses.
  public void cancel() throws HiveSQLException {
    setState(OperationState.CANCELED);
    throw new UnsupportedOperationException("SQLOperation.cancel()");
  }

  public void close() throws HiveSQLException {
    setState(OperationState.CLOSED);
    cleanupOperationLog();
  }

  public abstract TableSchema getResultSetSchema() throws HiveSQLException;

  public abstract RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException;

  public RowSet getNextRowSet() throws HiveSQLException {
    return getNextRowSet(FetchOrientation.FETCH_NEXT, DEFAULT_FETCH_MAX_ROWS);
  }

  /**
   * Verify that the given fetch orientation is one of the default orientation types.
   * @param orientation
   * @throws HiveSQLException
   */
  protected void validateDefaultFetchOrientation(FetchOrientation orientation)
      throws HiveSQLException {
    validateFetchOrientation(orientation, DEFAULT_FETCH_ORIENTATION_SET);
  }

  /**
   * Verify that the given fetch orientation is one of the supported orientation types.
   * @param orientation
   * @param supportedOrientations
   * @throws HiveSQLException
   */
  protected void validateFetchOrientation(FetchOrientation orientation,
      EnumSet<FetchOrientation> supportedOrientations) throws HiveSQLException {
    if (!supportedOrientations.contains(orientation)) {
      throw new HiveSQLException("The fetch type " + orientation.toString() +
          " is not supported for this resultset", "HY106");
    }
  }

  protected HiveSQLException toSQLException(String prefix, CommandProcessorResponse response) {
    HiveSQLException ex = new HiveSQLException(prefix + ": " + response.getErrorMessage(),
        response.getSQLState(), response.getResponseCode());
    if (response.getException() != null) {
      ex.initCause(response.getException());
    }
    return ex;
  }

  protected Map<String, String> getConfOverlay() {
    return confOverlay;
  }
}
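
// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the original Apache Hive source): a minimal,
// hypothetical concrete Operation showing how the pieces above fit together.
// run() brackets runInternal() with beforeRun()/afterRun(), the subclass drives
// the state machine via setState(), and fetch requests are checked against
// DEFAULT_FETCH_ORIENTATION_SET. The class name NoopOperation and its behavior
// are assumptions made purely for illustration.
class NoopOperation extends Operation {

  protected NoopOperation(HiveSession parentSession) {
    // UNKNOWN_OPERATION is used only because this sketch performs no real work.
    super(parentSession, OperationType.UNKNOWN_OPERATION);
  }

  @Override
  protected void runInternal() throws HiveSQLException {
    setState(OperationState.RUNNING);
    // ... operation-specific work would go here ...
    setHasResultSet(false);
    setState(OperationState.FINISHED);
  }

  @Override
  public TableSchema getResultSetSchema() throws HiveSQLException {
    // No rows are produced, so an empty schema is returned.
    return new TableSchema();
  }

  @Override
  public RowSet getNextRowSet(FetchOrientation orientation, long maxRows)
      throws HiveSQLException {
    validateDefaultFetchOrientation(orientation);
    throw new HiveSQLException("NoopOperation does not produce a result set");
  }
}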
0350 }