Back to home page

OSCL-LXR

 
 

    


0001 /**
0002  * Licensed to the Apache Software Foundation (ASF) under one
0003  * or more contributor license agreements.  See the NOTICE file
0004  * distributed with this work for additional information
0005  * regarding copyright ownership.  The ASF licenses this file
0006  * to you under the Apache License, Version 2.0 (the
0007  * "License"); you may not use this file except in compliance
0008  * with the License.  You may obtain a copy of the License at
0009  *
0010  *     http://www.apache.org/licenses/LICENSE-2.0
0011  *
0012  * Unless required by applicable law or agreed to in writing, software
0013  * distributed under the License is distributed on an "AS IS" BASIS,
0014  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0015  * See the License for the specific language governing permissions and
0016  * limitations under the License.
0017  */
0018 
0019 package org.apache.hive.service.cli.operation;
0020 
0021 import java.sql.SQLException;
0022 import java.util.ArrayList;
0023 import java.util.HashMap;
0024 import java.util.List;
0025 import java.util.Map;
0026 
0027 import org.apache.commons.logging.Log;
0028 import org.apache.commons.logging.LogFactory;
0029 import org.apache.hadoop.hive.conf.HiveConf;
0030 import org.apache.hadoop.hive.metastore.api.FieldSchema;
0031 import org.apache.hadoop.hive.metastore.api.Schema;
0032 import org.apache.hadoop.hive.ql.session.OperationLog;
0033 import org.apache.hive.service.AbstractService;
0034 import org.apache.hive.service.cli.FetchOrientation;
0035 import org.apache.hive.service.cli.HiveSQLException;
0036 import org.apache.hive.service.cli.OperationHandle;
0037 import org.apache.hive.service.cli.OperationState;
0038 import org.apache.hive.service.cli.OperationStatus;
0039 import org.apache.hive.service.cli.RowSet;
0040 import org.apache.hive.service.cli.RowSetFactory;
0041 import org.apache.hive.service.cli.TableSchema;
0042 import org.apache.hive.service.cli.session.HiveSession;
0043 import org.apache.log4j.Appender;
0044 import org.apache.log4j.Logger;
0045 
0046 /**
0047  * OperationManager.
0048  *
0049  */
0050 public class OperationManager extends AbstractService {
0051   private final Log LOG = LogFactory.getLog(OperationManager.class.getName());
0052 
0053   private final Map<OperationHandle, Operation> handleToOperation =
0054       new HashMap<OperationHandle, Operation>();
0055 
0056   public OperationManager() {
0057     super(OperationManager.class.getSimpleName());
0058   }
0059 
0060   @Override
0061   public synchronized void init(HiveConf hiveConf) {
0062     if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) {
0063       initOperationLogCapture(hiveConf.getVar(
0064         HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LEVEL));
0065     } else {
0066       LOG.debug("Operation level logging is turned off");
0067     }
0068     super.init(hiveConf);
0069   }
0070 
0071   @Override
0072   public synchronized void start() {
0073     super.start();
0074     // TODO
0075   }
0076 
0077   @Override
0078   public synchronized void stop() {
0079     // TODO
0080     super.stop();
0081   }
0082 
0083   private void initOperationLogCapture(String loggingMode) {
0084     // Register another Appender (with the same layout) that talks to us.
0085     Appender ap = new LogDivertAppender(this, OperationLog.getLoggingLevel(loggingMode));
0086     Logger.getRootLogger().addAppender(ap);
0087   }
0088 
0089   public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession,
0090       String statement, Map<String, String> confOverlay, boolean runAsync)
0091           throws HiveSQLException {
0092     ExecuteStatementOperation executeStatementOperation = ExecuteStatementOperation
0093         .newExecuteStatementOperation(parentSession, statement, confOverlay, runAsync);
0094     addOperation(executeStatementOperation);
0095     return executeStatementOperation;
0096   }
0097 
0098   public GetTypeInfoOperation newGetTypeInfoOperation(HiveSession parentSession) {
0099     GetTypeInfoOperation operation = new GetTypeInfoOperation(parentSession);
0100     addOperation(operation);
0101     return operation;
0102   }
0103 
0104   public GetCatalogsOperation newGetCatalogsOperation(HiveSession parentSession) {
0105     GetCatalogsOperation operation = new GetCatalogsOperation(parentSession);
0106     addOperation(operation);
0107     return operation;
0108   }
0109 
0110   public GetSchemasOperation newGetSchemasOperation(HiveSession parentSession,
0111       String catalogName, String schemaName) {
0112     GetSchemasOperation operation = new GetSchemasOperation(parentSession, catalogName, schemaName);
0113     addOperation(operation);
0114     return operation;
0115   }
0116 
0117   public MetadataOperation newGetTablesOperation(HiveSession parentSession,
0118       String catalogName, String schemaName, String tableName,
0119       List<String> tableTypes) {
0120     MetadataOperation operation =
0121         new GetTablesOperation(parentSession, catalogName, schemaName, tableName, tableTypes);
0122     addOperation(operation);
0123     return operation;
0124   }
0125 
0126   public GetTableTypesOperation newGetTableTypesOperation(HiveSession parentSession) {
0127     GetTableTypesOperation operation = new GetTableTypesOperation(parentSession);
0128     addOperation(operation);
0129     return operation;
0130   }
0131 
0132   public GetColumnsOperation newGetColumnsOperation(HiveSession parentSession,
0133       String catalogName, String schemaName, String tableName, String columnName) {
0134     GetColumnsOperation operation = new GetColumnsOperation(parentSession,
0135         catalogName, schemaName, tableName, columnName);
0136     addOperation(operation);
0137     return operation;
0138   }
0139 
0140   public GetFunctionsOperation newGetFunctionsOperation(HiveSession parentSession,
0141       String catalogName, String schemaName, String functionName) {
0142     GetFunctionsOperation operation = new GetFunctionsOperation(parentSession,
0143         catalogName, schemaName, functionName);
0144     addOperation(operation);
0145     return operation;
0146   }
0147 
0148   public Operation getOperation(OperationHandle operationHandle) throws HiveSQLException {
0149     Operation operation = getOperationInternal(operationHandle);
0150     if (operation == null) {
0151       throw new HiveSQLException("Invalid OperationHandle: " + operationHandle);
0152     }
0153     return operation;
0154   }
0155 
0156   private synchronized Operation getOperationInternal(OperationHandle operationHandle) {
0157     return handleToOperation.get(operationHandle);
0158   }
0159 
0160   private synchronized Operation removeTimedOutOperation(OperationHandle operationHandle) {
0161     Operation operation = handleToOperation.get(operationHandle);
0162     if (operation != null && operation.isTimedOut(System.currentTimeMillis())) {
0163       handleToOperation.remove(operationHandle);
0164       return operation;
0165     }
0166     return null;
0167   }
0168 
0169   private synchronized void addOperation(Operation operation) {
0170     handleToOperation.put(operation.getHandle(), operation);
0171   }
0172 
0173   private synchronized Operation removeOperation(OperationHandle opHandle) {
0174     return handleToOperation.remove(opHandle);
0175   }
0176 
0177   public OperationStatus getOperationStatus(OperationHandle opHandle)
0178       throws HiveSQLException {
0179     return getOperation(opHandle).getStatus();
0180   }
0181 
0182   public void cancelOperation(OperationHandle opHandle) throws HiveSQLException {
0183     Operation operation = getOperation(opHandle);
0184     OperationState opState = operation.getStatus().getState();
0185     if (opState == OperationState.CANCELED ||
0186         opState == OperationState.CLOSED ||
0187         opState == OperationState.FINISHED ||
0188         opState == OperationState.ERROR ||
0189         opState == OperationState.UNKNOWN) {
0190       // Cancel should be a no-op in either cases
0191       LOG.debug(opHandle + ": Operation is already aborted in state - " + opState);
0192     }
0193     else {
0194       LOG.debug(opHandle + ": Attempting to cancel from state - " + opState);
0195       operation.cancel();
0196     }
0197   }
0198 
0199   public void closeOperation(OperationHandle opHandle) throws HiveSQLException {
0200     Operation operation = removeOperation(opHandle);
0201     if (operation == null) {
0202       throw new HiveSQLException("Operation does not exist!");
0203     }
0204     operation.close();
0205   }
0206 
0207   public TableSchema getOperationResultSetSchema(OperationHandle opHandle)
0208       throws HiveSQLException {
0209     return getOperation(opHandle).getResultSetSchema();
0210   }
0211 
0212   public RowSet getOperationNextRowSet(OperationHandle opHandle)
0213       throws HiveSQLException {
0214     return getOperation(opHandle).getNextRowSet();
0215   }
0216 
0217   public RowSet getOperationNextRowSet(OperationHandle opHandle,
0218       FetchOrientation orientation, long maxRows)
0219           throws HiveSQLException {
0220     return getOperation(opHandle).getNextRowSet(orientation, maxRows);
0221   }
0222 
0223   public RowSet getOperationLogRowSet(OperationHandle opHandle,
0224       FetchOrientation orientation, long maxRows)
0225           throws HiveSQLException {
0226     // get the OperationLog object from the operation
0227     OperationLog operationLog = getOperation(opHandle).getOperationLog();
0228     if (operationLog == null) {
0229       throw new HiveSQLException("Couldn't find log associated with operation handle: " + opHandle);
0230     }
0231 
0232     // read logs
0233     List<String> logs;
0234     try {
0235       logs = operationLog.readOperationLog(isFetchFirst(orientation), maxRows);
0236     } catch (SQLException e) {
0237       throw new HiveSQLException(e.getMessage(), e.getCause());
0238     }
0239 
0240 
0241     // convert logs to RowSet
0242     TableSchema tableSchema = new TableSchema(getLogSchema());
0243     RowSet rowSet = RowSetFactory.create(tableSchema, getOperation(opHandle).getProtocolVersion());
0244     for (String log : logs) {
0245       rowSet.addRow(new String[] {log});
0246     }
0247 
0248     return rowSet;
0249   }
0250 
0251   private boolean isFetchFirst(FetchOrientation fetchOrientation) {
0252     //TODO: Since OperationLog is moved to package o.a.h.h.ql.session,
0253     // we may add a Enum there and map FetchOrientation to it.
0254     if (fetchOrientation.equals(FetchOrientation.FETCH_FIRST)) {
0255       return true;
0256     }
0257     return false;
0258   }
0259 
0260   private Schema getLogSchema() {
0261     Schema schema = new Schema();
0262     FieldSchema fieldSchema = new FieldSchema();
0263     fieldSchema.setName("operation_log");
0264     fieldSchema.setType("string");
0265     schema.addToFieldSchemas(fieldSchema);
0266     return schema;
0267   }
0268 
0269   public OperationLog getOperationLogByThread() {
0270     return OperationLog.getCurrentOperationLog();
0271   }
0272 
0273   public List<Operation> removeExpiredOperations(OperationHandle[] handles) {
0274     List<Operation> removed = new ArrayList<Operation>();
0275     for (OperationHandle handle : handles) {
0276       Operation operation = removeTimedOutOperation(handle);
0277       if (operation != null) {
0278         LOG.warn("Operation " + handle + " is timed-out and will be closed");
0279         removed.add(operation);
0280       }
0281     }
0282     return removed;
0283   }
0284 }