0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019 package org.apache.hive.service.cli.operation;
0020
0021 import java.sql.SQLException;
0022 import java.util.ArrayList;
0023 import java.util.HashMap;
0024 import java.util.List;
0025 import java.util.Map;
0026
0027 import org.apache.hadoop.hive.conf.HiveConf;
0028 import org.apache.hadoop.hive.metastore.api.FieldSchema;
0029 import org.apache.hadoop.hive.metastore.api.Schema;
0030 import org.apache.hadoop.hive.ql.session.OperationLog;
0031 import org.apache.hive.service.AbstractService;
0032 import org.apache.hive.service.cli.FetchOrientation;
0033 import org.apache.hive.service.cli.HiveSQLException;
0034 import org.apache.hive.service.cli.OperationHandle;
0035 import org.apache.hive.service.cli.OperationState;
0036 import org.apache.hive.service.cli.OperationStatus;
0037 import org.apache.hive.service.cli.RowSet;
0038 import org.apache.hive.service.cli.RowSetFactory;
0039 import org.apache.hive.service.cli.TableSchema;
0040 import org.apache.hive.service.cli.session.HiveSession;
0041 import org.apache.log4j.Appender;
0042 import org.slf4j.Logger;
0043 import org.slf4j.LoggerFactory;
0044
0045
0046
0047
0048
0049 public class OperationManager extends AbstractService {
0050 private final Logger LOG = LoggerFactory.getLogger(OperationManager.class.getName());
0051
0052 private final Map<OperationHandle, Operation> handleToOperation =
0053 new HashMap<OperationHandle, Operation>();
0054
0055 public OperationManager() {
0056 super(OperationManager.class.getSimpleName());
0057 }
0058
0059 @Override
0060 public synchronized void init(HiveConf hiveConf) {
0061 if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) {
0062 initOperationLogCapture(hiveConf.getVar(
0063 HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LEVEL));
0064 } else {
0065 LOG.debug("Operation level logging is turned off");
0066 }
0067 super.init(hiveConf);
0068 }
0069
0070 @Override
0071 public synchronized void start() {
0072 super.start();
0073
0074 }
0075
0076 @Override
0077 public synchronized void stop() {
0078
0079 super.stop();
0080 }
0081
0082 private void initOperationLogCapture(String loggingMode) {
0083
0084 Appender ap = new LogDivertAppender(this, OperationLog.getLoggingLevel(loggingMode));
0085 org.apache.log4j.Logger.getRootLogger().addAppender(ap);
0086 }
0087
0088 public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession,
0089 String statement, Map<String, String> confOverlay, boolean runAsync)
0090 throws HiveSQLException {
0091 ExecuteStatementOperation executeStatementOperation = ExecuteStatementOperation
0092 .newExecuteStatementOperation(parentSession, statement, confOverlay, runAsync, 0);
0093 addOperation(executeStatementOperation);
0094 return executeStatementOperation;
0095 }
0096
0097 public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession,
0098 String statement, Map<String, String> confOverlay, boolean runAsync, long queryTimeout)
0099 throws HiveSQLException {
0100 return newExecuteStatementOperation(parentSession, statement, confOverlay, runAsync);
0101 }
0102
0103 public GetTypeInfoOperation newGetTypeInfoOperation(HiveSession parentSession) {
0104 GetTypeInfoOperation operation = new GetTypeInfoOperation(parentSession);
0105 addOperation(operation);
0106 return operation;
0107 }
0108
0109 public GetCatalogsOperation newGetCatalogsOperation(HiveSession parentSession) {
0110 GetCatalogsOperation operation = new GetCatalogsOperation(parentSession);
0111 addOperation(operation);
0112 return operation;
0113 }
0114
0115 public GetSchemasOperation newGetSchemasOperation(HiveSession parentSession,
0116 String catalogName, String schemaName) {
0117 GetSchemasOperation operation = new GetSchemasOperation(parentSession, catalogName, schemaName);
0118 addOperation(operation);
0119 return operation;
0120 }
0121
0122 public MetadataOperation newGetTablesOperation(HiveSession parentSession,
0123 String catalogName, String schemaName, String tableName,
0124 List<String> tableTypes) {
0125 MetadataOperation operation =
0126 new GetTablesOperation(parentSession, catalogName, schemaName, tableName, tableTypes);
0127 addOperation(operation);
0128 return operation;
0129 }
0130
0131 public GetTableTypesOperation newGetTableTypesOperation(HiveSession parentSession) {
0132 GetTableTypesOperation operation = new GetTableTypesOperation(parentSession);
0133 addOperation(operation);
0134 return operation;
0135 }
0136
0137 public GetColumnsOperation newGetColumnsOperation(HiveSession parentSession,
0138 String catalogName, String schemaName, String tableName, String columnName) {
0139 GetColumnsOperation operation = new GetColumnsOperation(parentSession,
0140 catalogName, schemaName, tableName, columnName);
0141 addOperation(operation);
0142 return operation;
0143 }
0144
0145 public GetFunctionsOperation newGetFunctionsOperation(HiveSession parentSession,
0146 String catalogName, String schemaName, String functionName) {
0147 GetFunctionsOperation operation = new GetFunctionsOperation(parentSession,
0148 catalogName, schemaName, functionName);
0149 addOperation(operation);
0150 return operation;
0151 }
0152
0153 public GetPrimaryKeysOperation newGetPrimaryKeysOperation(HiveSession parentSession,
0154 String catalogName, String schemaName, String tableName) {
0155 GetPrimaryKeysOperation operation = new GetPrimaryKeysOperation(parentSession,
0156 catalogName, schemaName, tableName);
0157 addOperation(operation);
0158 return operation;
0159 }
0160
0161 public GetCrossReferenceOperation newGetCrossReferenceOperation(
0162 HiveSession session, String primaryCatalog, String primarySchema,
0163 String primaryTable, String foreignCatalog, String foreignSchema,
0164 String foreignTable) {
0165 GetCrossReferenceOperation operation = new GetCrossReferenceOperation(session,
0166 primaryCatalog, primarySchema, primaryTable, foreignCatalog, foreignSchema,
0167 foreignTable);
0168 addOperation(operation);
0169 return operation;
0170 }
0171
0172 public Operation getOperation(OperationHandle operationHandle) throws HiveSQLException {
0173 Operation operation = getOperationInternal(operationHandle);
0174 if (operation == null) {
0175 throw new HiveSQLException("Invalid OperationHandle: " + operationHandle);
0176 }
0177 return operation;
0178 }
0179
0180 private synchronized Operation getOperationInternal(OperationHandle operationHandle) {
0181 return handleToOperation.get(operationHandle);
0182 }
0183
0184 private synchronized Operation removeTimedOutOperation(OperationHandle operationHandle) {
0185 Operation operation = handleToOperation.get(operationHandle);
0186 if (operation != null && operation.isTimedOut(System.currentTimeMillis())) {
0187 handleToOperation.remove(operationHandle);
0188 return operation;
0189 }
0190 return null;
0191 }
0192
0193 private synchronized void addOperation(Operation operation) {
0194 handleToOperation.put(operation.getHandle(), operation);
0195 }
0196
0197 private synchronized Operation removeOperation(OperationHandle opHandle) {
0198 return handleToOperation.remove(opHandle);
0199 }
0200
0201 public OperationStatus getOperationStatus(OperationHandle opHandle)
0202 throws HiveSQLException {
0203 return getOperation(opHandle).getStatus();
0204 }
0205
0206 public void cancelOperation(OperationHandle opHandle) throws HiveSQLException {
0207 Operation operation = getOperation(opHandle);
0208 OperationState opState = operation.getStatus().getState();
0209 if (opState == OperationState.CANCELED ||
0210 opState == OperationState.CLOSED ||
0211 opState == OperationState.FINISHED ||
0212 opState == OperationState.ERROR ||
0213 opState == OperationState.UNKNOWN) {
0214
0215 LOG.debug(opHandle + ": Operation is already aborted in state - " + opState);
0216 }
0217 else {
0218 LOG.debug(opHandle + ": Attempting to cancel from state - " + opState);
0219 operation.cancel();
0220 }
0221 }
0222
0223 public void closeOperation(OperationHandle opHandle) throws HiveSQLException {
0224 Operation operation = removeOperation(opHandle);
0225 if (operation == null) {
0226 throw new HiveSQLException("Operation does not exist!");
0227 }
0228 operation.close();
0229 }
0230
0231 public TableSchema getOperationResultSetSchema(OperationHandle opHandle)
0232 throws HiveSQLException {
0233 return getOperation(opHandle).getResultSetSchema();
0234 }
0235
0236 public RowSet getOperationNextRowSet(OperationHandle opHandle)
0237 throws HiveSQLException {
0238 return getOperation(opHandle).getNextRowSet();
0239 }
0240
0241 public RowSet getOperationNextRowSet(OperationHandle opHandle,
0242 FetchOrientation orientation, long maxRows)
0243 throws HiveSQLException {
0244 return getOperation(opHandle).getNextRowSet(orientation, maxRows);
0245 }
0246
0247 public RowSet getOperationLogRowSet(OperationHandle opHandle,
0248 FetchOrientation orientation, long maxRows)
0249 throws HiveSQLException {
0250
0251 OperationLog operationLog = getOperation(opHandle).getOperationLog();
0252 if (operationLog == null) {
0253 throw new HiveSQLException("Couldn't find log associated with operation handle: " + opHandle);
0254 }
0255
0256
0257 List<String> logs;
0258 try {
0259 logs = operationLog.readOperationLog(isFetchFirst(orientation), maxRows);
0260 } catch (SQLException e) {
0261 throw new HiveSQLException(e.getMessage(), e.getCause());
0262 }
0263
0264
0265
0266 TableSchema tableSchema = new TableSchema(getLogSchema());
0267 RowSet rowSet = RowSetFactory.create(tableSchema,
0268 getOperation(opHandle).getProtocolVersion(), false);
0269 for (String log : logs) {
0270 rowSet.addRow(new String[] {log});
0271 }
0272
0273 return rowSet;
0274 }
0275
0276 private boolean isFetchFirst(FetchOrientation fetchOrientation) {
0277
0278
0279 if (fetchOrientation.equals(FetchOrientation.FETCH_FIRST)) {
0280 return true;
0281 }
0282 return false;
0283 }
0284
0285 private Schema getLogSchema() {
0286 Schema schema = new Schema();
0287 FieldSchema fieldSchema = new FieldSchema();
0288 fieldSchema.setName("operation_log");
0289 fieldSchema.setType("string");
0290 schema.addToFieldSchemas(fieldSchema);
0291 return schema;
0292 }
0293
0294 public OperationLog getOperationLogByThread() {
0295 return OperationLog.getCurrentOperationLog();
0296 }
0297
0298 public List<Operation> removeExpiredOperations(OperationHandle[] handles) {
0299 List<Operation> removed = new ArrayList<Operation>();
0300 for (OperationHandle handle : handles) {
0301 Operation operation = removeTimedOutOperation(handle);
0302 if (operation != null) {
0303 LOG.warn("Operation " + handle + " is timed-out and will be closed");
0304 removed.add(operation);
0305 }
0306 }
0307 return removed;
0308 }
0309 }