Back to home page

OSCL-LXR

 
 

    


0001 /**
0002  * Licensed to the Apache Software Foundation (ASF) under one
0003  * or more contributor license agreements.  See the NOTICE file
0004  * distributed with this work for additional information
0005  * regarding copyright ownership.  The ASF licenses this file
0006  * to you under the Apache License, Version 2.0 (the
0007  * "License"); you may not use this file except in compliance
0008  * with the License.  You may obtain a copy of the License at
0009  *
0010  *     http://www.apache.org/licenses/LICENSE-2.0
0011  *
0012  * Unless required by applicable law or agreed to in writing, software
0013  * distributed under the License is distributed on an "AS IS" BASIS,
0014  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0015  * See the License for the specific language governing permissions and
0016  * limitations under the License.
0017  */
0018 package org.apache.hive.service.cli.operation;
0019 
0020 import java.io.File;
0021 import java.io.FileNotFoundException;
0022 import java.util.EnumSet;
0023 import java.util.concurrent.Future;
0024 import java.util.concurrent.TimeUnit;
0025 
0026 import org.apache.commons.logging.Log;
0027 import org.apache.commons.logging.LogFactory;
0028 import org.apache.hadoop.hive.conf.HiveConf;
0029 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
0030 import org.apache.hadoop.hive.ql.session.OperationLog;
0031 import org.apache.hive.service.cli.FetchOrientation;
0032 import org.apache.hive.service.cli.HiveSQLException;
0033 import org.apache.hive.service.cli.OperationHandle;
0034 import org.apache.hive.service.cli.OperationState;
0035 import org.apache.hive.service.cli.OperationStatus;
0036 import org.apache.hive.service.cli.OperationType;
0037 import org.apache.hive.service.cli.RowSet;
0038 import org.apache.hive.service.cli.TableSchema;
0039 import org.apache.hive.service.cli.session.HiveSession;
0040 import org.apache.hive.service.cli.thrift.TProtocolVersion;
0041 
0042 public abstract class Operation {
0043   protected final HiveSession parentSession;
0044   private OperationState state = OperationState.INITIALIZED;
0045   private final OperationHandle opHandle;
0046   private HiveConf configuration;
0047   public static final Log LOG = LogFactory.getLog(Operation.class.getName());
0048   public static final FetchOrientation DEFAULT_FETCH_ORIENTATION = FetchOrientation.FETCH_NEXT;
0049   public static final long DEFAULT_FETCH_MAX_ROWS = 100;
0050   protected boolean hasResultSet;
0051   protected volatile HiveSQLException operationException;
0052   protected final boolean runAsync;
0053   protected volatile Future<?> backgroundHandle;
0054   protected OperationLog operationLog;
0055   protected boolean isOperationLogEnabled;
0056 
0057   private long operationTimeout;
0058   private long lastAccessTime;
0059 
0060   protected static final EnumSet<FetchOrientation> DEFAULT_FETCH_ORIENTATION_SET =
0061       EnumSet.of(
0062           FetchOrientation.FETCH_NEXT,
0063           FetchOrientation.FETCH_FIRST,
0064           FetchOrientation.FETCH_PRIOR);
0065 
0066   protected Operation(HiveSession parentSession, OperationType opType, boolean runInBackground) {
0067     this.parentSession = parentSession;
0068     this.runAsync = runInBackground;
0069     this.opHandle = new OperationHandle(opType, parentSession.getProtocolVersion());
0070     lastAccessTime = System.currentTimeMillis();
0071     operationTimeout = HiveConf.getTimeVar(parentSession.getHiveConf(),
0072         HiveConf.ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT, TimeUnit.MILLISECONDS);
0073   }
0074 
0075   public Future<?> getBackgroundHandle() {
0076     return backgroundHandle;
0077   }
0078 
0079   protected void setBackgroundHandle(Future<?> backgroundHandle) {
0080     this.backgroundHandle = backgroundHandle;
0081   }
0082 
0083   public boolean shouldRunAsync() {
0084     return runAsync;
0085   }
0086 
0087   public void setConfiguration(HiveConf configuration) {
0088     this.configuration = new HiveConf(configuration);
0089   }
0090 
0091   public HiveConf getConfiguration() {
0092     return new HiveConf(configuration);
0093   }
0094 
0095   public HiveSession getParentSession() {
0096     return parentSession;
0097   }
0098 
0099   public OperationHandle getHandle() {
0100     return opHandle;
0101   }
0102 
0103   public TProtocolVersion getProtocolVersion() {
0104     return opHandle.getProtocolVersion();
0105   }
0106 
0107   public OperationType getType() {
0108     return opHandle.getOperationType();
0109   }
0110 
0111   public OperationStatus getStatus() {
0112     return new OperationStatus(state, operationException);
0113   }
0114 
0115   public boolean hasResultSet() {
0116     return hasResultSet;
0117   }
0118 
0119   protected void setHasResultSet(boolean hasResultSet) {
0120     this.hasResultSet = hasResultSet;
0121     opHandle.setHasResultSet(hasResultSet);
0122   }
0123 
0124   public OperationLog getOperationLog() {
0125     return operationLog;
0126   }
0127 
0128   protected final OperationState setState(OperationState newState) throws HiveSQLException {
0129     state.validateTransition(newState);
0130     this.state = newState;
0131     this.lastAccessTime = System.currentTimeMillis();
0132     return this.state;
0133   }
0134 
0135   public boolean isTimedOut(long current) {
0136     if (operationTimeout == 0) {
0137       return false;
0138     }
0139     if (operationTimeout > 0) {
0140       // check only when it's in terminal state
0141       return state.isTerminal() && lastAccessTime + operationTimeout <= current;
0142     }
0143     return lastAccessTime + -operationTimeout <= current;
0144   }
0145 
0146   public long getLastAccessTime() {
0147     return lastAccessTime;
0148   }
0149 
0150   public long getOperationTimeout() {
0151     return operationTimeout;
0152   }
0153 
0154   public void setOperationTimeout(long operationTimeout) {
0155     this.operationTimeout = operationTimeout;
0156   }
0157 
0158   protected void setOperationException(HiveSQLException operationException) {
0159     this.operationException = operationException;
0160   }
0161 
0162   protected final void assertState(OperationState state) throws HiveSQLException {
0163     if (this.state != state) {
0164       throw new HiveSQLException("Expected state " + state + ", but found " + this.state);
0165     }
0166     this.lastAccessTime = System.currentTimeMillis();
0167   }
0168 
0169   public boolean isRunning() {
0170     return OperationState.RUNNING.equals(state);
0171   }
0172 
0173   public boolean isFinished() {
0174     return OperationState.FINISHED.equals(state);
0175   }
0176 
0177   public boolean isCanceled() {
0178     return OperationState.CANCELED.equals(state);
0179   }
0180 
0181   public boolean isFailed() {
0182     return OperationState.ERROR.equals(state);
0183   }
0184 
0185   protected void createOperationLog() {
0186     if (parentSession.isOperationLogEnabled()) {
0187       File operationLogFile = new File(parentSession.getOperationLogSessionDir(),
0188           opHandle.getHandleIdentifier().toString());
0189       isOperationLogEnabled = true;
0190 
0191       // create log file
0192       try {
0193         if (operationLogFile.exists()) {
0194           LOG.warn("The operation log file should not exist, but it is already there: " +
0195               operationLogFile.getAbsolutePath());
0196           operationLogFile.delete();
0197         }
0198         if (!operationLogFile.createNewFile()) {
0199           // the log file already exists and cannot be deleted.
0200           // If it can be read/written, keep its contents and use it.
0201           if (!operationLogFile.canRead() || !operationLogFile.canWrite()) {
0202             LOG.warn("The already existed operation log file cannot be recreated, " +
0203                 "and it cannot be read or written: " + operationLogFile.getAbsolutePath());
0204             isOperationLogEnabled = false;
0205             return;
0206           }
0207         }
0208       } catch (Exception e) {
0209         LOG.warn("Unable to create operation log file: " + operationLogFile.getAbsolutePath(), e);
0210         isOperationLogEnabled = false;
0211         return;
0212       }
0213 
0214       // create OperationLog object with above log file
0215       try {
0216         operationLog = new OperationLog(opHandle.toString(), operationLogFile, parentSession.getHiveConf());
0217       } catch (FileNotFoundException e) {
0218         LOG.warn("Unable to instantiate OperationLog object for operation: " +
0219             opHandle, e);
0220         isOperationLogEnabled = false;
0221         return;
0222       }
0223 
0224       // register this operationLog to current thread
0225       OperationLog.setCurrentOperationLog(operationLog);
0226     }
0227   }
0228 
0229   protected void unregisterOperationLog() {
0230     if (isOperationLogEnabled) {
0231       OperationLog.removeCurrentOperationLog();
0232     }
0233   }
0234 
0235   /**
0236    * Invoked before runInternal().
0237    * Set up some preconditions, or configurations.
0238    */
0239   protected void beforeRun() {
0240     createOperationLog();
0241   }
0242 
0243   /**
0244    * Invoked after runInternal(), even if an exception is thrown in runInternal().
0245    * Clean up resources, which was set up in beforeRun().
0246    */
0247   protected void afterRun() {
0248     unregisterOperationLog();
0249   }
0250 
0251   /**
0252    * Implemented by subclass of Operation class to execute specific behaviors.
0253    * @throws HiveSQLException
0254    */
0255   protected abstract void runInternal() throws HiveSQLException;
0256 
0257   public void run() throws HiveSQLException {
0258     beforeRun();
0259     try {
0260       runInternal();
0261     } finally {
0262       afterRun();
0263     }
0264   }
0265 
0266   protected void cleanupOperationLog() {
0267     if (isOperationLogEnabled) {
0268       if (operationLog == null) {
0269         LOG.error("Operation [ " + opHandle.getHandleIdentifier() + " ] "
0270           + "logging is enabled, but its OperationLog object cannot be found.");
0271       } else {
0272         operationLog.close();
0273       }
0274     }
0275   }
0276 
0277   // TODO: make this abstract and implement in subclasses.
0278   public void cancel() throws HiveSQLException {
0279     setState(OperationState.CANCELED);
0280     throw new UnsupportedOperationException("SQLOperation.cancel()");
0281   }
0282 
0283   public void close() throws HiveSQLException {
0284     setState(OperationState.CLOSED);
0285     cleanupOperationLog();
0286   }
0287 
0288   public abstract TableSchema getResultSetSchema() throws HiveSQLException;
0289 
0290   public abstract RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException;
0291 
0292   public RowSet getNextRowSet() throws HiveSQLException {
0293     return getNextRowSet(FetchOrientation.FETCH_NEXT, DEFAULT_FETCH_MAX_ROWS);
0294   }
0295 
0296   /**
0297    * Verify if the given fetch orientation is part of the default orientation types.
0298    * @param orientation
0299    * @throws HiveSQLException
0300    */
0301   protected void validateDefaultFetchOrientation(FetchOrientation orientation)
0302       throws HiveSQLException {
0303     validateFetchOrientation(orientation, DEFAULT_FETCH_ORIENTATION_SET);
0304   }
0305 
0306   /**
0307    * Verify if the given fetch orientation is part of the supported orientation types.
0308    * @param orientation
0309    * @param supportedOrientations
0310    * @throws HiveSQLException
0311    */
0312   protected void validateFetchOrientation(FetchOrientation orientation,
0313       EnumSet<FetchOrientation> supportedOrientations) throws HiveSQLException {
0314     if (!supportedOrientations.contains(orientation)) {
0315       throw new HiveSQLException("The fetch type " + orientation.toString() +
0316           " is not supported for this resultset", "HY106");
0317     }
0318   }
0319 
0320   protected HiveSQLException toSQLException(String prefix, CommandProcessorResponse response) {
0321     HiveSQLException ex = new HiveSQLException(prefix + ": " + response.getErrorMessage(),
0322         response.getSQLState(), response.getResponseCode());
0323     if (response.getException() != null) {
0324       ex.initCause(response.getException());
0325     }
0326     return ex;
0327   }
0328 }