0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019 package org.apache.hive.service.cli.operation;
0020
0021 import java.sql.SQLException;
0022 import java.util.ArrayList;
0023 import java.util.HashMap;
0024 import java.util.List;
0025 import java.util.Map;
0026
0027 import org.apache.commons.logging.Log;
0028 import org.apache.commons.logging.LogFactory;
0029 import org.apache.hadoop.hive.conf.HiveConf;
0030 import org.apache.hadoop.hive.metastore.api.FieldSchema;
0031 import org.apache.hadoop.hive.metastore.api.Schema;
0032 import org.apache.hadoop.hive.ql.session.OperationLog;
0033 import org.apache.hive.service.AbstractService;
0034 import org.apache.hive.service.cli.FetchOrientation;
0035 import org.apache.hive.service.cli.HiveSQLException;
0036 import org.apache.hive.service.cli.OperationHandle;
0037 import org.apache.hive.service.cli.OperationState;
0038 import org.apache.hive.service.cli.OperationStatus;
0039 import org.apache.hive.service.cli.RowSet;
0040 import org.apache.hive.service.cli.RowSetFactory;
0041 import org.apache.hive.service.cli.TableSchema;
0042 import org.apache.hive.service.cli.session.HiveSession;
0043 import org.apache.log4j.Appender;
0044 import org.apache.log4j.Logger;
0045
0046
0047
0048
0049
0050 public class OperationManager extends AbstractService {
0051 private final Log LOG = LogFactory.getLog(OperationManager.class.getName());
0052
0053 private final Map<OperationHandle, Operation> handleToOperation =
0054 new HashMap<OperationHandle, Operation>();
0055
0056 public OperationManager() {
0057 super(OperationManager.class.getSimpleName());
0058 }
0059
0060 @Override
0061 public synchronized void init(HiveConf hiveConf) {
0062 if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) {
0063 initOperationLogCapture(hiveConf.getVar(
0064 HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LEVEL));
0065 } else {
0066 LOG.debug("Operation level logging is turned off");
0067 }
0068 super.init(hiveConf);
0069 }
0070
0071 @Override
0072 public synchronized void start() {
0073 super.start();
0074
0075 }
0076
0077 @Override
0078 public synchronized void stop() {
0079
0080 super.stop();
0081 }
0082
0083 private void initOperationLogCapture(String loggingMode) {
0084
0085 Appender ap = new LogDivertAppender(this, OperationLog.getLoggingLevel(loggingMode));
0086 Logger.getRootLogger().addAppender(ap);
0087 }
0088
0089 public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession,
0090 String statement, Map<String, String> confOverlay, boolean runAsync)
0091 throws HiveSQLException {
0092 ExecuteStatementOperation executeStatementOperation = ExecuteStatementOperation
0093 .newExecuteStatementOperation(parentSession, statement, confOverlay, runAsync);
0094 addOperation(executeStatementOperation);
0095 return executeStatementOperation;
0096 }
0097
0098 public GetTypeInfoOperation newGetTypeInfoOperation(HiveSession parentSession) {
0099 GetTypeInfoOperation operation = new GetTypeInfoOperation(parentSession);
0100 addOperation(operation);
0101 return operation;
0102 }
0103
0104 public GetCatalogsOperation newGetCatalogsOperation(HiveSession parentSession) {
0105 GetCatalogsOperation operation = new GetCatalogsOperation(parentSession);
0106 addOperation(operation);
0107 return operation;
0108 }
0109
0110 public GetSchemasOperation newGetSchemasOperation(HiveSession parentSession,
0111 String catalogName, String schemaName) {
0112 GetSchemasOperation operation = new GetSchemasOperation(parentSession, catalogName, schemaName);
0113 addOperation(operation);
0114 return operation;
0115 }
0116
0117 public MetadataOperation newGetTablesOperation(HiveSession parentSession,
0118 String catalogName, String schemaName, String tableName,
0119 List<String> tableTypes) {
0120 MetadataOperation operation =
0121 new GetTablesOperation(parentSession, catalogName, schemaName, tableName, tableTypes);
0122 addOperation(operation);
0123 return operation;
0124 }
0125
0126 public GetTableTypesOperation newGetTableTypesOperation(HiveSession parentSession) {
0127 GetTableTypesOperation operation = new GetTableTypesOperation(parentSession);
0128 addOperation(operation);
0129 return operation;
0130 }
0131
0132 public GetColumnsOperation newGetColumnsOperation(HiveSession parentSession,
0133 String catalogName, String schemaName, String tableName, String columnName) {
0134 GetColumnsOperation operation = new GetColumnsOperation(parentSession,
0135 catalogName, schemaName, tableName, columnName);
0136 addOperation(operation);
0137 return operation;
0138 }
0139
0140 public GetFunctionsOperation newGetFunctionsOperation(HiveSession parentSession,
0141 String catalogName, String schemaName, String functionName) {
0142 GetFunctionsOperation operation = new GetFunctionsOperation(parentSession,
0143 catalogName, schemaName, functionName);
0144 addOperation(operation);
0145 return operation;
0146 }
0147
0148 public Operation getOperation(OperationHandle operationHandle) throws HiveSQLException {
0149 Operation operation = getOperationInternal(operationHandle);
0150 if (operation == null) {
0151 throw new HiveSQLException("Invalid OperationHandle: " + operationHandle);
0152 }
0153 return operation;
0154 }
0155
0156 private synchronized Operation getOperationInternal(OperationHandle operationHandle) {
0157 return handleToOperation.get(operationHandle);
0158 }
0159
0160 private synchronized Operation removeTimedOutOperation(OperationHandle operationHandle) {
0161 Operation operation = handleToOperation.get(operationHandle);
0162 if (operation != null && operation.isTimedOut(System.currentTimeMillis())) {
0163 handleToOperation.remove(operationHandle);
0164 return operation;
0165 }
0166 return null;
0167 }
0168
0169 private synchronized void addOperation(Operation operation) {
0170 handleToOperation.put(operation.getHandle(), operation);
0171 }
0172
0173 private synchronized Operation removeOperation(OperationHandle opHandle) {
0174 return handleToOperation.remove(opHandle);
0175 }
0176
0177 public OperationStatus getOperationStatus(OperationHandle opHandle)
0178 throws HiveSQLException {
0179 return getOperation(opHandle).getStatus();
0180 }
0181
0182 public void cancelOperation(OperationHandle opHandle) throws HiveSQLException {
0183 Operation operation = getOperation(opHandle);
0184 OperationState opState = operation.getStatus().getState();
0185 if (opState == OperationState.CANCELED ||
0186 opState == OperationState.CLOSED ||
0187 opState == OperationState.FINISHED ||
0188 opState == OperationState.ERROR ||
0189 opState == OperationState.UNKNOWN) {
0190
0191 LOG.debug(opHandle + ": Operation is already aborted in state - " + opState);
0192 }
0193 else {
0194 LOG.debug(opHandle + ": Attempting to cancel from state - " + opState);
0195 operation.cancel();
0196 }
0197 }
0198
0199 public void closeOperation(OperationHandle opHandle) throws HiveSQLException {
0200 Operation operation = removeOperation(opHandle);
0201 if (operation == null) {
0202 throw new HiveSQLException("Operation does not exist!");
0203 }
0204 operation.close();
0205 }
0206
0207 public TableSchema getOperationResultSetSchema(OperationHandle opHandle)
0208 throws HiveSQLException {
0209 return getOperation(opHandle).getResultSetSchema();
0210 }
0211
0212 public RowSet getOperationNextRowSet(OperationHandle opHandle)
0213 throws HiveSQLException {
0214 return getOperation(opHandle).getNextRowSet();
0215 }
0216
0217 public RowSet getOperationNextRowSet(OperationHandle opHandle,
0218 FetchOrientation orientation, long maxRows)
0219 throws HiveSQLException {
0220 return getOperation(opHandle).getNextRowSet(orientation, maxRows);
0221 }
0222
0223 public RowSet getOperationLogRowSet(OperationHandle opHandle,
0224 FetchOrientation orientation, long maxRows)
0225 throws HiveSQLException {
0226
0227 OperationLog operationLog = getOperation(opHandle).getOperationLog();
0228 if (operationLog == null) {
0229 throw new HiveSQLException("Couldn't find log associated with operation handle: " + opHandle);
0230 }
0231
0232
0233 List<String> logs;
0234 try {
0235 logs = operationLog.readOperationLog(isFetchFirst(orientation), maxRows);
0236 } catch (SQLException e) {
0237 throw new HiveSQLException(e.getMessage(), e.getCause());
0238 }
0239
0240
0241
0242 TableSchema tableSchema = new TableSchema(getLogSchema());
0243 RowSet rowSet = RowSetFactory.create(tableSchema, getOperation(opHandle).getProtocolVersion());
0244 for (String log : logs) {
0245 rowSet.addRow(new String[] {log});
0246 }
0247
0248 return rowSet;
0249 }
0250
0251 private boolean isFetchFirst(FetchOrientation fetchOrientation) {
0252
0253
0254 if (fetchOrientation.equals(FetchOrientation.FETCH_FIRST)) {
0255 return true;
0256 }
0257 return false;
0258 }
0259
0260 private Schema getLogSchema() {
0261 Schema schema = new Schema();
0262 FieldSchema fieldSchema = new FieldSchema();
0263 fieldSchema.setName("operation_log");
0264 fieldSchema.setType("string");
0265 schema.addToFieldSchemas(fieldSchema);
0266 return schema;
0267 }
0268
0269 public OperationLog getOperationLogByThread() {
0270 return OperationLog.getCurrentOperationLog();
0271 }
0272
0273 public List<Operation> removeExpiredOperations(OperationHandle[] handles) {
0274 List<Operation> removed = new ArrayList<Operation>();
0275 for (OperationHandle handle : handles) {
0276 Operation operation = removeTimedOutOperation(handle);
0277 if (operation != null) {
0278 LOG.warn("Operation " + handle + " is timed-out and will be closed");
0279 removed.add(operation);
0280 }
0281 }
0282 return removed;
0283 }
0284 }