Back to home page

OSCL-LXR

 
 

    


0001 /**
0002  * Licensed to the Apache Software Foundation (ASF) under one
0003  * or more contributor license agreements.  See the NOTICE file
0004  * distributed with this work for additional information
0005  * regarding copyright ownership.  The ASF licenses this file
0006  * to you under the Apache License, Version 2.0 (the
0007  * "License"); you may not use this file except in compliance
0008  * with the License.  You may obtain a copy of the License at
0009  *
0010  *     http://www.apache.org/licenses/LICENSE-2.0
0011  *
0012  * Unless required by applicable law or agreed to in writing, software
0013  * distributed under the License is distributed on an "AS IS" BASIS,
0014  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0015  * See the License for the specific language governing permissions and
0016  * limitations under the License.
0017  */
0018 
0019 package org.apache.hive.service.cli.operation;
0020 
0021 import java.sql.SQLException;
0022 import java.util.ArrayList;
0023 import java.util.HashMap;
0024 import java.util.List;
0025 import java.util.Map;
0026 
0027 import org.apache.hadoop.hive.conf.HiveConf;
0028 import org.apache.hadoop.hive.metastore.api.FieldSchema;
0029 import org.apache.hadoop.hive.metastore.api.Schema;
0030 import org.apache.hadoop.hive.ql.session.OperationLog;
0031 import org.apache.hive.service.AbstractService;
0032 import org.apache.hive.service.cli.FetchOrientation;
0033 import org.apache.hive.service.cli.HiveSQLException;
0034 import org.apache.hive.service.cli.OperationHandle;
0035 import org.apache.hive.service.cli.OperationState;
0036 import org.apache.hive.service.cli.OperationStatus;
0037 import org.apache.hive.service.cli.RowSet;
0038 import org.apache.hive.service.cli.RowSetFactory;
0039 import org.apache.hive.service.cli.TableSchema;
0040 import org.apache.hive.service.cli.session.HiveSession;
0041 import org.apache.log4j.Appender;
0042 import org.slf4j.Logger;
0043 import org.slf4j.LoggerFactory;
0044 
0045 /**
0046  * OperationManager.
0047  *
0048  */
0049 public class OperationManager extends AbstractService {
0050   private final Logger LOG = LoggerFactory.getLogger(OperationManager.class.getName());
0051 
0052   private final Map<OperationHandle, Operation> handleToOperation =
0053       new HashMap<OperationHandle, Operation>();
0054 
0055   public OperationManager() {
0056     super(OperationManager.class.getSimpleName());
0057   }
0058 
0059   @Override
0060   public synchronized void init(HiveConf hiveConf) {
0061     if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) {
0062       initOperationLogCapture(hiveConf.getVar(
0063         HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LEVEL));
0064     } else {
0065       LOG.debug("Operation level logging is turned off");
0066     }
0067     super.init(hiveConf);
0068   }
0069 
0070   @Override
0071   public synchronized void start() {
0072     super.start();
0073     // TODO
0074   }
0075 
0076   @Override
0077   public synchronized void stop() {
0078     // TODO
0079     super.stop();
0080   }
0081 
0082   private void initOperationLogCapture(String loggingMode) {
0083     // Register another Appender (with the same layout) that talks to us.
0084     Appender ap = new LogDivertAppender(this, OperationLog.getLoggingLevel(loggingMode));
0085     org.apache.log4j.Logger.getRootLogger().addAppender(ap);
0086   }
0087 
0088   public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession,
0089       String statement, Map<String, String> confOverlay, boolean runAsync)
0090           throws HiveSQLException {
0091     ExecuteStatementOperation executeStatementOperation = ExecuteStatementOperation
0092         .newExecuteStatementOperation(parentSession, statement, confOverlay, runAsync, 0);
0093     addOperation(executeStatementOperation);
0094     return executeStatementOperation;
0095   }
0096 
0097   public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession,
0098       String statement, Map<String, String> confOverlay, boolean runAsync, long queryTimeout)
0099           throws HiveSQLException {
0100     return newExecuteStatementOperation(parentSession, statement, confOverlay, runAsync);
0101   }
0102 
0103   public GetTypeInfoOperation newGetTypeInfoOperation(HiveSession parentSession) {
0104     GetTypeInfoOperation operation = new GetTypeInfoOperation(parentSession);
0105     addOperation(operation);
0106     return operation;
0107   }
0108 
0109   public GetCatalogsOperation newGetCatalogsOperation(HiveSession parentSession) {
0110     GetCatalogsOperation operation = new GetCatalogsOperation(parentSession);
0111     addOperation(operation);
0112     return operation;
0113   }
0114 
0115   public GetSchemasOperation newGetSchemasOperation(HiveSession parentSession,
0116       String catalogName, String schemaName) {
0117     GetSchemasOperation operation = new GetSchemasOperation(parentSession, catalogName, schemaName);
0118     addOperation(operation);
0119     return operation;
0120   }
0121 
0122   public MetadataOperation newGetTablesOperation(HiveSession parentSession,
0123       String catalogName, String schemaName, String tableName,
0124       List<String> tableTypes) {
0125     MetadataOperation operation =
0126         new GetTablesOperation(parentSession, catalogName, schemaName, tableName, tableTypes);
0127     addOperation(operation);
0128     return operation;
0129   }
0130 
0131   public GetTableTypesOperation newGetTableTypesOperation(HiveSession parentSession) {
0132     GetTableTypesOperation operation = new GetTableTypesOperation(parentSession);
0133     addOperation(operation);
0134     return operation;
0135   }
0136 
0137   public GetColumnsOperation newGetColumnsOperation(HiveSession parentSession,
0138       String catalogName, String schemaName, String tableName, String columnName) {
0139     GetColumnsOperation operation = new GetColumnsOperation(parentSession,
0140         catalogName, schemaName, tableName, columnName);
0141     addOperation(operation);
0142     return operation;
0143   }
0144 
0145   public GetFunctionsOperation newGetFunctionsOperation(HiveSession parentSession,
0146       String catalogName, String schemaName, String functionName) {
0147     GetFunctionsOperation operation = new GetFunctionsOperation(parentSession,
0148         catalogName, schemaName, functionName);
0149     addOperation(operation);
0150     return operation;
0151   }
0152 
0153   public GetPrimaryKeysOperation newGetPrimaryKeysOperation(HiveSession parentSession,
0154       String catalogName, String schemaName, String tableName) {
0155     GetPrimaryKeysOperation operation = new GetPrimaryKeysOperation(parentSession,
0156       catalogName, schemaName, tableName);
0157     addOperation(operation);
0158     return operation;
0159   }
0160 
0161   public GetCrossReferenceOperation newGetCrossReferenceOperation(
0162       HiveSession session, String primaryCatalog, String primarySchema,
0163       String primaryTable, String foreignCatalog, String foreignSchema,
0164       String foreignTable) {
0165    GetCrossReferenceOperation operation = new GetCrossReferenceOperation(session,
0166      primaryCatalog, primarySchema, primaryTable, foreignCatalog, foreignSchema,
0167      foreignTable);
0168    addOperation(operation);
0169    return operation;
0170   }
0171 
0172   public Operation getOperation(OperationHandle operationHandle) throws HiveSQLException {
0173     Operation operation = getOperationInternal(operationHandle);
0174     if (operation == null) {
0175       throw new HiveSQLException("Invalid OperationHandle: " + operationHandle);
0176     }
0177     return operation;
0178   }
0179 
0180   private synchronized Operation getOperationInternal(OperationHandle operationHandle) {
0181     return handleToOperation.get(operationHandle);
0182   }
0183 
0184   private synchronized Operation removeTimedOutOperation(OperationHandle operationHandle) {
0185     Operation operation = handleToOperation.get(operationHandle);
0186     if (operation != null && operation.isTimedOut(System.currentTimeMillis())) {
0187       handleToOperation.remove(operationHandle);
0188       return operation;
0189     }
0190     return null;
0191   }
0192 
0193   private synchronized void addOperation(Operation operation) {
0194     handleToOperation.put(operation.getHandle(), operation);
0195   }
0196 
0197   private synchronized Operation removeOperation(OperationHandle opHandle) {
0198     return handleToOperation.remove(opHandle);
0199   }
0200 
0201   public OperationStatus getOperationStatus(OperationHandle opHandle)
0202       throws HiveSQLException {
0203     return getOperation(opHandle).getStatus();
0204   }
0205 
0206   public void cancelOperation(OperationHandle opHandle) throws HiveSQLException {
0207     Operation operation = getOperation(opHandle);
0208     OperationState opState = operation.getStatus().getState();
0209     if (opState == OperationState.CANCELED ||
0210         opState == OperationState.CLOSED ||
0211         opState == OperationState.FINISHED ||
0212         opState == OperationState.ERROR ||
0213         opState == OperationState.UNKNOWN) {
0214       // Cancel should be a no-op in either cases
0215       LOG.debug(opHandle + ": Operation is already aborted in state - " + opState);
0216     }
0217     else {
0218       LOG.debug(opHandle + ": Attempting to cancel from state - " + opState);
0219       operation.cancel();
0220     }
0221   }
0222 
0223   public void closeOperation(OperationHandle opHandle) throws HiveSQLException {
0224     Operation operation = removeOperation(opHandle);
0225     if (operation == null) {
0226       throw new HiveSQLException("Operation does not exist!");
0227     }
0228     operation.close();
0229   }
0230 
0231   public TableSchema getOperationResultSetSchema(OperationHandle opHandle)
0232       throws HiveSQLException {
0233     return getOperation(opHandle).getResultSetSchema();
0234   }
0235 
0236   public RowSet getOperationNextRowSet(OperationHandle opHandle)
0237       throws HiveSQLException {
0238     return getOperation(opHandle).getNextRowSet();
0239   }
0240 
0241   public RowSet getOperationNextRowSet(OperationHandle opHandle,
0242       FetchOrientation orientation, long maxRows)
0243           throws HiveSQLException {
0244     return getOperation(opHandle).getNextRowSet(orientation, maxRows);
0245   }
0246 
0247   public RowSet getOperationLogRowSet(OperationHandle opHandle,
0248       FetchOrientation orientation, long maxRows)
0249           throws HiveSQLException {
0250     // get the OperationLog object from the operation
0251     OperationLog operationLog = getOperation(opHandle).getOperationLog();
0252     if (operationLog == null) {
0253       throw new HiveSQLException("Couldn't find log associated with operation handle: " + opHandle);
0254     }
0255 
0256     // read logs
0257     List<String> logs;
0258     try {
0259       logs = operationLog.readOperationLog(isFetchFirst(orientation), maxRows);
0260     } catch (SQLException e) {
0261       throw new HiveSQLException(e.getMessage(), e.getCause());
0262     }
0263 
0264 
0265     // convert logs to RowSet
0266     TableSchema tableSchema = new TableSchema(getLogSchema());
0267     RowSet rowSet = RowSetFactory.create(tableSchema,
0268         getOperation(opHandle).getProtocolVersion(), false);
0269     for (String log : logs) {
0270       rowSet.addRow(new String[] {log});
0271     }
0272 
0273     return rowSet;
0274   }
0275 
0276   private boolean isFetchFirst(FetchOrientation fetchOrientation) {
0277     //TODO: Since OperationLog is moved to package o.a.h.h.ql.session,
0278     // we may add a Enum there and map FetchOrientation to it.
0279     if (fetchOrientation.equals(FetchOrientation.FETCH_FIRST)) {
0280       return true;
0281     }
0282     return false;
0283   }
0284 
0285   private Schema getLogSchema() {
0286     Schema schema = new Schema();
0287     FieldSchema fieldSchema = new FieldSchema();
0288     fieldSchema.setName("operation_log");
0289     fieldSchema.setType("string");
0290     schema.addToFieldSchemas(fieldSchema);
0291     return schema;
0292   }
0293 
0294   public OperationLog getOperationLogByThread() {
0295     return OperationLog.getCurrentOperationLog();
0296   }
0297 
0298   public List<Operation> removeExpiredOperations(OperationHandle[] handles) {
0299     List<Operation> removed = new ArrayList<Operation>();
0300     for (OperationHandle handle : handles) {
0301       Operation operation = removeTimedOutOperation(handle);
0302       if (operation != null) {
0303         LOG.warn("Operation " + handle + " is timed-out and will be closed");
0304         removed.add(operation);
0305       }
0306     }
0307     return removed;
0308   }
0309 }